diff --git a/src/net/net_win32/net_win32.c b/src/net/net_win32/net_win32.c index 16b404c1..6029bb69 100644 --- a/src/net/net_win32/net_win32.c +++ b/src/net/net_win32/net_win32.c @@ -137,6 +137,38 @@ void NET_W32_PostRecv(NET_W32_Pipe *pipe) ); } +void NET_W32_SendTo(NET_W32_Pipe *pipe, NET_W32_Peer *peer, NET_W32_PacketHeader header, String data) +{ + struct sockaddr_in6 addr = NET_W32_AddressFromKey(peer->key); + i64 buff_len = 0; + u8 buff[Kibi(2)]; + + // Push header + header.checksum = 0; + CopyBytes(buff, &header, sizeof(header)); + buff_len += sizeof(header); + + // Push data + i64 data_write_len = MinI64(countof(buff) - buff_len, data.len); + CopyBytes(buff + buff_len, data.text, data_write_len); + buff_len += data_write_len; + + // Write checksum + u32 checksum = NET_W32_ChecksumFromPacketString(STRING(buff_len, buff)); + CopyBytes(buff, &checksum, sizeof(checksum)); + + // Send + sendto( + pipe->udp, + (char *)buff, + buff_len, + 0, + (struct sockaddr *)&addr, + sizeof(addr) + ); + Atomic64FetchAdd(&pipe->total_bytes_sent, buff_len); +} + //////////////////////////////////////////////////////////// //~ @hookimpl Net ops @@ -279,23 +311,27 @@ NET_MsgList NET_Swap(Arena *arena, NET_PipeHandle pipe_handle) void NET_Send(NET_PipeHandle pipe_handle, NET_Key dst, String data, NET_SendFlag flags) { - NET_W32_Pipe *pipe = NET_W32_PipeFromHandle(pipe_handle); - if (Atomic64Fetch(&pipe->desired_port) == 0) + b32 drop = AnyBit(flags, NET_SendFlag_Raw) && data.len > NET_PacketSize; + if (!drop) { - Atomic64Set(&pipe->desired_port, NET_Ephemeral); + NET_W32_Pipe *pipe = NET_W32_PipeFromHandle(pipe_handle); + if (Atomic64Fetch(&pipe->desired_port) == 0) + { + Atomic64Set(&pipe->desired_port, NET_Ephemeral); + } + LockTicketMutex(&pipe->back_cmd_buff_seq_tm); + { + NET_W32_CmdBuff *cmd_buff = &pipe->cmd_buffs[pipe->back_cmd_buff_seq % countof(pipe->cmd_buffs)]; + NET_W32_Cmd *cmd = PushStruct(cmd_buff->arena, NET_W32_Cmd); + cmd->key = dst; + cmd->data = PushString(cmd_buff->arena, data); + cmd->send_flags = flags; + SllQueuePush(cmd_buff->cmds.first, cmd_buff->cmds.last, cmd); + ++cmd_buff->cmds.count; + } + UnlockTicketMutex(&pipe->back_cmd_buff_seq_tm); + NET_W32_SignalWorker(); } - LockTicketMutex(&pipe->back_cmd_buff_seq_tm); - { - NET_W32_CmdBuff *cmd_buff = &pipe->cmd_buffs[pipe->back_cmd_buff_seq % countof(pipe->cmd_buffs)]; - NET_W32_Cmd *cmd = PushStruct(cmd_buff->arena, NET_W32_Cmd); - cmd->key = dst; - cmd->data = PushString(cmd_buff->arena, data); - cmd->send_flags = flags; - SllQueuePush(cmd_buff->cmds.first, cmd_buff->cmds.last, cmd); - ++cmd_buff->cmds.count; - } - UnlockTicketMutex(&pipe->back_cmd_buff_seq_tm); - NET_W32_SignalWorker(); } NET_PipeStatistics NET_StatsFromPipe(NET_PipeHandle pipe_handle) @@ -391,6 +427,7 @@ void NET_W32_TickForever(WaveLaneCtx *lane) wake_count += 1; Atomic64Set(&NET_W32.seen_signal, Atomic64Fetch(&NET_W32.signal)); } + i64 time_ns = TimeNs(); i32 err = GetLastError(); if (!ok) { @@ -426,7 +463,9 @@ void NET_W32_TickForever(WaveLaneCtx *lane) //- Fetch peer NET_W32_Peer *peer = NET_W32_TouchPeerFromKey(pipe, key); peer->key = key; - peer->last_packet_received_ns = TimeNs(); + + //- Update remote stats + peer->last_ack_received_ns = time_ns; if (header.bottom_ack == peer->remote_bottom_ack) { peer->remote_ack_bits |= header.ack_bits; @@ -437,123 +476,144 @@ void NET_W32_TickForever(WaveLaneCtx *lane) peer->remote_ack_bits = header.ack_bits; } + b32 is_heartbeat = !!(header.flags & NET_W32_PacketFlag_Heartbeat); + b32 is_raw = !!(header.flags & NET_W32_PacketFlag_Raw); + b32 is_reliable = !is_heartbeat && !is_raw; + //- Read msg packet - if (!(header.flags & NET_W32_PacketFlag_Heartbeat)) + if (!is_heartbeat) { - // Update our stats - b32 is_sequential = 0; - b32 should_process = 0; - if (header.seq == peer->bottom_ack + 1) + if (is_raw) { - is_sequential = 1; - should_process = 1; - } - else if (header.seq > peer->bottom_ack + 1 && header.seq <= peer->bottom_ack + 65) - { - u64 ack_bit = (u64)1 << (header.seq - peer->bottom_ack - 2); - should_process = !(peer->ack_bits & ack_bit); - peer->ack_bits |= ack_bit; - } - peer->num_msg_packets_received_this_frame += 1; - pipe->num_msg_packets_received_this_frame += 1; - - // Process packet - if (should_process) - { - NET_W32_Packet *packet = NET_W32.first_free_packet; - if (packet) + //- Read raw packet now + LockTicketMutex(&pipe->back_msg_buff_seq_tm); { - SllStackPop(NET_W32.first_free_packet); - String old_packet_data = packet->data; - ZeroStruct(packet); - packet->data = old_packet_data; - packet->data.len = 0; + NET_W32_MsgBuff *msg_buff = &pipe->msg_buffs[pipe->back_msg_buff_seq % countof(pipe->msg_buffs)]; + NET_Msg *msg = PushStruct(msg_buff->arena, NET_Msg); + msg->sender = peer->key; + msg->data = PushString(msg_buff->arena, src_data); + DllQueuePush(msg_buff->msgs.first, msg_buff->msgs.last, msg); + ++msg_buff->msgs.count; } - else + UnlockTicketMutex(&pipe->back_msg_buff_seq_tm); + } + else if (is_reliable) + { + //- Update our stats + b32 is_sequential = 0; + b32 should_process = 0; + if (header.seq == peer->bottom_ack + 1) { - packet = PushStruct(perm, NET_W32_Packet); - packet->data.text = PushStructsNoZero(perm, u8, NET_PacketSize); + is_sequential = 1; + should_process = 1; } - packet->flags = header.flags; - packet->seq = header.seq; - packet->msg_seq = header.msg_seq; - CopyBytes(packet->data.text, src_data.text, src_data.len); - packet->data.len = src_data.len; - - // Insert packet + else if (header.seq > peer->bottom_ack + 1 && header.seq <= peer->bottom_ack + 65) { - NET_W32_Packet *left = peer->last_frag_packet; - for (; left; left = left->prev) + u64 ack_bit = (u64)1 << (header.seq - peer->bottom_ack - 2); + should_process = !(peer->ack_bits & ack_bit); + peer->ack_bits |= ack_bit; + } + peer->num_msg_packets_received_this_frame += 1; + pipe->num_msg_packets_received_this_frame += 1; + + //- Process packet + if (should_process) + { + NET_W32_Packet *packet = NET_W32.first_free_packet; + if (packet) { - if (left->seq < packet->seq) - { - break; - } + SllStackPop(NET_W32.first_free_packet); + String old_packet_data = packet->data; + ZeroStruct(packet); + packet->data = old_packet_data; + packet->data.len = 0; } - DllQueueInsert(peer->first_frag_packet, peer->last_frag_packet, left, packet); - } - - // Transfer fragmented -> contiguous packets - if (is_sequential) - { - NET_W32_Packet *contig_start = peer->first_frag_packet; - NET_W32_Packet *contig_end = packet; - for (NET_W32_Packet *tmp = packet->next; tmp; tmp = tmp->next) + else { - if (tmp->seq == tmp->prev->seq + 1) + packet = PushStruct(perm, NET_W32_Packet); + packet->data.text = PushStructsNoZero(perm, u8, NET_PacketSize); + } + packet->flags = header.flags; + packet->seq = header.seq; + packet->msg_seq = header.msg_seq; + CopyBytes(packet->data.text, src_data.text, src_data.len); + packet->data.len = src_data.len; + + //- Insert packet + { + NET_W32_Packet *left = peer->last_frag_packet; + for (; left; left = left->prev) { - contig_end = tmp; + if (left->seq < packet->seq) + { + break; + } + } + DllQueueInsert(peer->first_frag_packet, peer->last_frag_packet, left, packet); + } + + //- Transfer fragmented -> contiguous packets + if (is_sequential) + { + NET_W32_Packet *contig_start = peer->first_frag_packet; + NET_W32_Packet *contig_end = packet; + for (NET_W32_Packet *tmp = packet->next; tmp; tmp = tmp->next) + { + if (tmp->seq == tmp->prev->seq + 1) + { + contig_end = tmp; + } + else + { + break; + } + } + + if (peer->last_cont_packet) + { + peer->last_cont_packet->next = contig_start; } else { - break; + peer->first_cont_packet = contig_start; } - } + contig_start->prev = peer->last_cont_packet; + peer->last_cont_packet = contig_end; - if (peer->last_cont_packet) - { - peer->last_cont_packet->next = contig_start; - } - else - { - peer->first_cont_packet = contig_start; - } - contig_start->prev = peer->last_cont_packet; - peer->last_cont_packet = contig_end; + if (contig_end->next) + { + contig_end->next->prev = 0; + } + else + { + peer->last_frag_packet = 0; + } + peer->first_frag_packet = contig_end->next; + contig_end->next = 0; - if (contig_end->next) - { - contig_end->next->prev = 0; + i64 diff = contig_end->seq - peer->bottom_ack; + if (diff <= 63) + { + peer->ack_bits >>= diff; + } + else + { + peer->ack_bits = 0; + } + peer->bottom_ack = contig_end->seq; } - else - { - peer->last_frag_packet = 0; - } - peer->first_frag_packet = contig_end->next; - contig_end->next = 0; - - i64 diff = contig_end->seq - peer->bottom_ack; - if (diff <= 63) - { - peer->ack_bits >>= diff; - } - else - { - peer->ack_bits = 0; - } - peer->bottom_ack = contig_end->seq; } - } - LogTraceF( - "Received msg packet. seq: %F, msg seq: %F, new bottom ack: %F, new ack bits: %F, should_process: %F, data: \"%F\"", - FmtSint(header.seq), - FmtSint(header.msg_seq), - FmtSint(peer->bottom_ack), - FmtUint(peer->ack_bits), - FmtUint(should_process), - FmtString(src_data) - ); + LogTraceF( + "Received msg packet. seq: %F, msg seq: %F, new bottom ack: %F, new ack bits: %F, should_process: %F, data: \"%F\"", + FmtSint(header.seq), + FmtSint(header.msg_seq), + FmtSint(peer->bottom_ack), + FmtUint(peer->ack_bits), + FmtUint(should_process), + FmtString(src_data) + ); + } } } } @@ -574,13 +634,12 @@ void NET_W32_TickForever(WaveLaneCtx *lane) for (i64 pipe_idx = 0; pipe_idx < pipes_count; ++pipe_idx) { NET_W32_Pipe *pipe = pipes[pipe_idx]; + i64 time_ns = TimeNs(); ////////////////////////////// //- Bind { - i64 now_ns = TimeNs(); - u64 desired_port = Atomic64Fetch(&pipe->desired_port); if (desired_port != 0) { @@ -592,7 +651,7 @@ void NET_W32_TickForever(WaveLaneCtx *lane) if ( (!pipe->udp || (!desired_is_ephemeral && pipe->bound_port != desired_port)) && - (pipe->last_attempted_bind_ns == 0 || now_ns - pipe->last_attempted_bind_ns > bind_retry_threshold_ns) + (pipe->last_attempted_bind_ns == 0 || time_ns - pipe->last_attempted_bind_ns > bind_retry_threshold_ns) ) { LogTraceF("Binding to port \"%F\"", FmtUint(desired_port)); @@ -611,7 +670,7 @@ void NET_W32_TickForever(WaveLaneCtx *lane) if (pipe->iocp_count == 0) { - pipe->last_attempted_bind_ns = now_ns; + pipe->last_attempted_bind_ns = time_ns; //- Init bind address struct addrinfo hints = Zi; @@ -848,7 +907,7 @@ void NET_W32_TickForever(WaveLaneCtx *lane) } ////////////////////////////// - //- Queue message packets + //- Process cmds { // Swap cmd buff @@ -869,50 +928,66 @@ void NET_W32_TickForever(WaveLaneCtx *lane) { NET_Key key = cmd->key; NET_W32_Peer *peer = NET_W32_TouchPeerFromKey(pipe, key); - String src_data = cmd->data; - i64 src_pos = 0; - i64 msg_seq = ++peer->msg_seq; - LogTraceF("Queued msg with seq %F", FmtSint(msg_seq)); - // TODO: Raw packets + b32 is_raw = !!(cmd->send_flags & NET_SendFlag_Raw); - b32 is_msg_end = 0; - while (!is_msg_end) + if (is_raw) { - i64 copy_len = MinI64(NET_PacketSize - (i64)peer->msg_fragment.len, src_data.len - src_pos); - CopyBytes(peer->msg_fragment.text + peer->msg_fragment.len, src_data.text + src_pos, copy_len); - src_pos += copy_len; - peer->msg_fragment.len += copy_len; - - // Push packet - is_msg_end = src_pos >= (i64)src_data.len; - if (peer->msg_fragment.len == NET_PacketSize || is_msg_end) + // Send raw packet now + NET_W32_PacketHeader header = Zi; { - NET_W32_Packet *packet = NET_W32.first_free_packet; - if (packet) - { - SllStackPop(NET_W32.first_free_packet); - String old_packet_data = packet->data; - ZeroStruct(packet); - packet->data = old_packet_data; - packet->data.len = 0; - } - else - { - packet = PushStruct(perm, NET_W32_Packet); - packet->data.text = PushStructsNoZero(perm, u8, NET_PacketSize); - } - packet->seq = ++peer->seq; - packet->msg_seq = msg_seq; - CopyBytes(packet->data.text, peer->msg_fragment.text, peer->msg_fragment.len); - packet->data.len = peer->msg_fragment.len; - if (is_msg_end) - { - packet->flags |= NET_W32_PacketFlag_EndMsg; - } + header.flags = NET_W32_PacketFlag_Raw; + header.bottom_ack = peer->bottom_ack; + header.ack_bits = peer->ack_bits; + } + NET_W32_SendTo(pipe, peer, header, cmd->data); + peer->last_ack_sent_ns = time_ns; + } + else + { + // Queue reliable msg packets + String src_data = cmd->data; + i64 src_pos = 0; + i64 msg_seq = ++peer->msg_seq; + LogTraceF("Queued msg with seq %F", FmtSint(msg_seq)); + b32 is_msg_end = 0; + while (!is_msg_end) + { + i64 copy_len = MinI64(NET_PacketSize - (i64)peer->msg_fragment.len, src_data.len - src_pos); + CopyBytes(peer->msg_fragment.text + peer->msg_fragment.len, src_data.text + src_pos, copy_len); + src_pos += copy_len; + peer->msg_fragment.len += copy_len; - DllQueuePush(peer->first_remote_packet, peer->last_remote_packet, packet); - peer->msg_fragment.len = 0; + // Push packet + is_msg_end = src_pos >= (i64)src_data.len; + if (peer->msg_fragment.len == NET_PacketSize || is_msg_end) + { + NET_W32_Packet *packet = NET_W32.first_free_packet; + if (packet) + { + SllStackPop(NET_W32.first_free_packet); + String old_packet_data = packet->data; + ZeroStruct(packet); + packet->data = old_packet_data; + packet->data.len = 0; + } + else + { + packet = PushStruct(perm, NET_W32_Packet); + packet->data.text = PushStructsNoZero(perm, u8, NET_PacketSize); + } + packet->seq = ++peer->seq; + packet->msg_seq = msg_seq; + CopyBytes(packet->data.text, peer->msg_fragment.text, peer->msg_fragment.len); + packet->data.len = peer->msg_fragment.len; + if (is_msg_end) + { + packet->flags |= NET_W32_PacketFlag_EndMsg; + } + + DllQueuePush(peer->first_remote_packet, peer->last_remote_packet, packet); + peer->msg_fragment.len = 0; + } } } } @@ -934,39 +1009,21 @@ void NET_W32_TickForever(WaveLaneCtx *lane) } // Send heartbeat if we haven't sent any packets in a while - i64 now_ns = TimeNs(); - if (now_ns - peer->last_packet_sent_ns > heartbeat_threshold_ns) + if (time_ns - peer->last_ack_sent_ns > heartbeat_threshold_ns) { should_send_heartbeat = 1; } if (should_send_heartbeat) { - struct sockaddr_in6 addr = NET_W32_AddressFromKey(peer->key); - i64 buff_len = 0; - u8 buff[Kibi(2)]; NET_W32_PacketHeader header = Zi; { - header.checksum = 0; header.flags = NET_W32_PacketFlag_Heartbeat; header.bottom_ack = peer->bottom_ack; header.ack_bits = peer->ack_bits; } - CopyBytes(buff, &header, sizeof(header)); - buff_len += sizeof(header); - - u32 checksum = NET_W32_ChecksumFromPacketString(STRING(buff_len, buff)); - CopyBytes(buff, &checksum, sizeof(checksum)); - sendto( - pipe->udp, - (char *)buff, - buff_len, - 0, - (struct sockaddr *)&addr, - sizeof(addr) - ); - Atomic64FetchAdd(&pipe->total_bytes_sent, buff_len); - peer->last_packet_sent_ns = now_ns; + NET_W32_SendTo(pipe, peer, header, Zstr); + peer->last_ack_sent_ns = time_ns; } } } @@ -987,7 +1044,6 @@ void NET_W32_TickForever(WaveLaneCtx *lane) u64 ack_bits = peer->remote_ack_bits; NET_Key key = peer->key; - struct sockaddr_in6 addr = NET_W32_AddressFromKey(key); // TODO: Maybe send more than just bottom_ack + 65 packets at a time. for (NET_W32_Packet *packet = peer->first_remote_packet; packet && packet->seq <= bottom_ack + 65;) @@ -1016,41 +1072,19 @@ void NET_W32_TickForever(WaveLaneCtx *lane) else { // Transmit unacked packet - i64 now_ns = TimeNs(); - if (packet->last_sent_ns == 0 || (now_ns - packet->last_sent_ns > msg_resend_threshold_ns)) + if (packet->last_sent_ns == 0 || (time_ns - packet->last_sent_ns > msg_resend_threshold_ns)) { - peer->last_packet_sent_ns = now_ns; - packet->last_sent_ns = now_ns; - i64 buff_len = 0; - u8 buff[Kibi(2)]; - NET_W32_PacketHeader header = Zi; { - header.checksum = 0; header.flags = packet->flags; header.seq = packet->seq; header.msg_seq = packet->msg_seq; header.bottom_ack = peer->bottom_ack; header.ack_bits = peer->ack_bits; } - CopyBytes(buff, &header, sizeof(header)); - buff_len += sizeof(header); - - CopyBytes(buff + buff_len, packet->data.text, packet->data.len); - buff_len += packet->data.len; - - u32 checksum = NET_W32_ChecksumFromPacketString(STRING(buff_len, buff)); - CopyBytes(buff, &checksum, sizeof(checksum)); - sendto( - pipe->udp, - (char *)buff, - buff_len, - 0, - (struct sockaddr *)&addr, - sizeof(addr) - ); - Atomic64FetchAdd(&pipe->total_bytes_sent, buff_len); - + NET_W32_SendTo(pipe, peer, header, packet->data); + peer->last_ack_sent_ns = time_ns; + packet->last_sent_ns = time_ns; LogTraceF( "Sent msg packet. seq: %F, msg seq: %F, data: \"%F\"", FmtSint(header.seq), diff --git a/src/net/net_win32/net_win32.h b/src/net/net_win32/net_win32.h index d1e73b46..aab3374c 100644 --- a/src/net/net_win32/net_win32.h +++ b/src/net/net_win32/net_win32.h @@ -43,7 +43,7 @@ Enum(NET_W32_PacketFlag) NET_W32_PacketFlag_EndMsg = (1 << 0), NET_W32_PacketFlag_Heartbeat = (1 << 2), - // NET_W32_PacketFlag_Raw = (1 << 3), + NET_W32_PacketFlag_Raw = (1 << 3), }; Struct(NET_W32_PacketHeader) @@ -103,8 +103,8 @@ Struct(NET_W32_Peer) i64 msg_seq; String msg_fragment; - i64 last_packet_received_ns; - i64 last_packet_sent_ns; + i64 last_ack_received_ns; + i64 last_ack_sent_ns; i64 num_msg_packets_received_this_frame; }; @@ -203,6 +203,7 @@ void NET_W32_SignalWorker(void); NET_W32_Peer *NET_W32_TouchPeerFromKey(NET_W32_Pipe *pipe, NET_Key key); u32 NET_W32_ChecksumFromPacketString(String str); void NET_W32_PostRecv(NET_W32_Pipe *pipe); +void NET_W32_SendTo(NET_W32_Pipe *pipe, NET_W32_Peer *peer, NET_W32_PacketHeader header, String data); //////////////////////////////////////////////////////////// //~ Worker diff --git a/src/pp/pp_sim/pp_sim_core.c b/src/pp/pp_sim/pp_sim_core.c index afb26e93..579a578e 100644 --- a/src/pp/pp_sim/pp_sim_core.c +++ b/src/pp/pp_sim/pp_sim_core.c @@ -925,8 +925,8 @@ void S_TickForever(WaveLaneCtx *lane) NET_Send(net_pipe, client->net_key, snapshot, NET_SendFlag_Raw); // Sanity check to catch whenever we end up packing too much information - // into a single raw snapshot, causing it to drop. Not an actual error, - // but signals we may have botched the packing code or need tighter + // into a single raw snapshot, causing it to drop. This isn't necessarily + // an error, but signals we may have botched the packing code or need tighter // compression Assert(snapshot.len <= NET_PacketSize); } diff --git a/src/pp/pp_vis/pp_vis_core.c b/src/pp/pp_vis/pp_vis_core.c index 66168bf0..683a18bc 100644 --- a/src/pp/pp_vis/pp_vis_core.c +++ b/src/pp/pp_vis/pp_vis_core.c @@ -3154,8 +3154,8 @@ void V_TickForever(WaveLaneCtx *lane) NET_Send(net_pipe, frame->sim_key, snapshot, NET_SendFlag_Raw); // Sanity check to catch whenever we end up packing too much information - // into a single raw snapshot, causing it to drop. Not an actual error, - // but signals we may have botched the packing code or need tighter + // into a single raw snapshot, causing it to drop. This isn't necessarily + // an error, but signals we may have botched the packing code or need tighter // compression Assert(snapshot.len <= NET_PacketSize); }