From cacdf10229e86f22f9ef4f9a402e0e52b738f0f8 Mon Sep 17 00:00:00 2001 From: jacob Date: Sat, 17 Jan 2026 18:45:25 -0600 Subject: [PATCH] use iocp for net worker waits instead of WSAPoll + dummy sockets --- src/net/net.h | 2 + src/net/net_win32/net_win32.c | 999 +++++++++++++++++----------------- src/net/net_win32/net_win32.h | 19 +- src/pp/pp_vis/pp_vis_core.c | 6 +- 4 files changed, 517 insertions(+), 509 deletions(-) diff --git a/src/net/net.h b/src/net/net.h index ffdcaf3f..aece5091 100644 --- a/src/net/net.h +++ b/src/net/net.h @@ -3,6 +3,8 @@ #define NET_PacketSize 1024 +#define NET_Ephemeral 0xFFFFFFFFFFFFFFFFull + //////////////////////////////////////////////////////////// //~ Opaque types diff --git a/src/net/net_win32/net_win32.c b/src/net/net_win32/net_win32.c index 077a9a25..3edef48c 100644 --- a/src/net/net_win32/net_win32.c +++ b/src/net/net_win32/net_win32.c @@ -15,9 +15,8 @@ void NET_Bootstrap(void) Panic(StringF(perm, "Failed to initialize Winsock (error code - %F)", FmtSint(err))); } - // Init worker wake sockets - NET_W32.wake_send_sock = NET_W32_CreateDummySocket(); - NET_W32.wake_recv_sock = NET_W32_CreateDummySocket(); + // Init worker completion port + NET_W32.iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0); // Start worker DispatchWave(Lit("Net"), 1, NET_W32_TickForever, 0); @@ -26,48 +25,6 @@ void NET_Bootstrap(void) //////////////////////////////////////////////////////////// //~ Helpers -NET_W32_DummySocket NET_W32_CreateDummySocket(void) -{ - NET_W32_DummySocket result = Zi; - b32 ok = 1; - SOCKET sock = 0; - if (ok) - { - sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); - ok = sock != INVALID_SOCKET; - } - if (ok) - { - struct sockaddr_in addr = Zi; - addr.sin_family = AF_INET; - addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); - ok = bind(sock, (struct sockaddr *)&addr, sizeof(addr)) == 0; - } - struct sockaddr_storage ss = Zi; - i32 ss_sizeof = sizeof(ss); - if (ok) - { - ok = getsockname(sock, (struct sockaddr *)&ss, &ss_sizeof) != SOCKET_ERROR; - } - if (ok) - { - u_long nonblocking = 1; - ok = ioctlsocket(sock, FIONBIO, &nonblocking) == 0; - } - if (!ok) - { - if (sock != INVALID_SOCKET) - { - closesocket(sock); - sock = 0; - } - } - result.sock = sock; - result.addr_size = ss_sizeof; - result.addr = ss; - return result; -} - NET_W32_Pipe *NET_W32_PipeFromHandle(NET_PipeHandle pipe_handle) { return (NET_W32_Pipe *)pipe_handle.v; @@ -89,14 +46,8 @@ struct sockaddr_in6 NET_W32_AddressFromKey(NET_Key key) void NET_W32_SignalWorker(void) { - i32 err = sendto( - NET_W32.wake_send_sock.sock, - (char *)"1", - 1, - 0, - (struct sockaddr *)&NET_W32.wake_recv_sock.addr, - NET_W32.wake_send_sock.addr_size - ); + // TODO: Only post if previous signal was seen + PostQueuedCompletionStatus(NET_W32.iocp, 0, 0, 0); } NET_W32_Peer *NET_W32_TouchPeerFromKey(NET_W32_Pipe *pipe, NET_Key key) @@ -162,6 +113,27 @@ u32 NET_W32_ChecksumFromPacketString(String str) return result ^ NET_W32_ProtocolMagic; } +void NET_W32_PostRecv(NET_W32_Pipe *pipe) +{ + ZeroStruct(&pipe->ovl); + ZeroStruct(&pipe->recv_addr); + ZeroStruct(&pipe->recv_addr_sz); + pipe->recv_addr_sz = sizeof(pipe->recv_addr); + pipe->iocp_count += 1; + + DWORD flags = 0; + i32 ret = WSARecvFrom( + pipe->udp, + &pipe->recv_wsabuff, 1, + 0, + &flags, + (struct sockaddr *)&pipe->recv_addr, + &pipe->recv_addr_sz, + &pipe->ovl, + 0 + ); +} + //////////////////////////////////////////////////////////// //~ @hookimpl Net ops @@ -239,6 +211,7 @@ NET_PipeHandle NET_AcquirePipe(void) { Arena *perm = PermArena(); NET_W32_Pipe *pipe = PushStruct(perm, NET_W32_Pipe); + // Init pipe { for (i64 idx = 0; idx < countof(pipe->cmd_buffs); ++idx) { @@ -252,7 +225,15 @@ NET_PipeHandle NET_AcquirePipe(void) } pipe->peer_bins_count = Kibi(1); pipe->peer_bins = PushStructs(perm, NET_W32_PeerBin, pipe->peer_bins_count); + { + PushAlign(perm, CachelineSize); + i32 buff_len = Kibi(2); + pipe->recv_wsabuff.buf = PushStructsNoZero(perm, char, buff_len); + pipe->recv_wsabuff.len = buff_len; + PushAlign(perm, CachelineSize); + } } + // Insert pipe { LockTicketMutex(&NET_W32.pipes_tm); { @@ -268,15 +249,7 @@ NET_PipeHandle NET_AcquirePipe(void) void NET_Bind(NET_PipeHandle pipe_handle, u64 port) { NET_W32_Pipe *pipe = NET_W32_PipeFromHandle(pipe_handle); - if (port == 0) - { - Atomic64Set(&pipe->desired_port, 0xFFFFFFFFFFFFFFFFull); - } - else - { - Atomic64Set(&pipe->desired_port, port); - } - // FIXME: Signal here if ports don't match + Atomic64Set(&pipe->desired_port, port); } NET_MsgList NET_Swap(Arena *arena, NET_PipeHandle pipe_handle) @@ -300,6 +273,10 @@ NET_MsgList NET_Swap(Arena *arena, NET_PipeHandle pipe_handle) void NET_Push(NET_PipeHandle pipe_handle, NET_Key dst, String data, b32 burst) { NET_W32_Pipe *pipe = NET_W32_PipeFromHandle(pipe_handle); + if (Atomic64Fetch(&pipe->desired_port) == 0) + { + Atomic64Set(&pipe->desired_port, NET_Ephemeral); + } LockTicketMutex(&pipe->back_cmd_buff_seq_tm); { NET_W32_CmdBuff *cmd_buff = &pipe->cmd_buffs[pipe->back_cmd_buff_seq % countof(pipe->cmd_buffs)]; @@ -330,77 +307,18 @@ void NET_W32_TickForever(WaveLaneCtx *lane) { Arena *perm = PermArena(); - i64 seen_signal = 0; for (;;) { TempArena scratch = BeginScratchNoConflict(); i64 heartbeat_threshold_ns = NsFromSeconds(0.250); i64 passive_run_threshold_ns = NsFromSeconds(0.250); + i64 bind_retry_threshold_ns = NsFromSeconds(0.250); // TODO: Base this on peer's latency w/ rolling backoff i64 msg_resend_threshold_ns = NsFromSeconds(0.250); ////////////////////////////// - //- Wait - - { - // Build fd list containing every bound pipe's socket + the worker's signal socket - i64 fds_count = 0; - WSAPOLLFD *fds = 0; - { - LockTicketMutex(&NET_W32.pipes_tm); - { - for (NET_W32_Pipe *pipe = NET_W32.first_pipe; pipe; pipe = pipe->next) - { - if (pipe->udp) - { - fds_count += 1; - } - } - if (NET_W32.wake_recv_sock.sock != 0) - { - fds_count += 1; - } - fds = PushStructsNoZero(scratch.arena, WSAPOLLFD, fds_count); - { - i64 fd_idx = 0; - for (NET_W32_Pipe *pipe = NET_W32.first_pipe; pipe; pipe = pipe->next) - { - if (pipe->udp) - { - fds[fd_idx].fd = pipe->udp; - fds[fd_idx].events = POLLRDNORM; - fd_idx += 1; - } - } - if (NET_W32.wake_recv_sock.sock != 0) - { - fds[fd_idx].fd = NET_W32.wake_recv_sock.sock; - fds[fd_idx].events = POLLRDNORM; - fd_idx += 1; - } - } - } - UnlockTicketMutex(&NET_W32.pipes_tm); - } - - // Wait - i32 timeout_ms = MsFromNs(passive_run_threshold_ns); - WSAPoll(fds, fds_count, timeout_ms); - - // Drain wake recv sock - { - i32 len = 0; - while (len >= 0) - { - u8 buff[Kibi(2)]; - len = recv(NET_W32.wake_recv_sock.sock, (char *)buff, countof(buff), 0); - } - } - } - - ////////////////////////////// - //- Process pipes + //- Pop pipes i64 pipes_count = 0; NET_W32_Pipe **pipes = 0; @@ -417,13 +335,12 @@ void NET_W32_TickForever(WaveLaneCtx *lane) UnlockTicketMutex(&NET_W32.pipes_tm); } + ////////////////////////////// + //- Reset frame data + for (i64 pipe_idx = 0; pipe_idx < pipes_count; ++pipe_idx) { NET_W32_Pipe *pipe = pipes[pipe_idx]; - - ////////////////////////////// - //- Reset frame data - { pipe->num_msg_packets_received_this_frame = 0; for (NET_W32_Peer *peer = pipe->first_peer; peer; peer = peer->next) @@ -431,27 +348,249 @@ void NET_W32_TickForever(WaveLaneCtx *lane) peer->num_msg_packets_received_this_frame = 0; } } + } + + ////////////////////////////// + //- Read incoming packets + + // TODO: Rate limit + // TODO: Per-frame recv limit to ensure other pipes are still serviced during load + + { + i64 wake_count = 0; + b32 done = 0; + while (!done) + { + DWORD len = 0; + ULONG_PTR iocp_key = 0; + OVERLAPPED *ovl = 0; + + // Wait for packet + b32 ok = 0; + { + i32 timeout_ms = MsFromNs(passive_run_threshold_ns); + if (wake_count > 0) + { + // Keep processing without sleeping if we've already received a completion this frame + timeout_ms = 0; + } + // + // NOTE: We're really only using IOCP so that we have a way to wake + // the worker mid-wait without having to use WSAPoll + a dummy socket. + // This is because any data sent to that dummy socket will itself be + // affected by network testing tools such as Clumsy, adding additional + // unpredictability. E.g. if Clumsy is set to add 50ms of artificial + // and drop 10% of packets on loopback, attempts to wake the worker + // thread so that it can process queued cmds will be affected as well. + // + ok = GetQueuedCompletionStatus(NET_W32.iocp, &len, &iocp_key, &ovl, timeout_ms); + wake_count += 1; + if (!ok) + { + i32 err = GetLastError(); + if (err == WAIT_TIMEOUT) + { + done = 1; + } + } + } + + if (ovl) + { + NET_W32_Pipe *pipe = (NET_W32_Pipe *)ovl; + pipe->iocp_count -= 1; + if (ok) + { + String recv = STRING(len, (u8 *)pipe->recv_wsabuff.buf); + struct sockaddr_in6 addr = pipe->recv_addr; + + Atomic64FetchAdd(&pipe->total_bytes_received, recv.len); + if (recv.len >= sizeof(NET_W32_PacketHeader) && recv.len <= sizeof(NET_W32_PacketHeader) + NET_PacketSize) + { + NET_W32_PacketHeader header = Zi; + CopyBytes(&header, recv.text, sizeof(header)); + u32 checksum = NET_W32_ChecksumFromPacketString(recv); + if (header.checksum == checksum) + { + NET_Key key = NET_W32_KeyFromAddress(addr); + String src_data = Zi; + src_data.text = recv.text + sizeof(header); + src_data.len = recv.len - sizeof(header); + + //- Fetch peer + NET_W32_Peer *peer = NET_W32_TouchPeerFromKey(pipe, key); + peer->key = key; + peer->last_packet_received_ns = TimeNs(); + if (header.bottom_ack == peer->remote_bottom_ack) + { + peer->remote_ack_bits |= header.ack_bits; + } + else if (header.bottom_ack > peer->remote_bottom_ack) + { + peer->remote_bottom_ack = header.bottom_ack; + peer->remote_ack_bits = header.ack_bits; + } + + //- Read msg packet + if (!(header.flags & NET_W32_PacketFlag_Heartbeat)) + { + // Update our stats + b32 is_sequential = 0; + b32 should_process = 0; + if (header.seq == peer->bottom_ack + 1) + { + is_sequential = 1; + should_process = 1; + } + else if (header.seq > peer->bottom_ack + 1 && header.seq <= peer->bottom_ack + 65) + { + u64 ack_bit = (u64)1 << (header.seq - peer->bottom_ack - 2); + should_process = !(peer->ack_bits & ack_bit); + peer->ack_bits |= ack_bit; + } + peer->num_msg_packets_received_this_frame += 1; + pipe->num_msg_packets_received_this_frame += 1; + + // Process packet + if (should_process) + { + NET_W32_Packet *packet = NET_W32.first_free_packet; + if (packet) + { + SllStackPop(NET_W32.first_free_packet); + String old_packet_data = packet->data; + ZeroStruct(packet); + packet->data = old_packet_data; + packet->data.len = 0; + } + else + { + packet = PushStruct(perm, NET_W32_Packet); + packet->data.text = PushStructsNoZero(perm, u8, NET_PacketSize); + } + packet->flags = header.flags; + packet->seq = header.seq; + packet->msg_seq = header.msg_seq; + CopyBytes(packet->data.text, src_data.text, src_data.len); + packet->data.len = src_data.len; + + // Insert packet + { + NET_W32_Packet *left = peer->last_frag_packet; + for (; left; left = left->prev) + { + if (left->seq < packet->seq) + { + break; + } + } + DllQueueInsert(peer->first_frag_packet, peer->last_frag_packet, left, packet); + } + + // Transfer fragmented -> contiguous packets + if (is_sequential) + { + NET_W32_Packet *contig_start = peer->first_frag_packet; + NET_W32_Packet *contig_end = packet; + for (NET_W32_Packet *tmp = packet->next; tmp; tmp = tmp->next) + { + if (tmp->seq == tmp->prev->seq + 1) + { + contig_end = tmp; + } + else + { + break; + } + } + + if (peer->last_cont_packet) + { + peer->last_cont_packet->next = contig_start; + } + else + { + peer->first_cont_packet = contig_start; + } + contig_start->prev = peer->last_cont_packet; + peer->last_cont_packet = contig_end; + + if (contig_end->next) + { + contig_end->next->prev = 0; + } + else + { + peer->last_frag_packet = 0; + } + peer->first_frag_packet = contig_end->next; + contig_end->next = 0; + + i64 diff = contig_end->seq - peer->bottom_ack; + if (diff <= 63) + { + peer->ack_bits >>= diff; + } + else + { + peer->ack_bits = 0; + } + peer->bottom_ack = contig_end->seq; + } + } + + // LogDebugF( + // "Received msg packet. seq: %F, msg seq: %F, new bottom ack: %F, new ack bits: %F, should_process: %F, data: \"%F\"", + // FmtSint(header.seq), + // FmtSint(header.msg_seq), + // FmtSint(peer->bottom_ack), + // FmtUint(peer->ack_bits), + // FmtUint(should_process), + // FmtString(src_data) + // ); + } + } + } + + // Re-post recv + NET_W32_PostRecv(pipe); + } + } + } + } + + ////////////////////////////// + //- Process pipes + + for (i64 pipe_idx = 0; pipe_idx < pipes_count; ++pipe_idx) + { + NET_W32_Pipe *pipe = pipes[pipe_idx]; ////////////////////////////// //- Bind { + i64 now_ns = TimeNs(); + u64 desired_port = Atomic64Fetch(&pipe->desired_port); if (desired_port != 0) { - b32 is_ephemeral = desired_port >= Kibi(64); - if (is_ephemeral) + b32 desired_is_ephemeral = desired_port >= Kibi(64); + if (desired_is_ephemeral) { desired_port = 0; } - // FIXME: Retry on timeout - if (!pipe->udp || (!is_ephemeral && pipe->bound_port != desired_port)) + if ( + (!pipe->udp || (!desired_is_ephemeral && pipe->bound_port != desired_port)) && + (pipe->last_attempted_bind_ns == 0 || now_ns - pipe->last_attempted_bind_ns > bind_retry_threshold_ns) + ) { b32 ok = 1; String port_str = StringF(scratch.arena, "%F", FmtUint(desired_port)); char *port_cstr = CstrFromString(scratch.arena, port_str); + //- Close old socket if (pipe->udp) { closesocket(pipe->udp); @@ -459,290 +598,136 @@ void NET_W32_TickForever(WaveLaneCtx *lane) pipe->udp = 0; } - //- Init bind address - struct addrinfo hints = Zi; - hints.ai_family = AF_INET6; - hints.ai_socktype = SOCK_DGRAM; - hints.ai_protocol= IPPROTO_UDP; - hints.ai_flags = AI_PASSIVE | AI_NUMERICSERV; - struct addrinfo *ai = 0; - if (ok) + if (pipe->iocp_count == 0) { - ok = getaddrinfo(0, port_cstr, &hints, &ai) == 0; - } + pipe->last_attempted_bind_ns = now_ns; - //- Create udp socket - SOCKET sock = 0; - if (ok) - { - sock = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol); - ok = sock != INVALID_SOCKET; - } - - //- Enable address reuse - if (ok) - { - b32 reuse = 1; - ok = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(reuse)) == 0; - } - - //- Enable dual stack - if (ok) - { - DWORD v6_only = 0; - ok = setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&v6_only, sizeof(v6_only)) == 0; - } - - //- Set buffer sizes - if (ok) - { - i32 rcvbuf_min = Mebi(1); - i32 sndbuf_min = Mebi(1); + //- Init bind address + struct addrinfo hints = Zi; + hints.ai_family = AF_INET6; + hints.ai_socktype = SOCK_DGRAM; + hints.ai_protocol= IPPROTO_UDP; + hints.ai_flags = AI_PASSIVE | AI_NUMERICSERV; + struct addrinfo *ai = 0; + if (ok) { - i32 rcvbuf = 0; - i32 rcvbuf_sz = sizeof(rcvbuf); - if (getsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&rcvbuf, &rcvbuf_sz) == 0) + ok = getaddrinfo(0, port_cstr, &hints, &ai) == 0; + } + + //- Create udp socket + SOCKET sock = 0; + if (ok) + { + // sock = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol); + sock = WSASocketW(ai->ai_family, ai->ai_socktype, ai->ai_protocol, 0, 0, WSA_FLAG_OVERLAPPED); + ok = sock != INVALID_SOCKET; + } + + //- Enable address reuse + if (ok) + { + b32 reuse = 1; + ok = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(reuse)) == 0; + } + + //- Enable dual stack + if (ok) + { + DWORD v6_only = 0; + ok = setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&v6_only, sizeof(v6_only)) == 0; + } + + //- Set buffer sizes + if (ok) + { + i32 rcvbuf_min = Mebi(1); + i32 sndbuf_min = Mebi(1); { - if (rcvbuf < rcvbuf_min) + i32 rcvbuf = 0; + i32 rcvbuf_sz = sizeof(rcvbuf); + if (getsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&rcvbuf, &rcvbuf_sz) == 0) { - setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char *)&rcvbuf_min, sizeof(rcvbuf_min)); + if (rcvbuf < rcvbuf_min) + { + setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char *)&rcvbuf_min, sizeof(rcvbuf_min)); + } + } + } + { + i32 sndbuf = 0; + i32 sndbuf_sz = sizeof(sndbuf); + if (getsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf, &sndbuf_sz) == 0) + { + if (sndbuf < sndbuf_min) + { + setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf_min, sizeof(sndbuf_min)); + } } } } + + //- Bind + if (ok) { - i32 sndbuf = 0; - i32 sndbuf_sz = sizeof(sndbuf); - if (getsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf, &sndbuf_sz) == 0) + ok = bind(sock, ai->ai_addr, (i32)ai->ai_addrlen) == 0; + } + + //- Enable non-blocking + if (ok) + { + u_long nonblocking = 1; + ok = ioctlsocket(sock, FIONBIO, &nonblocking) == 0; + } + + //- Fetch bound port + u64 bound_port = 0; + { + struct sockaddr_storage ss = Zi; + if (ok) { - if (sndbuf < sndbuf_min) + i32 ss_sizeof = sizeof(ss); + ok = getsockname(sock, (struct sockaddr *)&ss, &ss_sizeof) != SOCKET_ERROR; + } + if (ok) + { + if (ss.ss_family == AF_INET) { - setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf_min, sizeof(sndbuf_min)); + struct sockaddr_in *a = (struct sockaddr_in *)&ss; + bound_port = ntohs(a->sin_port); } - } - } - } - - //- Bind - if (ok) - { - ok = bind(sock, ai->ai_addr, (i32)ai->ai_addrlen) == 0; - } - - //- Enable non-blocking - if (ok) - { - u_long nonblocking = 1; - ok = ioctlsocket(sock, FIONBIO, &nonblocking) == 0; - } - - //- Fetch bound port - u64 bound_port = 0; - { - struct sockaddr_storage ss = Zi; - if (ok) - { - i32 ss_sizeof = sizeof(ss); - ok = getsockname(sock, (struct sockaddr *)&ss, &ss_sizeof) != SOCKET_ERROR; - } - if (ok) - { - if (ss.ss_family == AF_INET) - { - struct sockaddr_in *a = (struct sockaddr_in *)&ss; - bound_port = ntohs(a->sin_port); - } - else if (ss.ss_family == AF_INET6) - { - struct sockaddr_in6 *a6 = (struct sockaddr_in6 *)&ss; - bound_port = ntohs(a6->sin6_port); - } - else - { - ok = 0; - } - } - } - - //- Finalize - if (ok) - { - pipe->bound_port = bound_port; - pipe->udp = sock; - } - else - { - if (sock != INVALID_SOCKET) - { - closesocket(sock); - } - } - - if (ai) - { - freeaddrinfo(ai); - } - } - } - } - - ////////////////////////////// - //- Read socket - - // TODO: Rate limit - // TODO: Per-frame packet limit to ensure other pipes are still serviced during load - - { - i32 len = 0; - while (len >= 0) - { - u8 buff[Kibi(2)]; - struct sockaddr_in6 addr = Zi; - i32 addr_sz = sizeof(addr); - len = recvfrom(pipe->udp, (char *)buff, countof(buff), 0, (struct sockaddr *)&addr, &addr_sz); - Atomic64FetchAdd(&pipe->total_bytes_received, len); - if (len >= sizeof(NET_W32_PacketHeader) && len <= sizeof(NET_W32_PacketHeader) + NET_PacketSize) - { - NET_W32_PacketHeader header = Zi; - CopyBytes(&header, buff, sizeof(header)); - u32 checksum = NET_W32_ChecksumFromPacketString(STRING(len, buff)); - if (header.checksum == checksum) - { - NET_Key key = NET_W32_KeyFromAddress(addr); - String src_data = Zi; - src_data.text = buff + sizeof(header); - src_data.len = len - sizeof(header); - - //- Fetch peer - NET_W32_Peer *peer = NET_W32_TouchPeerFromKey(pipe, key); - peer->key = key; - peer->last_packet_received_ns = TimeNs(); - if (header.bottom_ack == peer->remote_bottom_ack) - { - peer->remote_ack_bits |= header.ack_bits; - } - else if (header.bottom_ack > peer->remote_bottom_ack) - { - peer->remote_bottom_ack = header.bottom_ack; - peer->remote_ack_bits = header.ack_bits; - } - - //- Read msg packet - if (!(header.flags & NET_W32_PacketFlag_Heartbeat)) - { - // Update our stats - b32 is_sequential = 0; - b32 should_process = 0; - if (header.seq == peer->bottom_ack + 1) - { - is_sequential = 1; - should_process = 1; - } - else if (header.seq > peer->bottom_ack + 1 && header.seq <= peer->bottom_ack + 65) - { - u64 ack_bit = (u64)1 << (header.seq - peer->bottom_ack - 2); - should_process = !(peer->ack_bits & ack_bit); - peer->ack_bits |= ack_bit; - } - peer->num_msg_packets_received_this_frame += 1; - pipe->num_msg_packets_received_this_frame += 1; - - // Process packet - if (should_process) - { - NET_W32_Packet *packet = NET_W32.first_free_packet; - if (packet) + else if (ss.ss_family == AF_INET6) { - SllStackPop(NET_W32.first_free_packet); - String old_packet_data = packet->data; - ZeroStruct(packet); - packet->data = old_packet_data; - packet->data.len = 0; + struct sockaddr_in6 *a6 = (struct sockaddr_in6 *)&ss; + bound_port = ntohs(a6->sin6_port); } else { - packet = PushStruct(perm, NET_W32_Packet); - packet->data.text = PushStructsNoZero(perm, u8, NET_PacketSize); - } - packet->flags = header.flags; - packet->seq = header.seq; - packet->msg_seq = header.msg_seq; - CopyBytes(packet->data.text, src_data.text, src_data.len); - packet->data.len = src_data.len; - - // Insert packet - { - NET_W32_Packet *left = peer->last_frag_packet; - for (; left; left = left->prev) - { - if (left->seq < packet->seq) - { - break; - } - } - DllQueueInsert(peer->first_frag_packet, peer->last_frag_packet, left, packet); - } - - // Transfer fragmented -> contiguous packets - if (is_sequential) - { - NET_W32_Packet *contig_start = peer->first_frag_packet; - NET_W32_Packet *contig_end = packet; - for (NET_W32_Packet *tmp = packet->next; tmp; tmp = tmp->next) - { - if (tmp->seq == tmp->prev->seq + 1) - { - contig_end = tmp; - } - else - { - break; - } - } - - if (peer->last_cont_packet) - { - peer->last_cont_packet->next = contig_start; - } - else - { - peer->first_cont_packet = contig_start; - } - contig_start->prev = peer->last_cont_packet; - peer->last_cont_packet = contig_end; - - if (contig_end->next) - { - contig_end->next->prev = 0; - } - else - { - peer->last_frag_packet = 0; - } - peer->first_frag_packet = contig_end->next; - contig_end->next = 0; - - i64 diff = contig_end->seq - peer->bottom_ack; - if (diff <= 63) - { - peer->ack_bits >>= diff; - } - else - { - peer->ack_bits = 0; - } - peer->bottom_ack = contig_end->seq; + ok = 0; } } + } - // LogDebugF( - // "Received msg packet. seq: %F, msg seq: %F, new bottom ack: %F, new ack bits: %F, should_process: %F, data: \"%F\"", - // FmtSint(header.seq), - // FmtSint(header.msg_seq), - // FmtSint(peer->bottom_ack), - // FmtUint(peer->ack_bits), - // FmtUint(should_process), - // FmtString(src_data) - // ); + //- Finalize + if (ok) + { + pipe->bound_port = bound_port; + pipe->udp = sock; + + // Post initial recv + CreateIoCompletionPort((HANDLE)sock, NET_W32.iocp, 0, 0); + NET_W32_PostRecv(pipe); + } + else + { + if (sock != INVALID_SOCKET) + { + closesocket(sock); + } + } + + if (ai) + { + freeaddrinfo(ai); } } } @@ -897,50 +882,53 @@ void NET_W32_TickForever(WaveLaneCtx *lane) ////////////////////////////// //- Send heartbeat packets - for (NET_W32_Peer *peer = pipe->first_peer; peer; peer = peer->next) + if (pipe->udp) { - b32 should_send_heartbeat = 0; - - // Send heartbeat packet if we received any packets this frame - if (peer->num_msg_packets_received_this_frame > 0) + for (NET_W32_Peer *peer = pipe->first_peer; peer; peer = peer->next) { - should_send_heartbeat = 1; - } + b32 should_send_heartbeat = 0; - // Send heartbeat if we haven't sent any packets in a while - i64 now_ns = TimeNs(); - if (now_ns - peer->last_packet_sent_ns > heartbeat_threshold_ns) - { - should_send_heartbeat = 1; - } - - if (should_send_heartbeat) - { - struct sockaddr_in6 addr = NET_W32_AddressFromKey(peer->key); - i64 buff_len = 0; - u8 buff[Kibi(2)]; - NET_W32_PacketHeader header = Zi; + // Send heartbeat packet if we received any packets this frame + if (peer->num_msg_packets_received_this_frame > 0) { - header.checksum = 0; - header.flags = NET_W32_PacketFlag_Heartbeat; - header.bottom_ack = peer->bottom_ack; - header.ack_bits = peer->ack_bits; + should_send_heartbeat = 1; } - CopyBytes(buff, &header, sizeof(header)); - buff_len += sizeof(header); - u32 checksum = NET_W32_ChecksumFromPacketString(STRING(buff_len, buff)); - CopyBytes(buff, &checksum, sizeof(checksum)); - sendto( - pipe->udp, - (char *)buff, - buff_len, - 0, - (struct sockaddr *)&addr, - sizeof(addr) - ); - Atomic64FetchAdd(&pipe->total_bytes_sent, buff_len); - peer->last_packet_sent_ns = now_ns; + // Send heartbeat if we haven't sent any packets in a while + i64 now_ns = TimeNs(); + if (now_ns - peer->last_packet_sent_ns > heartbeat_threshold_ns) + { + should_send_heartbeat = 1; + } + + if (should_send_heartbeat) + { + struct sockaddr_in6 addr = NET_W32_AddressFromKey(peer->key); + i64 buff_len = 0; + u8 buff[Kibi(2)]; + NET_W32_PacketHeader header = Zi; + { + header.checksum = 0; + header.flags = NET_W32_PacketFlag_Heartbeat; + header.bottom_ack = peer->bottom_ack; + header.ack_bits = peer->ack_bits; + } + CopyBytes(buff, &header, sizeof(header)); + buff_len += sizeof(header); + + u32 checksum = NET_W32_ChecksumFromPacketString(STRING(buff_len, buff)); + CopyBytes(buff, &checksum, sizeof(checksum)); + sendto( + pipe->udp, + (char *)buff, + buff_len, + 0, + (struct sockaddr *)&addr, + sizeof(addr) + ); + Atomic64FetchAdd(&pipe->total_bytes_sent, buff_len); + peer->last_packet_sent_ns = now_ns; + } } } @@ -949,89 +937,92 @@ void NET_W32_TickForever(WaveLaneCtx *lane) // TODO: Rate limit - for (NET_W32_Peer *peer = pipe->first_peer; peer; peer = peer->next) + if (pipe->udp) { - // bottom_ack represents the highest contiguous sequence acknowledgement, meaning sequences in range [0, bottom_ack] are always acked. - // This means bottom_ack + 1 is never acked. - // Ack bits represent acks for sequences in range [bottom_ack + 2, bottom_ack + 65] - i64 bottom_ack = peer->remote_bottom_ack; - u64 ack_bits = peer->remote_ack_bits; - - NET_Key key = peer->key; - struct sockaddr_in6 addr = NET_W32_AddressFromKey(key); - - // TODO: Maybe send more than just bottom_ack + 65 packets at a time. - for (NET_W32_Packet *packet = peer->first_remote_packet; packet && packet->seq <= bottom_ack + 65;) + for (NET_W32_Peer *peer = pipe->first_peer; peer; peer = peer->next) { - NET_W32_Packet *next = packet->next; + // bottom_ack represents the highest contiguous sequence acknowledgement, meaning sequences in range [0, bottom_ack] are always acked. + // This means bottom_ack + 1 is never acked. + // Ack bits represent acks for sequences in range [bottom_ack + 2, bottom_ack + 65] + i64 bottom_ack = peer->remote_bottom_ack; + u64 ack_bits = peer->remote_ack_bits; + + NET_Key key = peer->key; + struct sockaddr_in6 addr = NET_W32_AddressFromKey(key); + + // TODO: Maybe send more than just bottom_ack + 65 packets at a time. + for (NET_W32_Packet *packet = peer->first_remote_packet; packet && packet->seq <= bottom_ack + 65;) { - i64 seq = packet->seq; - b32 is_acked = 0; + NET_W32_Packet *next = packet->next; { - if (seq <= bottom_ack) + i64 seq = packet->seq; + b32 is_acked = 0; { - is_acked = 1; - } - else if (seq > bottom_ack + 1 && packet->seq <= bottom_ack + 65) - { - u64 ack_bit = (u64)1 << (seq - 2 - bottom_ack); - is_acked = !!(ack_bits & ack_bit); - } - } - if (is_acked) - { - // Prune acked packet - DllQueueRemove(peer->first_remote_packet, peer->last_remote_packet, packet); - SllStackPush(NET_W32.first_free_packet, packet); - } - else - { - // Transmit unacked packet - i64 now_ns = TimeNs(); - if (packet->last_sent_ns == 0 || (now_ns - packet->last_sent_ns > msg_resend_threshold_ns)) - { - peer->last_packet_sent_ns = now_ns; - packet->last_sent_ns = now_ns; - i64 buff_len = 0; - u8 buff[Kibi(2)]; - - NET_W32_PacketHeader header = Zi; + if (seq <= bottom_ack) { - header.checksum = 0; - header.flags = packet->flags; - header.seq = packet->seq; - header.msg_seq = packet->msg_seq; - header.bottom_ack = peer->bottom_ack; - header.ack_bits = peer->ack_bits; + is_acked = 1; } - CopyBytes(buff, &header, sizeof(header)); - buff_len += sizeof(header); + else if (seq > bottom_ack + 1 && packet->seq <= bottom_ack + 65) + { + u64 ack_bit = (u64)1 << (seq - 2 - bottom_ack); + is_acked = !!(ack_bits & ack_bit); + } + } + if (is_acked) + { + // Prune acked packet + DllQueueRemove(peer->first_remote_packet, peer->last_remote_packet, packet); + SllStackPush(NET_W32.first_free_packet, packet); + } + else + { + // Transmit unacked packet + i64 now_ns = TimeNs(); + if (packet->last_sent_ns == 0 || (now_ns - packet->last_sent_ns > msg_resend_threshold_ns)) + { + peer->last_packet_sent_ns = now_ns; + packet->last_sent_ns = now_ns; + i64 buff_len = 0; + u8 buff[Kibi(2)]; - CopyBytes(buff + buff_len, packet->data.text, packet->data.len); - buff_len += packet->data.len; + NET_W32_PacketHeader header = Zi; + { + header.checksum = 0; + header.flags = packet->flags; + header.seq = packet->seq; + header.msg_seq = packet->msg_seq; + header.bottom_ack = peer->bottom_ack; + header.ack_bits = peer->ack_bits; + } + CopyBytes(buff, &header, sizeof(header)); + buff_len += sizeof(header); - u32 checksum = NET_W32_ChecksumFromPacketString(STRING(buff_len, buff)); - CopyBytes(buff, &checksum, sizeof(checksum)); - sendto( - pipe->udp, - (char *)buff, - buff_len, - 0, - (struct sockaddr *)&addr, - sizeof(addr) - ); - Atomic64FetchAdd(&pipe->total_bytes_sent, buff_len); + CopyBytes(buff + buff_len, packet->data.text, packet->data.len); + buff_len += packet->data.len; - // LogDebugF( - // "Sent msg packet. seq: %F, msg seq: %F, data: \"%F\"", - // FmtSint(header.seq), - // FmtSint(header.msg_seq), - // FmtString(packet->data) - // ); + u32 checksum = NET_W32_ChecksumFromPacketString(STRING(buff_len, buff)); + CopyBytes(buff, &checksum, sizeof(checksum)); + sendto( + pipe->udp, + (char *)buff, + buff_len, + 0, + (struct sockaddr *)&addr, + sizeof(addr) + ); + Atomic64FetchAdd(&pipe->total_bytes_sent, buff_len); + + // LogDebugF( + // "Sent msg packet. seq: %F, msg seq: %F, data: \"%F\"", + // FmtSint(header.seq), + // FmtSint(header.msg_seq), + // FmtString(packet->data) + // ); + } } } + packet = next; } - packet = next; } } } diff --git a/src/net/net_win32/net_win32.h b/src/net/net_win32/net_win32.h index 20e3bb04..1a0ad128 100644 --- a/src/net/net_win32/net_win32.h +++ b/src/net/net_win32/net_win32.h @@ -119,6 +119,18 @@ Struct(NET_W32_PeerBin) Struct(NET_W32_Pipe) { + //- IOCP data + + OVERLAPPED ovl; // First field must be OVERLAPPED for cast from IOCP returns + + i32 iocp_count; // How many overlapped operations are currently queued. + + WSABUF recv_wsabuff; + struct sockaddr_in6 recv_addr; + i32 recv_addr_sz; + + SOCKET udp; + //- Shared data NET_W32_Pipe *next; @@ -139,6 +151,8 @@ Struct(NET_W32_Pipe) //- Worker data + i64 last_attempted_bind_ns; + NET_W32_Peer *first_peer; NET_W32_Peer *last_peer; i64 num_msg_packets_received_this_frame; @@ -147,7 +161,6 @@ Struct(NET_W32_Pipe) NET_W32_PeerBin *peer_bins; u64 bound_port; - SOCKET udp; }; //////////////////////////////////////////////////////////// @@ -170,8 +183,7 @@ Struct(NET_W32_Ctx) NET_W32_Peer *first_free_peer; NET_W32_Packet *first_free_packet; - NET_W32_DummySocket wake_send_sock; - NET_W32_DummySocket wake_recv_sock; + HANDLE iocp; }; extern NET_W32_Ctx NET_W32; @@ -186,6 +198,7 @@ struct sockaddr_in6 NET_W32_AddressFromKey(NET_Key key); void NET_W32_SignalWorker(void); NET_W32_Peer *NET_W32_TouchPeerFromKey(NET_W32_Pipe *pipe, NET_Key key); u32 NET_W32_ChecksumFromPacketString(String str); +void NET_W32_PostRecv(NET_W32_Pipe *pipe); //////////////////////////////////////////////////////////// //~ Worker diff --git a/src/pp/pp_vis/pp_vis_core.c b/src/pp/pp_vis/pp_vis_core.c index 28f1662b..9b8a852e 100644 --- a/src/pp/pp_vis/pp_vis_core.c +++ b/src/pp/pp_vis/pp_vis_core.c @@ -2651,6 +2651,8 @@ void V_TickForever(WaveLaneCtx *lane) { // V_PushNotif(Lit("Hello!!!")); P_Msg *chat_msg = P_PushMsg(P_MsgKind_Chat, Lit("Hello!!!")); + + } break; } } @@ -2670,7 +2672,6 @@ void V_TickForever(WaveLaneCtx *lane) P_MsgList in_msgs = Zi; { - NET_Bind(net_pipe, 0); NET_MsgList net_msgs = NET_Swap(frame->arena, net_pipe); for (NET_Msg *net_msg = net_msgs.first; net_msg; net_msg = net_msg->next) { @@ -3847,7 +3848,8 @@ void V_TickForever(WaveLaneCtx *lane) if (frame->held_buttons[Button_R] && !prev_frame->held_buttons[Button_R]) { LogDebugF("Sending test payload"); - NET_Push(net_pipe, server_key, STRING(P_TilesCount, predict_world->tiles), 0); + // NET_Push(net_pipe, server_key, STRING(P_TilesCount, predict_world->tiles), 0); + NET_Push(net_pipe, server_key, Lit("Hello there!"), 0); }