From 9f0ea6e45be6527f022b65358381caab02088879 Mon Sep 17 00:00:00 2001 From: jacob Date: Thu, 15 Jan 2026 17:51:50 -0600 Subject: [PATCH] networking progress --- src/net/net.h | 2 +- src/net/net_win32/net_win32.c | 319 +++++++++++++++++++++++++++++++--- src/net/net_win32/net_win32.h | 2 +- src/pp/pp_sim/pp_sim_core.c | 9 +- src/pp/pp_vis/pp_vis_core.c | 5 +- 5 files changed, 307 insertions(+), 30 deletions(-) diff --git a/src/net/net.h b/src/net/net.h index fe397c39..85002a4c 100644 --- a/src/net/net.h +++ b/src/net/net.h @@ -48,5 +48,5 @@ NET_PipeHandle NET_AcquirePipe(void); void NET_Bind(NET_PipeHandle pipe, u64 port); u64 NET_BoundPortFromPipe(NET_PipeHandle pipe_handle); -void NET_Push(NET_PipeHandle pipe, NET_Key dst, String data, b32 unreliable); +void NET_Push(NET_PipeHandle pipe_handle, NET_Key dst, String data, b32 unreliable); NET_MsgList NET_Pop(Arena *arena, NET_PipeHandle pipe); diff --git a/src/net/net_win32/net_win32.c b/src/net/net_win32/net_win32.c index 5d981350..ceebb531 100644 --- a/src/net/net_win32/net_win32.c +++ b/src/net/net_win32/net_win32.c @@ -35,14 +35,6 @@ NET_PipeHandle NET_AcquirePipe(void) void NET_Bind(NET_PipeHandle pipe_handle, u64 port) { - // TODO: Mabye remove binding from the net interface entirely? - // Messages can just be popped from specific ports (with port 0 - // always returning no messages). Ephemeral ports can be "fetched", - // which under the hood binds them. A caching/timeout mechanism then - // just closes sockets as needed. - - - TempArena scratch = BeginScratchNoConflict(); NET_W32_Pipe *pipe = NET_W32_PipeFromHandle(pipe_handle); @@ -95,6 +87,35 @@ void NET_Bind(NET_PipeHandle pipe_handle, u64 port) ok = setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&v6_only, sizeof(v6_only)) == 0; } + //- Set buffer sizes + if (ok) + { + i32 rcvbuf_min = Mebi(1); + i32 sndbuf_min = Mebi(1); + { + i32 rcvbuf = 0; + i32 rcvbuf_sz = sizeof(rcvbuf); + if (getsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&rcvbuf, &rcvbuf_sz) == 0) + { + if (rcvbuf < rcvbuf_min) + { + setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char *)&rcvbuf_min, sizeof(rcvbuf_min)); + } + } + } + { + i32 sndbuf = 0; + i32 sndbuf_sz = sizeof(sndbuf); + if (getsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf, &sndbuf_sz) == 0) + { + if (sndbuf < sndbuf_min) + { + setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf_min, sizeof(sndbuf_min)); + } + } + } + } + //- Bind if (ok) { @@ -166,20 +187,26 @@ u64 NET_BoundPortFromPipe(NET_PipeHandle pipe_handle) return pipe->bound_port; } +NET_Key NET_KeyFromString(String str) +{ + NET_Key result = Zi; + return result; +} + void NET_Push(NET_PipeHandle pipe_handle, NET_Key dst, String data, b32 unreliable) { - NET_W32_Pipe *pipe = NET_W32_PipeFromHandle(pipe_handle); + // NET_W32_Pipe *pipe = NET_W32_PipeFromHandle(pipe_handle); - if (!pipe->udp) - { - // Bind to ephemeral port if not bound - NET_Bind(pipe_handle, 0); - } + // if (!pipe->udp) + // { + // // Bind to ephemeral port if not bound + // NET_Bind(pipe_handle, 0); + // } - if (pipe->udp) - { + // if (pipe->udp) + // { - } + // } } NET_MsgList NET_Pop(Arena *arena, NET_PipeHandle pipe_handle) @@ -189,25 +216,273 @@ NET_MsgList NET_Pop(Arena *arena, NET_PipeHandle pipe_handle) } //////////////////////////////////////////////////////////// -//~ Net worker +//~ Worker void NET_W32_TickForever(WaveLaneCtx *lane) { + Arena *perm = PermArena(); + + NET_W32_Pipe *first_pipe = 0; + NET_W32_Pipe *last_pipe = 0; + + NET_W32_Peer *first_free_peer = 0; + + Enum(PacketFlag) + { + PacketFlag_None, + }; + + Struct(PacketHeader) + { + u32 magic; + PacketFlag flags; + i64 seq; + i64 bottom_ack; + u64 ack_bits; + }; + for (;;) { TempArena scratch = BeginScratchNoConflict(); + u32 magic = 0xde8c590b; // TODO: Block until send/recv/signal - ////////////////////////////// - //- Pop pipes + for (NET_W32_Pipe *pipe = first_pipe; pipe; pipe = pipe->next) + { + ////////////////////////////// + //- Pop cmds + + NET_W32_CmdList pipe_cmds = Zi; + + ////////////////////////////// + //- Assemble writes + + // for (NET_W32_Cmd *cmd = pipe_cmds.first; cmd; cmd = cmd->next) + // { + // if (pipe->udp) + // { + // NET_Key key = cmd->dst_key; + // NET_W32_Routing routing = NET_W32_RoutingFromKey(key); + // struct sockaddr_in6 *addr = &routing->addr; + + // sendto( + // pipe->udp, + // packet.text, + // packet.len, + // 0, + // (struct sockaddr *)addr, + // sizeof(*addr) + // ); + // } + // } + + ////////////////////////////// + //- Read socket + + { + i32 len = 0; + while (len >= 0) + { + u8 buff[Kibi(2)]; + struct sockaddr_in6 addr = peer->addr; + i32 addr_sz = sizeof(addr); + len = recvfrom(pipe->udp, buff, countof(buff), 0, (struct sockaddr *)&sockaddr_in6, &addr_sz); + if (len >= sizeof(PacketHeader) && MatchBytes(buff, &magic, sizeof(magic))) + { + NET_Key key = NET_W32_KeyFromAddress(addr); + PacketHeader header = Zi; + CopyBytes(&header, buff, sizeof(header)); + + //- Fetch peer + // TODO: Address challenge on first receive + NET_W32_Peer *peer = 0; + { + u64 hash = NET_HashFromKey(key); + NET_W32_PeerBin *bin = &pipe->peer_bins[hash % pipe->peer_bins_count]; + NET_W32_Peer *peer = bin->first; + for (; peer; peer = peer->next_in_bin) + { + if (NET_MatchKey(peer->key, key)) + { + break; + } + } + if (!peer) + { + peer = first_free_peer; + if (peer) + { + SllStackPop(first_free_peer); + ZeroStruct(peer); + } + else + { + peer = PushStruct(perm, NET_W32_Peer); + } + peer->key = key; + DllQueueInsert(pipe->first_peer, pipe->last_peer, pipe); + DllQueueInsertNP(bin->first, bin->last, pipe, next_in_bin, prev_in_bin); + } + } + + //- Read packet + { + // Update remote acks + { + if (header.bottom_ack == peer->remote_bottom_ack) + { + peer->remote_ack_bits |= header.ack_bits; + } + else if (header.bottom_ack > peer->remote_bottom_ack) + { + peer->remote_bottom_ack = header.bottom_ack; + peer->remote_ack_bits = header.ack_bits; + } + } + + // Update our acks + b32 should_process = 0; + if (header.seq == peer->bottom_ack + 1) + { + should_process = 1; + peer->bottom_ack = header.seq; + peer->ack_bits >>= 1; + } + if (header.seq > peer->bottom_ack + 1 && header.seq < peer->bottom_ack + 65) + { + u64 ack_bit = 1 << (header.seq - 2 - peer->bottom_ack); + should_process = !!(peer->ack_bits & ack_bit); + peer->ack_bits |= ack_bit; + } + + // Process packet + if (should_process) + { + // NET_W32_Packet *packet = + } + } + } + } + } + + ////////////////////////////// + //- Write peers + + for (NET_W32_Peer *peer = pipe->first_peer; peer; peer = peer->next) + { + // bottom_ack represents the highest continuous sequence acknowledgement, meaning sequences in range [0, bottom_ack] are always acked. + // This means bottom_ack + 1 is never acked. + // Ack bits represent acks for sequences in range [bottom_ack + 2, bottom_ack + 65] + i64 bottom_ack = peer->remote_bottom_ack; + u64 ack_bits = peer->remote_ack_bits; + + struct sockaddr_in6 addr = peer->addr; + + for (NET_W32_Packet *packet = peer->first_packet; packet && packet->seq < bottom_ack + 65;) + { + NET_W32_Packet *next = packet->next; + { + i64 seq = packet->seq; + b32 is_acked = 0; + { + if (seq <= bottom_ack) + { + is_acked = 1; + } + else if (seq > bottom_ack + 1) + { + u64 ack_bit = 1 << (seq - 2 - bottom_ack); + is_acked = !!(ack_bits & ack_bit); + } + } + if (is_acked) + { + // Prune acked packet + DllQueueRemove(peer->first_packet, peer->last_packet, packet); + SllStackPush(first_free_packet, packet); + } + else + { + // Transmit unacked packet + // FIXME: Rate limit, don't send if we've already sent in the last second. + // NOTE: If we do this we should probably put the net worker on something like a 1-second auto-run timer + // TODO: crc32 + i64 buff_len = 0; + u8 buff[Kibi(2)]; + + // { + // i64 seq = packet->seq; + // i64 top_ack = 0; + // i64 ack_bits = 0; + // CopyBytes(buff + buff_len, &magic, 4); + // buff_len += 4; + // CopyBytes(buff + buff_len, &seq, 8); + // buff_len += 8; + // CopyBytes(buff + buff_len, &top_bits, 8); + // buff_len += 8; + // CopyBytes(buff + buff_len, &ack_bits, 8); + // } + + sendto( + pipe->udp, + buff, + buff_len, + 0, + (struct sockaddr *)&addr, + sizeof(addr) + ); + } + } + packet = next; + } + } + } + + + + + + + + + + + + + + + + + + + + + + + + - NET_W32_Pipe *first_pipe = 0; - NET_W32_Pipe *last_pipe = 0; ////////////////////////////// //- Pop cmds + // NET_W32_Cmd *first_cmd = 0; + // NET_W32_Cmd *last_cmd = 0; + + ////////////////////////////// + //- Create pipes from cmds + + // for (NET_W32_Cmd *cmd = first_cmd; cmd; cmd = cmd->next) + // { + // NET_Key key = cmd->dst_key; + // NET_W32_Routing routing = NET_W32_RoutingFromKey(key); + // u64 port = routing.recv_port; + + // struct sockaddr *addr = NET_W32_SockAddressFromKey(key); + // } + + // NET_W32_Cmd *first_cmd = 0; // NET_W32_Cmd *last_cmd = 0; // { diff --git a/src/net/net_win32/net_win32.h b/src/net/net_win32/net_win32.h index 2e7249d5..bf822ff7 100644 --- a/src/net/net_win32/net_win32.h +++ b/src/net/net_win32/net_win32.h @@ -23,6 +23,6 @@ extern NET_W32_Ctx NET_W32; NET_W32_Pipe *NET_W32_PipeFromHandle(NET_PipeHandle pipe_handle); //////////////////////////////////////////////////////////// -//~ Net worker +//~ Worker void NET_W32_TickForever(WaveLaneCtx *lane); diff --git a/src/pp/pp_sim/pp_sim_core.c b/src/pp/pp_sim/pp_sim_core.c index 7b114ff2..20f77d93 100644 --- a/src/pp/pp_sim/pp_sim_core.c +++ b/src/pp/pp_sim/pp_sim_core.c @@ -25,13 +25,13 @@ void S_TickForever(WaveLaneCtx *lane) P_tl.debug_arena = AcquireArena(Gibi(64)); P_tl.out_msgs_arena = AcquireArena(Gibi(64)); - NET_PipeHandle net_pipe = NET_AcquirePipe(); - P_World *world = P_AcquireWorld(); // TODO: Real per-client deltas b32 has_sent_initial_tick = 0; + NET_PipeHandle net_pipe = NET_AcquirePipe(); + @@ -124,8 +124,9 @@ void S_TickForever(WaveLaneCtx *lane) ////////////////////////////// //- Pop messages - u64 desired_port = 22121; - NET_Bind(net_pipe, desired_port); + + u64 port = 22121; + NET_Bind(net_pipe, port); P_MsgList in_msgs = Zi; { diff --git a/src/pp/pp_vis/pp_vis_core.c b/src/pp/pp_vis/pp_vis_core.c index a576bdf1..107c1081 100644 --- a/src/pp/pp_vis/pp_vis_core.c +++ b/src/pp/pp_vis/pp_vis_core.c @@ -345,6 +345,9 @@ void V_TickForever(WaveLaneCtx *lane) const f32 max_zoom = 15.0; const f32 meters_per_draw_width = 18; + NET_PipeHandle net_pipe = NET_AcquirePipe(); + NET_Bind(net_pipe, 0); + ////////////////////////////// //- Init vis state @@ -359,8 +362,6 @@ void V_TickForever(WaveLaneCtx *lane) Vec2I32 tiles_dims = VEC2I32(P_TilesPitch, P_TilesPitch); Vec2I32 cells_dims = VEC2I32(V_CellsPerMeter * P_WorldPitch, V_CellsPerMeter * P_WorldPitch); - NET_PipeHandle net_pipe = NET_AcquirePipe(); - // Init gpu state G_ResourceHandle gpu_state = Zi; G_ResourceHandle gpu_tiles = Zi;