use iocp for net worker waits instead of WSAPoll + dummy sockets
This commit is contained in:
parent
34c37417a9
commit
cacdf10229
@ -3,6 +3,8 @@
|
||||
|
||||
#define NET_PacketSize 1024
|
||||
|
||||
#define NET_Ephemeral 0xFFFFFFFFFFFFFFFFull
|
||||
|
||||
////////////////////////////////////////////////////////////
|
||||
//~ Opaque types
|
||||
|
||||
|
||||
@ -15,9 +15,8 @@ void NET_Bootstrap(void)
|
||||
Panic(StringF(perm, "Failed to initialize Winsock (error code - %F)", FmtSint(err)));
|
||||
}
|
||||
|
||||
// Init worker wake sockets
|
||||
NET_W32.wake_send_sock = NET_W32_CreateDummySocket();
|
||||
NET_W32.wake_recv_sock = NET_W32_CreateDummySocket();
|
||||
// Init worker completion port
|
||||
NET_W32.iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
|
||||
|
||||
// Start worker
|
||||
DispatchWave(Lit("Net"), 1, NET_W32_TickForever, 0);
|
||||
@ -26,48 +25,6 @@ void NET_Bootstrap(void)
|
||||
////////////////////////////////////////////////////////////
|
||||
//~ Helpers
|
||||
|
||||
NET_W32_DummySocket NET_W32_CreateDummySocket(void)
|
||||
{
|
||||
NET_W32_DummySocket result = Zi;
|
||||
b32 ok = 1;
|
||||
SOCKET sock = 0;
|
||||
if (ok)
|
||||
{
|
||||
sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
|
||||
ok = sock != INVALID_SOCKET;
|
||||
}
|
||||
if (ok)
|
||||
{
|
||||
struct sockaddr_in addr = Zi;
|
||||
addr.sin_family = AF_INET;
|
||||
addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
|
||||
ok = bind(sock, (struct sockaddr *)&addr, sizeof(addr)) == 0;
|
||||
}
|
||||
struct sockaddr_storage ss = Zi;
|
||||
i32 ss_sizeof = sizeof(ss);
|
||||
if (ok)
|
||||
{
|
||||
ok = getsockname(sock, (struct sockaddr *)&ss, &ss_sizeof) != SOCKET_ERROR;
|
||||
}
|
||||
if (ok)
|
||||
{
|
||||
u_long nonblocking = 1;
|
||||
ok = ioctlsocket(sock, FIONBIO, &nonblocking) == 0;
|
||||
}
|
||||
if (!ok)
|
||||
{
|
||||
if (sock != INVALID_SOCKET)
|
||||
{
|
||||
closesocket(sock);
|
||||
sock = 0;
|
||||
}
|
||||
}
|
||||
result.sock = sock;
|
||||
result.addr_size = ss_sizeof;
|
||||
result.addr = ss;
|
||||
return result;
|
||||
}
|
||||
|
||||
NET_W32_Pipe *NET_W32_PipeFromHandle(NET_PipeHandle pipe_handle)
|
||||
{
|
||||
return (NET_W32_Pipe *)pipe_handle.v;
|
||||
@ -89,14 +46,8 @@ struct sockaddr_in6 NET_W32_AddressFromKey(NET_Key key)
|
||||
|
||||
void NET_W32_SignalWorker(void)
|
||||
{
|
||||
i32 err = sendto(
|
||||
NET_W32.wake_send_sock.sock,
|
||||
(char *)"1",
|
||||
1,
|
||||
0,
|
||||
(struct sockaddr *)&NET_W32.wake_recv_sock.addr,
|
||||
NET_W32.wake_send_sock.addr_size
|
||||
);
|
||||
// TODO: Only post if previous signal was seen
|
||||
PostQueuedCompletionStatus(NET_W32.iocp, 0, 0, 0);
|
||||
}
|
||||
|
||||
NET_W32_Peer *NET_W32_TouchPeerFromKey(NET_W32_Pipe *pipe, NET_Key key)
|
||||
@ -162,6 +113,27 @@ u32 NET_W32_ChecksumFromPacketString(String str)
|
||||
return result ^ NET_W32_ProtocolMagic;
|
||||
}
|
||||
|
||||
void NET_W32_PostRecv(NET_W32_Pipe *pipe)
|
||||
{
|
||||
ZeroStruct(&pipe->ovl);
|
||||
ZeroStruct(&pipe->recv_addr);
|
||||
ZeroStruct(&pipe->recv_addr_sz);
|
||||
pipe->recv_addr_sz = sizeof(pipe->recv_addr);
|
||||
pipe->iocp_count += 1;
|
||||
|
||||
DWORD flags = 0;
|
||||
i32 ret = WSARecvFrom(
|
||||
pipe->udp,
|
||||
&pipe->recv_wsabuff, 1,
|
||||
0,
|
||||
&flags,
|
||||
(struct sockaddr *)&pipe->recv_addr,
|
||||
&pipe->recv_addr_sz,
|
||||
&pipe->ovl,
|
||||
0
|
||||
);
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////
|
||||
//~ @hookimpl Net ops
|
||||
|
||||
@ -239,6 +211,7 @@ NET_PipeHandle NET_AcquirePipe(void)
|
||||
{
|
||||
Arena *perm = PermArena();
|
||||
NET_W32_Pipe *pipe = PushStruct(perm, NET_W32_Pipe);
|
||||
// Init pipe
|
||||
{
|
||||
for (i64 idx = 0; idx < countof(pipe->cmd_buffs); ++idx)
|
||||
{
|
||||
@ -252,7 +225,15 @@ NET_PipeHandle NET_AcquirePipe(void)
|
||||
}
|
||||
pipe->peer_bins_count = Kibi(1);
|
||||
pipe->peer_bins = PushStructs(perm, NET_W32_PeerBin, pipe->peer_bins_count);
|
||||
{
|
||||
PushAlign(perm, CachelineSize);
|
||||
i32 buff_len = Kibi(2);
|
||||
pipe->recv_wsabuff.buf = PushStructsNoZero(perm, char, buff_len);
|
||||
pipe->recv_wsabuff.len = buff_len;
|
||||
PushAlign(perm, CachelineSize);
|
||||
}
|
||||
}
|
||||
// Insert pipe
|
||||
{
|
||||
LockTicketMutex(&NET_W32.pipes_tm);
|
||||
{
|
||||
@ -268,15 +249,7 @@ NET_PipeHandle NET_AcquirePipe(void)
|
||||
void NET_Bind(NET_PipeHandle pipe_handle, u64 port)
|
||||
{
|
||||
NET_W32_Pipe *pipe = NET_W32_PipeFromHandle(pipe_handle);
|
||||
if (port == 0)
|
||||
{
|
||||
Atomic64Set(&pipe->desired_port, 0xFFFFFFFFFFFFFFFFull);
|
||||
}
|
||||
else
|
||||
{
|
||||
Atomic64Set(&pipe->desired_port, port);
|
||||
}
|
||||
// FIXME: Signal here if ports don't match
|
||||
}
|
||||
|
||||
NET_MsgList NET_Swap(Arena *arena, NET_PipeHandle pipe_handle)
|
||||
@ -300,6 +273,10 @@ NET_MsgList NET_Swap(Arena *arena, NET_PipeHandle pipe_handle)
|
||||
void NET_Push(NET_PipeHandle pipe_handle, NET_Key dst, String data, b32 burst)
|
||||
{
|
||||
NET_W32_Pipe *pipe = NET_W32_PipeFromHandle(pipe_handle);
|
||||
if (Atomic64Fetch(&pipe->desired_port) == 0)
|
||||
{
|
||||
Atomic64Set(&pipe->desired_port, NET_Ephemeral);
|
||||
}
|
||||
LockTicketMutex(&pipe->back_cmd_buff_seq_tm);
|
||||
{
|
||||
NET_W32_CmdBuff *cmd_buff = &pipe->cmd_buffs[pipe->back_cmd_buff_seq % countof(pipe->cmd_buffs)];
|
||||
@ -330,77 +307,18 @@ void NET_W32_TickForever(WaveLaneCtx *lane)
|
||||
{
|
||||
Arena *perm = PermArena();
|
||||
|
||||
i64 seen_signal = 0;
|
||||
for (;;)
|
||||
{
|
||||
TempArena scratch = BeginScratchNoConflict();
|
||||
i64 heartbeat_threshold_ns = NsFromSeconds(0.250);
|
||||
i64 passive_run_threshold_ns = NsFromSeconds(0.250);
|
||||
i64 bind_retry_threshold_ns = NsFromSeconds(0.250);
|
||||
|
||||
// TODO: Base this on peer's latency w/ rolling backoff
|
||||
i64 msg_resend_threshold_ns = NsFromSeconds(0.250);
|
||||
|
||||
//////////////////////////////
|
||||
//- Wait
|
||||
|
||||
{
|
||||
// Build fd list containing every bound pipe's socket + the worker's signal socket
|
||||
i64 fds_count = 0;
|
||||
WSAPOLLFD *fds = 0;
|
||||
{
|
||||
LockTicketMutex(&NET_W32.pipes_tm);
|
||||
{
|
||||
for (NET_W32_Pipe *pipe = NET_W32.first_pipe; pipe; pipe = pipe->next)
|
||||
{
|
||||
if (pipe->udp)
|
||||
{
|
||||
fds_count += 1;
|
||||
}
|
||||
}
|
||||
if (NET_W32.wake_recv_sock.sock != 0)
|
||||
{
|
||||
fds_count += 1;
|
||||
}
|
||||
fds = PushStructsNoZero(scratch.arena, WSAPOLLFD, fds_count);
|
||||
{
|
||||
i64 fd_idx = 0;
|
||||
for (NET_W32_Pipe *pipe = NET_W32.first_pipe; pipe; pipe = pipe->next)
|
||||
{
|
||||
if (pipe->udp)
|
||||
{
|
||||
fds[fd_idx].fd = pipe->udp;
|
||||
fds[fd_idx].events = POLLRDNORM;
|
||||
fd_idx += 1;
|
||||
}
|
||||
}
|
||||
if (NET_W32.wake_recv_sock.sock != 0)
|
||||
{
|
||||
fds[fd_idx].fd = NET_W32.wake_recv_sock.sock;
|
||||
fds[fd_idx].events = POLLRDNORM;
|
||||
fd_idx += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
UnlockTicketMutex(&NET_W32.pipes_tm);
|
||||
}
|
||||
|
||||
// Wait
|
||||
i32 timeout_ms = MsFromNs(passive_run_threshold_ns);
|
||||
WSAPoll(fds, fds_count, timeout_ms);
|
||||
|
||||
// Drain wake recv sock
|
||||
{
|
||||
i32 len = 0;
|
||||
while (len >= 0)
|
||||
{
|
||||
u8 buff[Kibi(2)];
|
||||
len = recv(NET_W32.wake_recv_sock.sock, (char *)buff, countof(buff), 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//////////////////////////////
|
||||
//- Process pipes
|
||||
//- Pop pipes
|
||||
|
||||
i64 pipes_count = 0;
|
||||
NET_W32_Pipe **pipes = 0;
|
||||
@ -417,13 +335,12 @@ void NET_W32_TickForever(WaveLaneCtx *lane)
|
||||
UnlockTicketMutex(&NET_W32.pipes_tm);
|
||||
}
|
||||
|
||||
for (i64 pipe_idx = 0; pipe_idx < pipes_count; ++pipe_idx)
|
||||
{
|
||||
NET_W32_Pipe *pipe = pipes[pipe_idx];
|
||||
|
||||
//////////////////////////////
|
||||
//- Reset frame data
|
||||
|
||||
for (i64 pipe_idx = 0; pipe_idx < pipes_count; ++pipe_idx)
|
||||
{
|
||||
NET_W32_Pipe *pipe = pipes[pipe_idx];
|
||||
{
|
||||
pipe->num_msg_packets_received_this_frame = 0;
|
||||
for (NET_W32_Peer *peer = pipe->first_peer; peer; peer = peer->next)
|
||||
@ -431,186 +348,74 @@ void NET_W32_TickForever(WaveLaneCtx *lane)
|
||||
peer->num_msg_packets_received_this_frame = 0;
|
||||
}
|
||||
}
|
||||
|
||||
//////////////////////////////
|
||||
//- Bind
|
||||
|
||||
{
|
||||
u64 desired_port = Atomic64Fetch(&pipe->desired_port);
|
||||
if (desired_port != 0)
|
||||
{
|
||||
b32 is_ephemeral = desired_port >= Kibi(64);
|
||||
if (is_ephemeral)
|
||||
{
|
||||
desired_port = 0;
|
||||
}
|
||||
|
||||
// FIXME: Retry on timeout
|
||||
if (!pipe->udp || (!is_ephemeral && pipe->bound_port != desired_port))
|
||||
{
|
||||
b32 ok = 1;
|
||||
String port_str = StringF(scratch.arena, "%F", FmtUint(desired_port));
|
||||
char *port_cstr = CstrFromString(scratch.arena, port_str);
|
||||
|
||||
if (pipe->udp)
|
||||
{
|
||||
closesocket(pipe->udp);
|
||||
pipe->bound_port = 0;
|
||||
pipe->udp = 0;
|
||||
}
|
||||
|
||||
//- Init bind address
|
||||
struct addrinfo hints = Zi;
|
||||
hints.ai_family = AF_INET6;
|
||||
hints.ai_socktype = SOCK_DGRAM;
|
||||
hints.ai_protocol= IPPROTO_UDP;
|
||||
hints.ai_flags = AI_PASSIVE | AI_NUMERICSERV;
|
||||
struct addrinfo *ai = 0;
|
||||
if (ok)
|
||||
{
|
||||
ok = getaddrinfo(0, port_cstr, &hints, &ai) == 0;
|
||||
}
|
||||
|
||||
//- Create udp socket
|
||||
SOCKET sock = 0;
|
||||
if (ok)
|
||||
{
|
||||
sock = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
|
||||
ok = sock != INVALID_SOCKET;
|
||||
}
|
||||
|
||||
//- Enable address reuse
|
||||
if (ok)
|
||||
{
|
||||
b32 reuse = 1;
|
||||
ok = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(reuse)) == 0;
|
||||
}
|
||||
|
||||
//- Enable dual stack
|
||||
if (ok)
|
||||
{
|
||||
DWORD v6_only = 0;
|
||||
ok = setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&v6_only, sizeof(v6_only)) == 0;
|
||||
}
|
||||
|
||||
//- Set buffer sizes
|
||||
if (ok)
|
||||
{
|
||||
i32 rcvbuf_min = Mebi(1);
|
||||
i32 sndbuf_min = Mebi(1);
|
||||
{
|
||||
i32 rcvbuf = 0;
|
||||
i32 rcvbuf_sz = sizeof(rcvbuf);
|
||||
if (getsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&rcvbuf, &rcvbuf_sz) == 0)
|
||||
{
|
||||
if (rcvbuf < rcvbuf_min)
|
||||
{
|
||||
setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char *)&rcvbuf_min, sizeof(rcvbuf_min));
|
||||
}
|
||||
}
|
||||
}
|
||||
{
|
||||
i32 sndbuf = 0;
|
||||
i32 sndbuf_sz = sizeof(sndbuf);
|
||||
if (getsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf, &sndbuf_sz) == 0)
|
||||
{
|
||||
if (sndbuf < sndbuf_min)
|
||||
{
|
||||
setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf_min, sizeof(sndbuf_min));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//- Bind
|
||||
if (ok)
|
||||
{
|
||||
ok = bind(sock, ai->ai_addr, (i32)ai->ai_addrlen) == 0;
|
||||
}
|
||||
|
||||
//- Enable non-blocking
|
||||
if (ok)
|
||||
{
|
||||
u_long nonblocking = 1;
|
||||
ok = ioctlsocket(sock, FIONBIO, &nonblocking) == 0;
|
||||
}
|
||||
|
||||
//- Fetch bound port
|
||||
u64 bound_port = 0;
|
||||
{
|
||||
struct sockaddr_storage ss = Zi;
|
||||
if (ok)
|
||||
{
|
||||
i32 ss_sizeof = sizeof(ss);
|
||||
ok = getsockname(sock, (struct sockaddr *)&ss, &ss_sizeof) != SOCKET_ERROR;
|
||||
}
|
||||
if (ok)
|
||||
{
|
||||
if (ss.ss_family == AF_INET)
|
||||
{
|
||||
struct sockaddr_in *a = (struct sockaddr_in *)&ss;
|
||||
bound_port = ntohs(a->sin_port);
|
||||
}
|
||||
else if (ss.ss_family == AF_INET6)
|
||||
{
|
||||
struct sockaddr_in6 *a6 = (struct sockaddr_in6 *)&ss;
|
||||
bound_port = ntohs(a6->sin6_port);
|
||||
}
|
||||
else
|
||||
{
|
||||
ok = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//- Finalize
|
||||
if (ok)
|
||||
{
|
||||
pipe->bound_port = bound_port;
|
||||
pipe->udp = sock;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (sock != INVALID_SOCKET)
|
||||
{
|
||||
closesocket(sock);
|
||||
}
|
||||
}
|
||||
|
||||
if (ai)
|
||||
{
|
||||
freeaddrinfo(ai);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//////////////////////////////
|
||||
//- Read socket
|
||||
//- Read incoming packets
|
||||
|
||||
// TODO: Rate limit
|
||||
// TODO: Per-frame packet limit to ensure other pipes are still serviced during load
|
||||
// TODO: Per-frame recv limit to ensure other pipes are still serviced during load
|
||||
|
||||
{
|
||||
i32 len = 0;
|
||||
while (len >= 0)
|
||||
i64 wake_count = 0;
|
||||
b32 done = 0;
|
||||
while (!done)
|
||||
{
|
||||
u8 buff[Kibi(2)];
|
||||
struct sockaddr_in6 addr = Zi;
|
||||
i32 addr_sz = sizeof(addr);
|
||||
len = recvfrom(pipe->udp, (char *)buff, countof(buff), 0, (struct sockaddr *)&addr, &addr_sz);
|
||||
Atomic64FetchAdd(&pipe->total_bytes_received, len);
|
||||
if (len >= sizeof(NET_W32_PacketHeader) && len <= sizeof(NET_W32_PacketHeader) + NET_PacketSize)
|
||||
DWORD len = 0;
|
||||
ULONG_PTR iocp_key = 0;
|
||||
OVERLAPPED *ovl = 0;
|
||||
|
||||
// Wait for packet
|
||||
b32 ok = 0;
|
||||
{
|
||||
i32 timeout_ms = MsFromNs(passive_run_threshold_ns);
|
||||
if (wake_count > 0)
|
||||
{
|
||||
// Keep processing without sleeping if we've already received a completion this frame
|
||||
timeout_ms = 0;
|
||||
}
|
||||
//
|
||||
// NOTE: We're really only using IOCP so that we have a way to wake
|
||||
// the worker mid-wait without having to use WSAPoll + a dummy socket.
|
||||
// This is because any data sent to that dummy socket will itself be
|
||||
// affected by network testing tools such as Clumsy, adding additional
|
||||
// unpredictability. E.g. if Clumsy is set to add 50ms of artificial
|
||||
// and drop 10% of packets on loopback, attempts to wake the worker
|
||||
// thread so that it can process queued cmds will be affected as well.
|
||||
//
|
||||
ok = GetQueuedCompletionStatus(NET_W32.iocp, &len, &iocp_key, &ovl, timeout_ms);
|
||||
wake_count += 1;
|
||||
if (!ok)
|
||||
{
|
||||
i32 err = GetLastError();
|
||||
if (err == WAIT_TIMEOUT)
|
||||
{
|
||||
done = 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (ovl)
|
||||
{
|
||||
NET_W32_Pipe *pipe = (NET_W32_Pipe *)ovl;
|
||||
pipe->iocp_count -= 1;
|
||||
if (ok)
|
||||
{
|
||||
String recv = STRING(len, (u8 *)pipe->recv_wsabuff.buf);
|
||||
struct sockaddr_in6 addr = pipe->recv_addr;
|
||||
|
||||
Atomic64FetchAdd(&pipe->total_bytes_received, recv.len);
|
||||
if (recv.len >= sizeof(NET_W32_PacketHeader) && recv.len <= sizeof(NET_W32_PacketHeader) + NET_PacketSize)
|
||||
{
|
||||
NET_W32_PacketHeader header = Zi;
|
||||
CopyBytes(&header, buff, sizeof(header));
|
||||
u32 checksum = NET_W32_ChecksumFromPacketString(STRING(len, buff));
|
||||
CopyBytes(&header, recv.text, sizeof(header));
|
||||
u32 checksum = NET_W32_ChecksumFromPacketString(recv);
|
||||
if (header.checksum == checksum)
|
||||
{
|
||||
NET_Key key = NET_W32_KeyFromAddress(addr);
|
||||
String src_data = Zi;
|
||||
src_data.text = buff + sizeof(header);
|
||||
src_data.len = len - sizeof(header);
|
||||
src_data.text = recv.text + sizeof(header);
|
||||
src_data.len = recv.len - sizeof(header);
|
||||
|
||||
//- Fetch peer
|
||||
NET_W32_Peer *peer = NET_W32_TouchPeerFromKey(pipe, key);
|
||||
@ -746,6 +551,186 @@ void NET_W32_TickForever(WaveLaneCtx *lane)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Re-post recv
|
||||
NET_W32_PostRecv(pipe);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//////////////////////////////
|
||||
//- Process pipes
|
||||
|
||||
for (i64 pipe_idx = 0; pipe_idx < pipes_count; ++pipe_idx)
|
||||
{
|
||||
NET_W32_Pipe *pipe = pipes[pipe_idx];
|
||||
|
||||
//////////////////////////////
|
||||
//- Bind
|
||||
|
||||
{
|
||||
i64 now_ns = TimeNs();
|
||||
|
||||
u64 desired_port = Atomic64Fetch(&pipe->desired_port);
|
||||
if (desired_port != 0)
|
||||
{
|
||||
b32 desired_is_ephemeral = desired_port >= Kibi(64);
|
||||
if (desired_is_ephemeral)
|
||||
{
|
||||
desired_port = 0;
|
||||
}
|
||||
|
||||
if (
|
||||
(!pipe->udp || (!desired_is_ephemeral && pipe->bound_port != desired_port)) &&
|
||||
(pipe->last_attempted_bind_ns == 0 || now_ns - pipe->last_attempted_bind_ns > bind_retry_threshold_ns)
|
||||
)
|
||||
{
|
||||
b32 ok = 1;
|
||||
String port_str = StringF(scratch.arena, "%F", FmtUint(desired_port));
|
||||
char *port_cstr = CstrFromString(scratch.arena, port_str);
|
||||
|
||||
//- Close old socket
|
||||
if (pipe->udp)
|
||||
{
|
||||
closesocket(pipe->udp);
|
||||
pipe->bound_port = 0;
|
||||
pipe->udp = 0;
|
||||
}
|
||||
|
||||
if (pipe->iocp_count == 0)
|
||||
{
|
||||
pipe->last_attempted_bind_ns = now_ns;
|
||||
|
||||
//- Init bind address
|
||||
struct addrinfo hints = Zi;
|
||||
hints.ai_family = AF_INET6;
|
||||
hints.ai_socktype = SOCK_DGRAM;
|
||||
hints.ai_protocol= IPPROTO_UDP;
|
||||
hints.ai_flags = AI_PASSIVE | AI_NUMERICSERV;
|
||||
struct addrinfo *ai = 0;
|
||||
if (ok)
|
||||
{
|
||||
ok = getaddrinfo(0, port_cstr, &hints, &ai) == 0;
|
||||
}
|
||||
|
||||
//- Create udp socket
|
||||
SOCKET sock = 0;
|
||||
if (ok)
|
||||
{
|
||||
// sock = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
|
||||
sock = WSASocketW(ai->ai_family, ai->ai_socktype, ai->ai_protocol, 0, 0, WSA_FLAG_OVERLAPPED);
|
||||
ok = sock != INVALID_SOCKET;
|
||||
}
|
||||
|
||||
//- Enable address reuse
|
||||
if (ok)
|
||||
{
|
||||
b32 reuse = 1;
|
||||
ok = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(reuse)) == 0;
|
||||
}
|
||||
|
||||
//- Enable dual stack
|
||||
if (ok)
|
||||
{
|
||||
DWORD v6_only = 0;
|
||||
ok = setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&v6_only, sizeof(v6_only)) == 0;
|
||||
}
|
||||
|
||||
//- Set buffer sizes
|
||||
if (ok)
|
||||
{
|
||||
i32 rcvbuf_min = Mebi(1);
|
||||
i32 sndbuf_min = Mebi(1);
|
||||
{
|
||||
i32 rcvbuf = 0;
|
||||
i32 rcvbuf_sz = sizeof(rcvbuf);
|
||||
if (getsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&rcvbuf, &rcvbuf_sz) == 0)
|
||||
{
|
||||
if (rcvbuf < rcvbuf_min)
|
||||
{
|
||||
setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char *)&rcvbuf_min, sizeof(rcvbuf_min));
|
||||
}
|
||||
}
|
||||
}
|
||||
{
|
||||
i32 sndbuf = 0;
|
||||
i32 sndbuf_sz = sizeof(sndbuf);
|
||||
if (getsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf, &sndbuf_sz) == 0)
|
||||
{
|
||||
if (sndbuf < sndbuf_min)
|
||||
{
|
||||
setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf_min, sizeof(sndbuf_min));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//- Bind
|
||||
if (ok)
|
||||
{
|
||||
ok = bind(sock, ai->ai_addr, (i32)ai->ai_addrlen) == 0;
|
||||
}
|
||||
|
||||
//- Enable non-blocking
|
||||
if (ok)
|
||||
{
|
||||
u_long nonblocking = 1;
|
||||
ok = ioctlsocket(sock, FIONBIO, &nonblocking) == 0;
|
||||
}
|
||||
|
||||
//- Fetch bound port
|
||||
u64 bound_port = 0;
|
||||
{
|
||||
struct sockaddr_storage ss = Zi;
|
||||
if (ok)
|
||||
{
|
||||
i32 ss_sizeof = sizeof(ss);
|
||||
ok = getsockname(sock, (struct sockaddr *)&ss, &ss_sizeof) != SOCKET_ERROR;
|
||||
}
|
||||
if (ok)
|
||||
{
|
||||
if (ss.ss_family == AF_INET)
|
||||
{
|
||||
struct sockaddr_in *a = (struct sockaddr_in *)&ss;
|
||||
bound_port = ntohs(a->sin_port);
|
||||
}
|
||||
else if (ss.ss_family == AF_INET6)
|
||||
{
|
||||
struct sockaddr_in6 *a6 = (struct sockaddr_in6 *)&ss;
|
||||
bound_port = ntohs(a6->sin6_port);
|
||||
}
|
||||
else
|
||||
{
|
||||
ok = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//- Finalize
|
||||
if (ok)
|
||||
{
|
||||
pipe->bound_port = bound_port;
|
||||
pipe->udp = sock;
|
||||
|
||||
// Post initial recv
|
||||
CreateIoCompletionPort((HANDLE)sock, NET_W32.iocp, 0, 0);
|
||||
NET_W32_PostRecv(pipe);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (sock != INVALID_SOCKET)
|
||||
{
|
||||
closesocket(sock);
|
||||
}
|
||||
}
|
||||
|
||||
if (ai)
|
||||
{
|
||||
freeaddrinfo(ai);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -897,6 +882,8 @@ void NET_W32_TickForever(WaveLaneCtx *lane)
|
||||
//////////////////////////////
|
||||
//- Send heartbeat packets
|
||||
|
||||
if (pipe->udp)
|
||||
{
|
||||
for (NET_W32_Peer *peer = pipe->first_peer; peer; peer = peer->next)
|
||||
{
|
||||
b32 should_send_heartbeat = 0;
|
||||
@ -943,12 +930,15 @@ void NET_W32_TickForever(WaveLaneCtx *lane)
|
||||
peer->last_packet_sent_ns = now_ns;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//////////////////////////////
|
||||
//- Send message packets
|
||||
|
||||
// TODO: Rate limit
|
||||
|
||||
if (pipe->udp)
|
||||
{
|
||||
for (NET_W32_Peer *peer = pipe->first_peer; peer; peer = peer->next)
|
||||
{
|
||||
// bottom_ack represents the highest contiguous sequence acknowledgement, meaning sequences in range [0, bottom_ack] are always acked.
|
||||
@ -1035,6 +1025,7 @@ void NET_W32_TickForever(WaveLaneCtx *lane)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
EndScratch(scratch);
|
||||
}
|
||||
|
||||
@ -119,6 +119,18 @@ Struct(NET_W32_PeerBin)
|
||||
|
||||
Struct(NET_W32_Pipe)
|
||||
{
|
||||
//- IOCP data
|
||||
|
||||
OVERLAPPED ovl; // First field must be OVERLAPPED for cast from IOCP returns
|
||||
|
||||
i32 iocp_count; // How many overlapped operations are currently queued.
|
||||
|
||||
WSABUF recv_wsabuff;
|
||||
struct sockaddr_in6 recv_addr;
|
||||
i32 recv_addr_sz;
|
||||
|
||||
SOCKET udp;
|
||||
|
||||
//- Shared data
|
||||
|
||||
NET_W32_Pipe *next;
|
||||
@ -139,6 +151,8 @@ Struct(NET_W32_Pipe)
|
||||
|
||||
//- Worker data
|
||||
|
||||
i64 last_attempted_bind_ns;
|
||||
|
||||
NET_W32_Peer *first_peer;
|
||||
NET_W32_Peer *last_peer;
|
||||
i64 num_msg_packets_received_this_frame;
|
||||
@ -147,7 +161,6 @@ Struct(NET_W32_Pipe)
|
||||
NET_W32_PeerBin *peer_bins;
|
||||
|
||||
u64 bound_port;
|
||||
SOCKET udp;
|
||||
};
|
||||
|
||||
////////////////////////////////////////////////////////////
|
||||
@ -170,8 +183,7 @@ Struct(NET_W32_Ctx)
|
||||
NET_W32_Peer *first_free_peer;
|
||||
NET_W32_Packet *first_free_packet;
|
||||
|
||||
NET_W32_DummySocket wake_send_sock;
|
||||
NET_W32_DummySocket wake_recv_sock;
|
||||
HANDLE iocp;
|
||||
};
|
||||
|
||||
extern NET_W32_Ctx NET_W32;
|
||||
@ -186,6 +198,7 @@ struct sockaddr_in6 NET_W32_AddressFromKey(NET_Key key);
|
||||
void NET_W32_SignalWorker(void);
|
||||
NET_W32_Peer *NET_W32_TouchPeerFromKey(NET_W32_Pipe *pipe, NET_Key key);
|
||||
u32 NET_W32_ChecksumFromPacketString(String str);
|
||||
void NET_W32_PostRecv(NET_W32_Pipe *pipe);
|
||||
|
||||
////////////////////////////////////////////////////////////
|
||||
//~ Worker
|
||||
|
||||
@ -2651,6 +2651,8 @@ void V_TickForever(WaveLaneCtx *lane)
|
||||
{
|
||||
// V_PushNotif(Lit("Hello!!!"));
|
||||
P_Msg *chat_msg = P_PushMsg(P_MsgKind_Chat, Lit("Hello!!!"));
|
||||
|
||||
|
||||
} break;
|
||||
}
|
||||
}
|
||||
@ -2670,7 +2672,6 @@ void V_TickForever(WaveLaneCtx *lane)
|
||||
|
||||
P_MsgList in_msgs = Zi;
|
||||
{
|
||||
NET_Bind(net_pipe, 0);
|
||||
NET_MsgList net_msgs = NET_Swap(frame->arena, net_pipe);
|
||||
for (NET_Msg *net_msg = net_msgs.first; net_msg; net_msg = net_msg->next)
|
||||
{
|
||||
@ -3847,7 +3848,8 @@ void V_TickForever(WaveLaneCtx *lane)
|
||||
if (frame->held_buttons[Button_R] && !prev_frame->held_buttons[Button_R])
|
||||
{
|
||||
LogDebugF("Sending test payload");
|
||||
NET_Push(net_pipe, server_key, STRING(P_TilesCount, predict_world->tiles), 0);
|
||||
// NET_Push(net_pipe, server_key, STRING(P_TilesCount, predict_world->tiles), 0);
|
||||
NET_Push(net_pipe, server_key, Lit("Hello there!"), 0);
|
||||
}
|
||||
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user