From 9912e0bfdd04dd923fd4e4ce0e6b63c588ee7160 Mon Sep 17 00:00:00 2001 From: jacob Date: Thu, 15 Jan 2026 23:22:11 -0600 Subject: [PATCH] working networking --- src/base/base_memory.h | 14 + src/net/net.h | 15 +- src/net/net_win32/net_win32.c | 951 +++++++++++++++++++++------------- src/net/net_win32/net_win32.h | 146 +++++- src/pp/pp.h | 2 +- src/pp/pp_sim/pp_sim_core.c | 8 +- src/pp/pp_vis/pp_vis_core.c | 23 +- 7 files changed, 772 insertions(+), 387 deletions(-) diff --git a/src/base/base_memory.h b/src/base/base_memory.h index 2630352b..dad57feb 100644 --- a/src/base/base_memory.h +++ b/src/base/base_memory.h @@ -16,7 +16,20 @@ void SetMemoryReadWrite(void *address, u64 size); //////////////////////////////////////////////////////////// //~ Memory ops +Inline b32 MatchBytesZero(u8 *v, u64 size) +{ + for (u64 idx = 0; idx < size; ++idx) + { + if (v[idx] != 0) + { + return 0; + } + } + return 1; +} + //- Wrappers +#define MatchStructZero(ptr) MatchBytesZero((u8 *)(ptr), sizeof(*ptr)) #define MatchBytes(p1, p2, n) (CmpBytes((p1), (p2), (n)) == 0) #define MatchStruct(p1, p2) MatchBytes((p1), (p2), sizeof(*p1)) #define ZeroBytes(ptr, count) SetBytes((ptr), 0, (count)) @@ -26,6 +39,7 @@ void SetMemoryReadWrite(void *address, u64 size); #define CopyStructs(ptr_dst, ptr_src, n) CopyBytes((ptr_dst), (ptr_src), sizeof(*(ptr_dst)) * (n)) #define CopyStruct(ptr_dst, ptr_src) CopyStructs((ptr_dst), (ptr_src), 1) +//- Stubs #define CopyBytes(dst, src, count) memcpy(dst, src, count) #define SetBytes(dst, c, count) memset(dst, c, count) #define CmpBytes(p1, p2, count) memcmp(p1, p2, count) diff --git a/src/net/net.h b/src/net/net.h index 7d526900..459c32f0 100644 --- a/src/net/net.h +++ b/src/net/net.h @@ -1,3 +1,6 @@ +// NOTE: Burst messages with lengths exceeding packet size will degrade +// into sequenced messages + #define NET_PacketSize 1024 //////////////////////////////////////////////////////////// @@ -12,11 +15,11 @@ Struct(NET_PipeHandle) { u64 v; }; Struct(NET_Key) { - u64 v; + u8 addr[32]; }; #define NET_NilKey ((NET_Key) { 0 }) -#define NET_IsKeyNil(k) ((k).v == 0) +#define NET_IsKeyNil(k) (MatchStructZero(&k)) //////////////////////////////////////////////////////////// //~ Message types @@ -26,7 +29,7 @@ Struct(NET_Msg) NET_Msg *next; NET_Msg *prev; - NET_Key src; + NET_Key sender; String data; }; @@ -48,7 +51,7 @@ void NET_Bootstrap(void); NET_PipeHandle NET_AcquirePipe(void); void NET_Bind(NET_PipeHandle pipe, u64 port); -u64 NET_BoundPortFromPipe(NET_PipeHandle pipe_handle); +NET_Key NET_KeyFromString(String host, String port); -void NET_Push(NET_PipeHandle pipe_handle, NET_Key dst, String data, b32 unreliable); -NET_MsgList NET_Pop(Arena *arena, NET_PipeHandle pipe); +NET_MsgList NET_Swap(Arena *arena, NET_PipeHandle pipe); +void NET_Push(NET_PipeHandle pipe_handle, NET_Key dst, String data, b32 burst); diff --git a/src/net/net_win32/net_win32.c b/src/net/net_win32/net_win32.c index 41df0cbd..81557e6e 100644 --- a/src/net/net_win32/net_win32.c +++ b/src/net/net_win32/net_win32.c @@ -23,6 +23,72 @@ NET_W32_Pipe *NET_W32_PipeFromHandle(NET_PipeHandle pipe_handle) return (NET_W32_Pipe *)pipe_handle.v; } +NET_Key NET_W32_KeyFromAddress(struct sockaddr_in6 addr) +{ + NET_Key result = Zi; + CopyBytes(&result, &addr, sizeof(addr)); + return result; +} + +struct sockaddr_in6 NET_W32_AddressFromKey(NET_Key key) +{ + struct sockaddr_in6 result = Zi; + CopyBytes(&result, &key, sizeof(result)); + return result; +} + +u64 NET_W32_HashFromKey(NET_Key key) +{ + u64 result = HashString(StringFromStruct(&key)); + return result; +} + +void NET_W32_SignalWorker(void) +{ + // TODO +} + +NET_W32_Peer *NET_W32_TouchPeerFromKey(NET_W32_Pipe *pipe, NET_Key key) +{ + // TODO: Address challenge on first receive + NET_W32_Peer *peer = 0; + u64 hash = NET_W32_HashFromKey(key); + NET_W32_PeerBin *bin = &pipe->peer_bins[hash % pipe->peer_bins_count]; + peer = bin->first; + for (; peer; peer = peer->next_in_bin) + { + if (peer->hash == hash) + { + break; + } + } + if (!peer) + { + peer = NET_W32.first_free_peer; + if (peer) + { + SllStackPop(NET_W32.first_free_peer); + { + String old_msg_fragment = peer->fragment; + ZeroStruct(peer); + peer->fragment = old_msg_fragment; + peer->fragment.len = 0; + } + } + else + { + Arena *perm = PermArena(); + peer = PushStruct(perm, NET_W32_Peer); + peer->fragment.text = PushStructsNoZero(perm, u8, NET_PacketSize); + } + peer->hash = hash; + peer->key = key; + DllQueuePush(pipe->first_peer, pipe->last_peer, peer); + DllQueuePushNP(bin->first, bin->last, peer, next_in_bin, prev_in_bin); + } + return peer; +} + //////////////////////////////////////////////////////////// //~ @hookimpl Net ops @@ -30,189 +96,116 @@ NET_PipeHandle NET_AcquirePipe(void) { Arena *perm = PermArena(); NET_W32_Pipe *pipe = PushStruct(perm, NET_W32_Pipe); + { + for (i64 idx = 0; idx < countof(pipe->cmd_buffs); ++idx) + { + NET_W32_CmdBuff *cmd_buff = &pipe->cmd_buffs[idx]; + cmd_buff->arena = AcquireArena(Gibi(64)); + } + for (i64 idx = 0; idx < countof(pipe->msg_buffs); ++idx) + { + NET_W32_MsgBuff *msg_buff = &pipe->msg_buffs[idx]; + msg_buff->arena = AcquireArena(Gibi(64)); + } + pipe->peer_bins_count = Kibi(1); + pipe->peer_bins = PushStructs(perm, NET_W32_PeerBin, pipe->peer_bins_count); + } + { + LockTicketMutex(&NET_W32.pipes_tm); + { + ++NET_W32.pipes_count; + DllQueuePush(NET_W32.first_pipe, NET_W32.last_pipe, pipe); + } + UnlockTicketMutex(&NET_W32.pipes_tm); + } return (NET_PipeHandle) { .v = (u64) pipe }; } void NET_Bind(NET_PipeHandle pipe_handle, u64 port) { - TempArena scratch = BeginScratchNoConflict(); NET_W32_Pipe *pipe = NET_W32_PipeFromHandle(pipe_handle); - - b32 is_ephemeral = port == 0; - // FIXME: Retry on timeout - if (!pipe->udp || (!is_ephemeral && pipe->bound_port != port)) + if (port == 0) { - b32 ok = 1; - String port_str = StringF(scratch.arena, "%F", FmtUint(port)); - char *port_cstr = CstrFromString(scratch.arena, port_str); + Atomic64Set(&pipe->desired_port, 0xFFFFFFFFFFFFFFFFull); + } + else + { + Atomic64Set(&pipe->desired_port, port); + } +} - if (pipe->udp) - { - closesocket(pipe->udp); - pipe->bound_port = 0; - pipe->udp = 0; - } - - //- Init bind address +NET_Key NET_KeyFromString(String host, String port) +{ + NET_Key result = Zi; + TempArena scratch = BeginScratchNoConflict(); + { struct addrinfo hints = Zi; hints.ai_family = AF_INET6; hints.ai_socktype = SOCK_DGRAM; - hints.ai_protocol= IPPROTO_UDP; - hints.ai_flags = AI_PASSIVE; - struct addrinfo *ai = 0; - if (ok) - { - ok = getaddrinfo(0, port_cstr, &hints, &ai) == 0; - } + hints.ai_protocol = IPPROTO_UDP; + hints.ai_flags = AI_V4MAPPED | AI_ADDRCONFIG; - //- Create udp socket - SOCKET sock = 0; - if (ok) - { - sock = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol); - ok = sock != INVALID_SOCKET; - } + char *host_cstr = CstrFromString(scratch.arena, host); + char *port_cstr = CstrFromString(scratch.arena, port); - //- Enable address reuse - if (ok) + struct sockaddr_in6 addr = Zi; { - b32 reuse = 1; - ok = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(reuse)) == 0; - } - - //- Enable dual stack - if (ok) - { - DWORD v6_only = 0; - ok = setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&v6_only, sizeof(v6_only)) == 0; - } - - //- Set buffer sizes - if (ok) - { - i32 rcvbuf_min = Mebi(1); - i32 sndbuf_min = Mebi(1); + struct addrinfo *first_ai = 0; + if (getaddrinfo(host_cstr, port_cstr, &hints, &first_ai) == 0) { - i32 rcvbuf = 0; - i32 rcvbuf_sz = sizeof(rcvbuf); - if (getsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&rcvbuf, &rcvbuf_sz) == 0) + for (struct addrinfo *ai = first_ai; ai; ai = ai->ai_next) { - if (rcvbuf < rcvbuf_min) + if (ai->ai_family == AF_INET6 && ai->ai_addrlen >= sizeof(addr)) { - setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char *)&rcvbuf_min, sizeof(rcvbuf_min)); + CopyBytes(&addr, ai->ai_addr, sizeof(addr)); + break; } } } + if (first_ai) { - i32 sndbuf = 0; - i32 sndbuf_sz = sizeof(sndbuf); - if (getsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf, &sndbuf_sz) == 0) - { - if (sndbuf < sndbuf_min) - { - setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf_min, sizeof(sndbuf_min)); - } - } + freeaddrinfo(first_ai); } } - - //- Bind - if (ok) - { - ok = bind(sock, ai->ai_addr, (i32)ai->ai_addrlen) == 0; - } - - //- Enable non-blocking - if (ok) - { - u_long nonblocking = 1; - ok = ioctlsocket(sock, FIONBIO, &nonblocking) == 0; - } - - //- Fetch bound port - u64 bound_port = 0; - { - struct sockaddr_storage ss = Zi; - if (ok) - { - i32 ss_sizeof = sizeof(ss); - ok = getsockname(sock, (struct sockaddr *)&ss, &ss_sizeof) != SOCKET_ERROR; - } - if (ok) - { - if (ss.ss_family == AF_INET) - { - struct sockaddr_in *a = (struct sockaddr_in *)&ss; - bound_port = ntohs(a->sin_port); - } - else if (ss.ss_family == AF_INET6) - { - struct sockaddr_in6 *a6 = (struct sockaddr_in6 *)&ss; - bound_port = ntohs(a6->sin6_port); - } - else - { - ok = 0; - } - } - } - - //- Finalize - if (ok) - { - pipe->bound_port = bound_port; - pipe->udp = sock; - } - else - { - if (sock != INVALID_SOCKET) - { - closesocket(sock); - } - } - - if (ai) - { - freeaddrinfo(ai); - } + result = NET_W32_KeyFromAddress(addr); } - EndScratch(scratch); + return result; } -u64 NET_BoundPortFromPipe(NET_PipeHandle pipe_handle) +NET_MsgList NET_Swap(Arena *arena, NET_PipeHandle pipe_handle) { - // TODO: Instead maybe return "BindingStatus", which includes port + binding error/progress NET_W32_Pipe *pipe = NET_W32_PipeFromHandle(pipe_handle); - return pipe->bound_port; + NET_W32_MsgBuff *msg_buff = 0; + { + LockTicketMutex(&pipe->back_msg_buff_seq_tm); + { + msg_buff = &pipe->msg_buffs[pipe->back_msg_buff_seq % countof(pipe->msg_buffs)]; + pipe->back_msg_buff_seq += 1; + NET_W32_MsgBuff *back_msg_buff = &pipe->msg_buffs[pipe->back_msg_buff_seq % countof(pipe->msg_buffs)]; + ResetArena(back_msg_buff->arena); + ZeroStruct(&back_msg_buff->msgs); + } + UnlockTicketMutex(&pipe->back_msg_buff_seq_tm); + } + return msg_buff->msgs; } -NET_Key NET_KeyFromString(String str) +void NET_Push(NET_PipeHandle pipe_handle, NET_Key dst, String data, b32 burst) { - NET_Key result = Zi; - return result; -} - -void NET_Push(NET_PipeHandle pipe_handle, NET_Key dst, String data, b32 unreliable) -{ - // NET_W32_Pipe *pipe = NET_W32_PipeFromHandle(pipe_handle); - - // if (!pipe->udp) - // { - // // Bind to ephemeral port if not bound - // NET_Bind(pipe_handle, 0); - // } - - // if (pipe->udp) - // { - - // } -} - -NET_MsgList NET_Pop(Arena *arena, NET_PipeHandle pipe_handle) -{ - NET_MsgList result = Zi; - return result; + NET_W32_Pipe *pipe = NET_W32_PipeFromHandle(pipe_handle); + LockTicketMutex(&pipe->back_cmd_buff_seq_tm); + { + NET_W32_CmdBuff *cmd_buff = &pipe->cmd_buffs[pipe->back_cmd_buff_seq % countof(pipe->cmd_buffs)]; + NET_W32_Cmd *cmd = PushStruct(cmd_buff->arena, NET_W32_Cmd); + cmd->key = dst; + cmd->data = PushString(cmd_buff->arena, data); + cmd->burst = burst; + SllQueuePush(cmd_buff->cmds.first, cmd_buff->cmds.last, cmd); + ++cmd_buff->cmds.count; + } + UnlockTicketMutex(&pipe->back_cmd_buff_seq_tm); + NET_W32_SignalWorker(); } //////////////////////////////////////////////////////////// @@ -222,61 +215,197 @@ void NET_W32_TickForever(WaveLaneCtx *lane) { Arena *perm = PermArena(); - NET_W32_Pipe *first_pipe = 0; - NET_W32_Pipe *last_pipe = 0; - - NET_W32_Peer *first_free_peer = 0; - - Enum(PacketFlag) - { - PacketFlag_None, - }; - - Struct(PacketHeader) - { - u32 magic; - PacketFlag flags; - i64 seq; - i64 msg_seq; - i64 bottom_ack; - u64 ack_bits; - }; - for (;;) { TempArena scratch = BeginScratchNoConflict(); u32 magic = 0xde8c590b; + i64 heartbeat_threshold_ns = NsFromSeconds(0.250); // TODO: Block until send/recv/signal - for (NET_W32_Pipe *pipe = first_pipe; pipe; pipe = pipe->next) + ////////////////////////////// + //- Pop pipes + + i64 pipes_count = 0; + NET_W32_Pipe **pipes = 0; { - ////////////////////////////// - //- Pop cmds + LockTicketMutex(&NET_W32.pipes_tm); + { + pipes = PushStructsNoZero(scratch.arena, NET_W32_Pipe *, NET_W32.pipes_count); + for (NET_W32_Pipe *pipe = NET_W32.first_pipe; pipe; pipe = pipe->next) + { + pipes[pipes_count] = pipe; + pipes_count += 1; + } + } + UnlockTicketMutex(&NET_W32.pipes_tm); + } - NET_W32_CmdList pipe_cmds = Zi; + for (i64 pipe_idx = 0; pipe_idx < pipes_count; ++pipe_idx) + { + NET_W32_Pipe *pipe = pipes[pipe_idx]; ////////////////////////////// - //- Assemble writes + //- Reset peer data - // for (NET_W32_Cmd *cmd = pipe_cmds.first; cmd; cmd = cmd->next) - // { - // if (pipe->udp) - // { - // NET_Key key = cmd->dst_key; - // NET_W32_Routing routing = NET_W32_RoutingFromKey(key); - // struct sockaddr_in6 *addr = &routing->addr; + for (NET_W32_Peer *peer = pipe->first_peer; peer; peer = peer->next) + { + peer->num_msg_packets_received_this_frame = 0; + peer->num_msg_packets_sent_this_frame = 0; + } - // sendto( - // pipe->udp, - // packet.text, - // packet.len, - // 0, - // (struct sockaddr *)addr, - // sizeof(*addr) - // ); - // } - // } + ////////////////////////////// + //- Bind + + { + u64 desired_port = Atomic64Fetch(&pipe->desired_port); + if (desired_port != 0) + { + b32 is_ephemeral = desired_port >= Kibi(64); + if (is_ephemeral) + { + desired_port = 0; + } + + // FIXME: Retry on timeout + if (!pipe->udp || (!is_ephemeral && pipe->bound_port != desired_port)) + { + b32 ok = 1; + String port_str = StringF(scratch.arena, "%F", FmtUint(desired_port)); + char *port_cstr = CstrFromString(scratch.arena, port_str); + + if (pipe->udp) + { + closesocket(pipe->udp); + pipe->bound_port = 0; + pipe->udp = 0; + } + + //- Init bind address + struct addrinfo hints = Zi; + hints.ai_family = AF_INET6; + hints.ai_socktype = SOCK_DGRAM; + hints.ai_protocol= IPPROTO_UDP; + hints.ai_flags = AI_PASSIVE; + struct addrinfo *ai = 0; + if (ok) + { + ok = getaddrinfo(0, port_cstr, &hints, &ai) == 0; + } + + //- Create udp socket + SOCKET sock = 0; + if (ok) + { + sock = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol); + ok = sock != INVALID_SOCKET; + } + + //- Enable address reuse + if (ok) + { + b32 reuse = 1; + ok = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(reuse)) == 0; + } + + //- Enable dual stack + if (ok) + { + DWORD v6_only = 0; + ok = setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&v6_only, sizeof(v6_only)) == 0; + } + + //- Set buffer sizes + if (ok) + { + i32 rcvbuf_min = Mebi(1); + i32 sndbuf_min = Mebi(1); + { + i32 rcvbuf = 0; + i32 rcvbuf_sz = sizeof(rcvbuf); + if (getsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&rcvbuf, &rcvbuf_sz) == 0) + { + if (rcvbuf < rcvbuf_min) + { + setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char *)&rcvbuf_min, sizeof(rcvbuf_min)); + } + } + } + { + i32 sndbuf = 0; + i32 sndbuf_sz = sizeof(sndbuf); + if (getsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf, &sndbuf_sz) == 0) + { + if (sndbuf < sndbuf_min) + { + setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf_min, sizeof(sndbuf_min)); + } + } + } + } + + //- Bind + if (ok) + { + ok = bind(sock, ai->ai_addr, (i32)ai->ai_addrlen) == 0; + } + + //- Enable non-blocking + if (ok) + { + u_long nonblocking = 1; + ok = ioctlsocket(sock, FIONBIO, &nonblocking) == 0; + } + + //- Fetch bound port + u64 bound_port = 0; + { + struct sockaddr_storage ss = Zi; + if (ok) + { + i32 ss_sizeof = sizeof(ss); + ok = getsockname(sock, (struct sockaddr *)&ss, &ss_sizeof) != SOCKET_ERROR; + } + if (ok) + { + if (ss.ss_family == AF_INET) + { + struct sockaddr_in *a = (struct sockaddr_in *)&ss; + bound_port = ntohs(a->sin_port); + } + else if (ss.ss_family == AF_INET6) + { + struct sockaddr_in6 *a6 = (struct sockaddr_in6 *)&ss; + bound_port = ntohs(a6->sin6_port); + } + else + { + ok = 0; + } + } + } + + //- Finalize + if (ok) + { + pipe->bound_port = bound_port; + pipe->udp = sock; + } + else + { + if (sock != INVALID_SOCKET) + { + closesocket(sock); + } + } + + if (ai) + { + freeaddrinfo(ai); + } + } + } + } ////////////////////////////// //- Read socket @@ -288,67 +417,50 @@ void NET_W32_TickForever(WaveLaneCtx *lane) while (len >= 0) { u8 buff[Kibi(2)]; - struct sockaddr_in6 addr = peer->addr; + struct sockaddr_in6 addr = Zi; i32 addr_sz = sizeof(addr); - len = recvfrom(pipe->udp, buff, countof(buff), 0, (struct sockaddr *)&sockaddr_in6, &addr_sz); + len = recvfrom(pipe->udp, (char *)buff, countof(buff), 0, (struct sockaddr *)&addr, &addr_sz); if ( - len >= sizeof(PacketHeader) && - len <= sizeof(PacketHeader) + NET_PacketSize && + len >= sizeof(NET_W32_PacketHeader) && + len <= sizeof(NET_W32_PacketHeader) + NET_PacketSize && MatchBytes(buff, &magic, sizeof(magic)) ) { NET_Key key = NET_W32_KeyFromAddress(addr); - PacketHeader header = Zi; + NET_W32_PacketHeader header = Zi; CopyBytes(&header, buff, sizeof(header)); + String src_data = Zi; + src_data.text = buff + sizeof(header); + src_data.len = len - sizeof(header); + + // if (!(header.flags & NET_W32_PacketFlag_Heartbeat)) + // { + // LogDebugF( + // "Received msg. packet seq: %F, msg seq: %F, data: \"%F\"", + // FmtSint(header.seq), + // FmtSint(header.msg_seq), + // FmtString(src_data) + // ); + // } //- Fetch peer - // TODO: Address challenge on first receive - NET_W32_Peer *peer = 0; - { - u64 hash = NET_HashFromKey(key); - NET_W32_PeerBin *bin = &pipe->peer_bins[hash % pipe->peer_bins_count]; - NET_W32_Peer *peer = bin->first; - for (; peer; peer = peer->next_in_bin) - { - if (NET_MatchKey(peer->key, key)) - { - break; - } - } - if (!peer) - { - peer = first_free_peer; - if (peer) - { - SllStackPop(first_free_peer); - ZeroStruct(peer); - } - else - { - peer = PushStruct(perm, NET_W32_Peer); - } - peer->key = key; - DllQueuePush(pipe->first_peer, pipe->last_peer, pipe); - DllQueuePushNP(bin->first, bin->last, pipe, next_in_bin, prev_in_bin); - } - } + NET_W32_Peer *peer = NET_W32_TouchPeerFromKey(pipe, key); + peer->key = key; //- Read packet { - // Update remote acks + // Update remote stats + if (header.bottom_ack == peer->remote_bottom_ack) { - if (header.bottom_ack == peer->remote_bottom_ack) - { - peer->remote_ack_bits |= header.ack_bits; - } - else if (header.bottom_ack > peer->remote_bottom_ack) - { - peer->remote_bottom_ack = header.bottom_ack; - peer->remote_ack_bits = header.ack_bits; - } + peer->remote_ack_bits |= header.ack_bits; + } + else if (header.bottom_ack > peer->remote_bottom_ack) + { + peer->remote_bottom_ack = header.bottom_ack; + peer->remote_ack_bits = header.ack_bits; } - // Update our acks + // Update our stats b32 is_sequential = 0; b32 should_process = 0; if (header.seq == peer->bottom_ack + 1) @@ -360,17 +472,23 @@ void NET_W32_TickForever(WaveLaneCtx *lane) } else if (header.seq > peer->bottom_ack + 1 && header.seq < peer->bottom_ack + 65) { - u64 ack_bit = 1 << (header.seq - 2 - peer->bottom_ack); + u64 ack_bit = (u64)1 << (header.seq - 2 - peer->bottom_ack); should_process = !!(peer->ack_bits & ack_bit); peer->ack_bits |= ack_bit; } + if (!(header.flags & NET_W32_PacketFlag_Heartbeat)) + { + peer->num_msg_packets_received_this_frame += 1; + } + peer->last_packet_received_ns = TimeNs(); // Process packet if (should_process) { - NET_W32_Packet *packet = first_free_packet; + NET_W32_Packet *packet = NET_W32.first_free_packet; if (packet) { + SllStackPop(NET_W32.first_free_packet); String old_packet_data = packet->data; ZeroStruct(packet); packet->data = old_packet_data; @@ -384,42 +502,40 @@ void NET_W32_TickForever(WaveLaneCtx *lane) packet->flags = header.flags; packet->seq = header.seq; packet->msg_seq = header.msg_seq; + CopyBytes(packet->data.text, src_data.text, src_data.len); + packet->data.len = src_data.len; // Insert packet - // TODO: Ring buffer - // if (is_sequential) - // { - // DllQueuePush(peer->first_msg_packet, peer->last_msg_packet, packet); - // for (NET_W32_Packet *tmp = peer->first_packet; tmp;) - // { - // NET_W32_Packet *next = tmp->next; - // if (tmp->seq == peer->last_msg_packet->seq + 1) - // { - // DllQueueRemove(peer->first_packet, - // } - // else - // { - // break; - // } - // tmp = next; - // } - // } - // else - // { { - NET_W32_Packet *left = peer->last_packet; + NET_W32_Packet *left = peer->last_fragmented_packet; for (; left; left = left->prev) { - if (left < packet->seq) + if (left->seq < packet->seq) { break; } } - DllQueueInsert(peer->first_unsequential_packet, peer->last_unsequential_packet, left, packet); + DllQueueInsert(peer->first_fragmented_packet, peer->last_fragmented_packet, left, packet); } + // Transfer fragmented -> contiguous packets if (is_sequential) { + if (peer->last_contiguous_packet) + { + peer->last_contiguous_packet->next = peer->first_fragmented_packet; + } + else + { + peer->first_contiguous_packet = peer->first_fragmented_packet; + } + peer->first_fragmented_packet->prev = peer->last_contiguous_packet; + peer->last_contiguous_packet = packet; + peer->first_fragmented_packet = packet->next; + if (peer->last_fragmented_packet == packet) + { + peer->last_fragmented_packet = 0; + } } } } @@ -428,24 +544,144 @@ void NET_W32_TickForever(WaveLaneCtx *lane) } ////////////////////////////// - //- Assemble messages + //- Assemble read messages + + // TODO: Maximum message size { - for (NET_W32_Peer *peer = pipe->first_peer; peer; peer = peer->next) + LockTicketMutex(&pipe->back_msg_buff_seq_tm); { - for (NET_W32_Packet *packet = pipe->first_packet; packet;) + NET_W32_MsgBuff *msg_buff = &pipe->msg_buffs[pipe->back_msg_buff_seq % countof(pipe->msg_buffs)]; + + String msg_data = Zi; + msg_data.text = ArenaNext(msg_buff->arena, u8); + + for (NET_W32_Peer *peer = pipe->first_peer; peer; peer = peer->next) { - NET_W32_Packet *next = packet->next; + NET_W32_Packet *first_msg_packet = peer->first_contiguous_packet; + for (NET_W32_Packet *packet = peer->first_contiguous_packet; packet;) { - if (packet-> + NET_W32_Packet *next = packet->next; + { + b32 msg_has_end = 0; + if (packet->msg_seq != peer->last_contiguous_packet->msg_seq || !!(peer->last_contiguous_packet->flags | NET_W32_PacketFlag_EndMsg)) + { + msg_has_end = 1; + } + if (msg_has_end) + { + msg_data.len += packet->data.len; + PushString(msg_buff->arena, packet->data); + b32 is_end_of_msg = !!(packet->flags & NET_W32_PacketFlag_EndMsg) || (next && next->msg_seq != packet->msg_seq); + if (is_end_of_msg) + { + NET_Msg *msg = PushStruct(msg_buff->arena, NET_Msg); + msg->sender = peer->key; + msg->data = msg_data; + DllQueuePush(msg_buff->msgs.first, msg_buff->msgs.last, msg); + ++msg_buff->msgs.count; + { + peer->first_contiguous_packet = next; + if (next) + { + next->prev = 0; + } + else + { + peer->last_contiguous_packet = 0; + } + packet->next = NET_W32.first_free_packet; + NET_W32.first_free_packet = first_msg_packet; + } + first_msg_packet = next; + msg_data.text = ArenaNext(msg_buff->arena, u8); + msg_data.len = 0; + } + } + else + { + break; + } + } + packet = next; + } + } + } + UnlockTicketMutex(&pipe->back_msg_buff_seq_tm); + } + + ////////////////////////////// + //- Queue message packets + + { + // Swap cmd buff + NET_W32_CmdBuff *cmd_buff = 0; + { + LockTicketMutex(&pipe->back_cmd_buff_seq_tm); + { + cmd_buff = &pipe->cmd_buffs[pipe->back_cmd_buff_seq % countof(pipe->cmd_buffs)]; + pipe->back_cmd_buff_seq += 1; + NET_W32_CmdBuff *back_cmd_buff = &pipe->cmd_buffs[pipe->back_cmd_buff_seq % countof(pipe->cmd_buffs)]; + ResetArena(back_cmd_buff->arena); + ZeroStruct(&back_cmd_buff->cmds); + } + UnlockTicketMutex(&pipe->back_cmd_buff_seq_tm); + } + + for (NET_W32_Cmd *cmd = cmd_buff->cmds.first; cmd; cmd = cmd->next) + { + NET_Key key = cmd->key; + NET_W32_Peer *peer = NET_W32_TouchPeerFromKey(pipe, key); + String src_data = cmd->data; + i64 src_pos = 0; + i64 msg_seq = ++peer->msg_seq; + + // TODO: Burst packets + + while (src_pos < (i64)src_data.len) + { + i64 copy_len = MinI64(NET_PacketSize - (i64)peer->fragment.len, src_data.len - src_pos); + CopyBytes(peer->fragment.text + peer->fragment.len, src_data.text + src_pos, copy_len); + src_pos += copy_len; + peer->fragment.len += copy_len; + + // Push packet + b32 is_msg_end = src_pos >= (i64)src_data.len; + if (peer->fragment.len == NET_PacketSize || is_msg_end) + { + NET_W32_Packet *packet = NET_W32.first_free_packet; + if (packet) + { + SllStackPop(NET_W32.first_free_packet); + String old_packet_data = packet->data; + ZeroStruct(packet); + packet->data = old_packet_data; + packet->data.len = 0; + } + else + { + packet = PushStruct(perm, NET_W32_Packet); + packet->data.text = PushStructsNoZero(perm, u8, NET_PacketSize); + } + packet->seq = ++peer->seq; + packet->msg_seq = msg_seq; + CopyBytes(packet->data.text, peer->fragment.text, peer->fragment.len); + packet->data.len = peer->fragment.len; + + if (is_msg_end) + { + packet->flags |= NET_W32_PacketFlag_EndMsg; + } + + DllQueuePush(peer->first_remote_packet, peer->last_remote_packet, packet); + peer->fragment.len = 0; } - packet = next; } } } ////////////////////////////// - //- Write peers + //- Send message packets // TODO: Rate limit @@ -457,7 +693,8 @@ void NET_W32_TickForever(WaveLaneCtx *lane) i64 bottom_ack = peer->remote_bottom_ack; u64 ack_bits = peer->remote_ack_bits; - struct sockaddr_in6 addr = peer->addr; + NET_Key key = peer->key; + struct sockaddr_in6 addr = NET_W32_AddressFromKey(key); // TODO: Maybe send more than just bottom_ack + 65 packets at a time. for (NET_W32_Packet *packet = peer->first_remote_packet; packet && packet->seq < bottom_ack + 65;) @@ -473,18 +710,21 @@ void NET_W32_TickForever(WaveLaneCtx *lane) } else if (seq > bottom_ack + 1 && packet->seq < bottom_ack + 65) { - u64 ack_bit = 1 << (seq - 2 - bottom_ack); + u64 ack_bit = (u64)1 << (seq - 2 - bottom_ack); is_acked = !!(ack_bits & ack_bit); } } if (is_acked) { // Prune acked packet - DllQueueRemove(peer->first_remote_packet, peer->last_packet, packet); - SllStackPush(first_free_packet, packet); + DllQueueRemove(peer->first_remote_packet, peer->last_remote_packet, packet); + SllStackPush(NET_W32.first_free_packet, packet); } else { + peer->num_msg_packets_sent_this_frame += 1; + peer->last_packet_sent_ns = TimeNs(); + // Transmit unacked packet // FIXME: Rate limit, don't send if we've already sent in the last second. // NOTE: If we do this we should probably put the net worker on something like a 1-second auto-run timer @@ -492,22 +732,24 @@ void NET_W32_TickForever(WaveLaneCtx *lane) i64 buff_len = 0; u8 buff[Kibi(2)]; - // { - // i64 seq = packet->seq; - // i64 top_ack = 0; - // i64 ack_bits = 0; - // CopyBytes(buff + buff_len, &magic, 4); - // buff_len += 4; - // CopyBytes(buff + buff_len, &seq, 8); - // buff_len += 8; - // CopyBytes(buff + buff_len, &top_bits, 8); - // buff_len += 8; - // CopyBytes(buff + buff_len, &ack_bits, 8); - // } + NET_W32_PacketHeader header = Zi; + { + header.magic = magic; + header.flags = packet->flags; + header.seq = packet->seq; + header.msg_seq = packet->msg_seq; + header.bottom_ack = peer->bottom_ack; + header.ack_bits = peer->ack_bits; + } + CopyBytes(buff, &header, sizeof(header)); + buff_len += sizeof(header); + + CopyBytes(buff + buff_len, packet->data.text, packet->data.len); + buff_len += packet->data.len; sendto( pipe->udp, - buff, + (char *)buff, buff_len, 0, (struct sockaddr *)&addr, @@ -518,92 +760,57 @@ void NET_W32_TickForever(WaveLaneCtx *lane) packet = next; } } + + ////////////////////////////// + //- Send heartbeats + + for (NET_W32_Peer *peer = pipe->first_peer; peer; peer = peer->next) + { + b32 should_send_heartbeat = 0; + + // If we received a message from a peer this frame but did not send any + // back, we should send a heartbeat packet so that the peer can still + // receive up to date ack info + if (peer->num_msg_packets_received_this_frame > 0 && peer->num_msg_packets_sent_this_frame == 0) + { + should_send_heartbeat = 1; + } + + // Send heartbeat if we haven't sent any packets in a while + i64 now_ns = TimeNs(); + if (now_ns - peer->last_packet_sent_ns > heartbeat_threshold_ns) + { + should_send_heartbeat = 1; + } + + if (should_send_heartbeat) + { + struct sockaddr_in6 addr = NET_W32_AddressFromKey(peer->key); + + i64 buff_len = 0; + u8 buff[Kibi(2)]; + NET_W32_PacketHeader header = Zi; + { + header.magic = magic; + header.flags = NET_W32_PacketFlag_Heartbeat; + header.bottom_ack = peer->bottom_ack; + header.ack_bits = peer->ack_bits; + } + CopyBytes(buff, &header, sizeof(header)); + buff_len += sizeof(header); + sendto( + pipe->udp, + (char *)buff, + buff_len, + 0, + (struct sockaddr *)&addr, + sizeof(addr) + ); + peer->last_packet_sent_ns = now_ns; + } + } } - - - - - - - - - - - - - - - - - - - - - - - - - - ////////////////////////////// - //- Pop cmds - - // NET_W32_Cmd *first_cmd = 0; - // NET_W32_Cmd *last_cmd = 0; - - ////////////////////////////// - //- Create pipes from cmds - - // for (NET_W32_Cmd *cmd = first_cmd; cmd; cmd = cmd->next) - // { - // NET_Key key = cmd->dst_key; - // NET_W32_Routing routing = NET_W32_RoutingFromKey(key); - // u64 port = routing.recv_port; - - // struct sockaddr *addr = NET_W32_SockAddressFromKey(key); - // } - - - // NET_W32_Cmd *first_cmd = 0; - // NET_W32_Cmd *last_cmd = 0; - // { - // Lock lock = LockE(&NET_W32.pipes_mutex); - // { - - // } - // Unlock(&lock); - // } - - ////////////////////////////// - //- Receive messages - - ////////////////////////////// - //- Send messages - - // { - // for (NET_W32_Pipe *pipe = first_pipe; pipe; pipe = pipe->next) - // { - // if (pipe->udp) - // { - // sendto( - // pipe->udp, - // packet.text, - // packet.len, - - // ); - // } - // } - // } - - - - - - - - - - EndScratch(scratch); } } diff --git a/src/net/net_win32/net_win32.h b/src/net/net_win32/net_win32.h index bf822ff7..317ce124 100644 --- a/src/net/net_win32/net_win32.h +++ b/src/net/net_win32/net_win32.h @@ -1,8 +1,141 @@ +//////////////////////////////////////////////////////////// +//~ Command buff types + +Struct(NET_W32_Cmd) +{ + NET_W32_Cmd *next; + + NET_Key key; + String data; + b32 burst; +}; + +Struct(NET_W32_CmdList) +{ + i64 count; + NET_W32_Cmd *first; + NET_W32_Cmd *last; +}; + +Struct(NET_W32_CmdBuff) +{ + Arena *arena; + NET_W32_CmdList cmds; +}; + +//////////////////////////////////////////////////////////// +//~ Message buff types + +Struct(NET_W32_MsgBuff) +{ + Arena *arena; + NET_MsgList msgs; +}; + +//////////////////////////////////////////////////////////// +//~ Packet types + +Enum(NET_W32_PacketFlag) +{ + NET_W32_PacketFlag_None = 0, + + NET_W32_PacketFlag_EndMsg = (1 << 0), + NET_W32_PacketFlag_Heartbeat = (1 << 2), + // NET_W32_PacketFlag_Burst = (1 << 3), +}; + +Struct(NET_W32_PacketHeader) +{ + u32 magic; + NET_W32_PacketFlag flags; + i64 seq; + i64 msg_seq; + i64 bottom_ack; + u64 ack_bits; +}; + +Struct(NET_W32_Packet) +{ + NET_W32_Packet *next; + NET_W32_Packet *prev; + + NET_W32_PacketFlag flags; + i64 seq; + i64 msg_seq; + String data; +}; + +//////////////////////////////////////////////////////////// +//~ Peer types + +Struct(NET_W32_Peer) +{ + NET_W32_Peer *next; + NET_W32_Peer *prev; + NET_W32_Peer *next_in_bin; + NET_W32_Peer *prev_in_bin; + + u64 hash; + NET_Key key; + + i64 remote_bottom_ack; + i64 remote_ack_bits; + i64 bottom_ack; + i64 ack_bits; + + NET_W32_Packet *first_remote_packet; + NET_W32_Packet *last_remote_packet; + + NET_W32_Packet *first_fragmented_packet; + NET_W32_Packet *last_fragmented_packet; + + NET_W32_Packet *first_contiguous_packet; + NET_W32_Packet *last_contiguous_packet; + + i64 seq; + i64 msg_seq; + String fragment; + + i64 last_packet_received_ns; + i64 last_packet_sent_ns; + i64 num_msg_packets_received_this_frame; + i64 num_msg_packets_sent_this_frame; +}; + +Struct(NET_W32_PeerBin) +{ + NET_W32_Peer *first; + NET_W32_Peer *last; +}; + //////////////////////////////////////////////////////////// //~ Pipe types Struct(NET_W32_Pipe) { + //- Shared data + + NET_W32_Pipe *next; + NET_W32_Pipe *prev; + + Atomic64 desired_port; // >64k means ephemeral + + TicketMutex back_cmd_buff_seq_tm; + i64 back_cmd_buff_seq; + NET_W32_CmdBuff cmd_buffs[2]; + + TicketMutex back_msg_buff_seq_tm; + i64 back_msg_buff_seq; + NET_W32_MsgBuff msg_buffs[2]; + + //- Worker data + + NET_W32_Peer *first_peer; + NET_W32_Peer *last_peer; + + i64 peer_bins_count; + NET_W32_PeerBin *peer_bins; + u64 bound_port; SOCKET udp; }; @@ -12,7 +145,13 @@ Struct(NET_W32_Pipe) Struct(NET_W32_Ctx) { - i32 _; + TicketMutex pipes_tm; + i64 pipes_count; + NET_W32_Pipe *first_pipe; + NET_W32_Pipe *last_pipe; + + NET_W32_Peer *first_free_peer; + NET_W32_Packet *first_free_packet; }; extern NET_W32_Ctx NET_W32; @@ -21,6 +160,11 @@ extern NET_W32_Ctx NET_W32; //~ Helpers NET_W32_Pipe *NET_W32_PipeFromHandle(NET_PipeHandle pipe_handle); +NET_Key NET_W32_KeyFromAddress(struct sockaddr_in6 addr); +struct sockaddr_in6 NET_W32_AddressFromKey(NET_Key key); +u64 NET_W32_HashFromKey(NET_Key key); +void NET_W32_SignalWorker(void); +NET_W32_Peer *NET_W32_TouchPeerFromKey(NET_W32_Pipe *pipe, NET_Key key); //////////////////////////////////////////////////////////// //~ Worker diff --git a/src/pp/pp.h b/src/pp/pp.h index 6f8539cc..4ce7b8bf 100644 --- a/src/pp/pp.h +++ b/src/pp/pp.h @@ -327,7 +327,7 @@ Struct(P_Msg) //- In P_Key dst_user; // If dst is 0, then cmd is meant to be broadcast - b32 unreliable; + b32 burst; //- Out diff --git a/src/pp/pp_sim/pp_sim_core.c b/src/pp/pp_sim/pp_sim_core.c index 20f77d93..8060597f 100644 --- a/src/pp/pp_sim/pp_sim_core.c +++ b/src/pp/pp_sim/pp_sim_core.c @@ -130,7 +130,7 @@ void S_TickForever(WaveLaneCtx *lane) P_MsgList in_msgs = Zi; { - NET_MsgList net_msgs = NET_Pop(frame_arena, net_pipe); + NET_MsgList net_msgs = NET_Swap(frame_arena, net_pipe); for (NET_Msg *net_msg = net_msgs.first; net_msg; net_msg = net_msg->next) { String packed = net_msg->data; @@ -430,7 +430,7 @@ void S_TickForever(WaveLaneCtx *lane) // Arena *msgs_arena = P_tl.out_msgs_arena; // P_Msg *msg = P_PushMsg(P_MsgKind_SimSnapshot, Zstr); // msg->dst_user = user->key; - // msg->unreliable = 1; + // msg->burst = 1; // P_SimSnapshot *snapshot = &msg->sim_snapshot; // { // snapshot->world_seed = world->seed; @@ -486,7 +486,7 @@ void S_TickForever(WaveLaneCtx *lane) { if (user->is_user) { - NET_Push(net_pipe, user->net, packed, msg->unreliable); + NET_Push(net_pipe, user->net, packed, msg->burst); } } } @@ -496,7 +496,7 @@ void S_TickForever(WaveLaneCtx *lane) if (!NET_IsKeyNil(user->net)) { String packed = P_PackMessage(frame_arena, msg); - NET_Push(net_pipe, user->net, packed, msg->unreliable); + NET_Push(net_pipe, user->net, packed, msg->burst); } } } diff --git a/src/pp/pp_vis/pp_vis_core.c b/src/pp/pp_vis/pp_vis_core.c index 107c1081..1cfb8c1e 100644 --- a/src/pp/pp_vis/pp_vis_core.c +++ b/src/pp/pp_vis/pp_vis_core.c @@ -346,7 +346,6 @@ void V_TickForever(WaveLaneCtx *lane) const f32 meters_per_draw_width = 18; NET_PipeHandle net_pipe = NET_AcquirePipe(); - NET_Bind(net_pipe, 0); ////////////////////////////// //- Init vis state @@ -2630,7 +2629,8 @@ void V_TickForever(WaveLaneCtx *lane) P_MsgList in_msgs = Zi; { - NET_MsgList net_msgs = NET_Pop(frame->arena, net_pipe); + NET_Bind(net_pipe, 0); + NET_MsgList net_msgs = NET_Swap(frame->arena, net_pipe); for (NET_Msg *net_msg = net_msgs.first; net_msg; net_msg = net_msg->next) { String packed = net_msg->data; @@ -2644,6 +2644,12 @@ void V_TickForever(WaveLaneCtx *lane) } } + + + + + + ////////////////////////////// //- Pop snapshot from sim @@ -3747,11 +3753,22 @@ void V_TickForever(WaveLaneCtx *lane) ////////////////////////////// //- Send messages + + + + // FIXME: Remove this (testing) + + NET_Key server_key = NET_KeyFromString(Lit("127.0.0.1"), Lit("22121")); + NET_Push(net_pipe, server_key, Lit("HELLO!!!!"), 0); + + + + for (P_MsgNode *msg_node = P_tl.out_msgs.first; msg_node; msg_node = msg_node->next) { P_Msg *msg = &msg_node->msg; String packed = P_PackMessage(frame->arena, msg); - NET_Push(net_pipe, user->net, packed, msg->unreliable); + NET_Push(net_pipe, server_key, packed, msg->burst); } //////////////////////////////