power_play/src/net/net_win32/net_win32.c
2026-01-17 16:23:16 -06:00

1042 lines
32 KiB
C

// Global state for the Win32 net backend. Code in this file reads its worker wake sockets,
// the pipe list (guarded by pipes_tm), and the peer/packet free lists from here.
NET_W32_Ctx NET_W32 = Zi;
////////////////////////////////////////////////////////////
//~ @hookimpl Bootstrap
// Initializes Winsock, creates the two loopback sockets used to wake the worker,
// and dispatches the worker wave. Panics if Winsock fails to initialize.
void NET_Bootstrap(void)
{
  Arena *perm = PermArena();

  // Init winsock.
  // WSAStartup returns the error code directly, so it is called exactly once and the
  // returned code is what gets tested and reported.
  WSADATA wsa = Zi;
  i32 err = WSAStartup(MAKEWORD(2,2), &wsa);
  if (err != 0)
  {
    Panic(StringF(perm, "Failed to initialize Winsock (error code - %F)", FmtSint(err)));
  }

  // Init worker wake sockets
  NET_W32.wake_send_sock = NET_W32_CreateDummySocket();
  NET_W32.wake_recv_sock = NET_W32_CreateDummySocket();

  // Start worker
  DispatchWave(Lit("Net"), 1, NET_W32_TickForever, 0);
}
////////////////////////////////////////////////////////////
//~ Helpers
// Creates a non-blocking UDP socket bound to an OS-chosen port on the IPv4 loopback address
// and records the address it ended up bound to.
//
// Returns a NET_W32_DummySocket whose `sock` is 0 on any failure (the rest of this file
// treats 0 as "no socket"), otherwise the socket plus its bound address and address size.
NET_W32_DummySocket NET_W32_CreateDummySocket(void)
{
  NET_W32_DummySocket result = Zi;
  b32 ok = 1;

  // Create socket
  SOCKET sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
  ok = sock != INVALID_SOCKET;

  // Bind to loopback; sin_port is left zero so the OS picks the port
  if (ok)
  {
    struct sockaddr_in addr = Zi;
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    ok = bind(sock, (struct sockaddr *)&addr, sizeof(addr)) == 0;
  }

  // Fetch the address that was actually bound
  struct sockaddr_storage ss = Zi;
  i32 ss_sizeof = sizeof(ss);
  if (ok)
  {
    ok = getsockname(sock, (struct sockaddr *)&ss, &ss_sizeof) != SOCKET_ERROR;
  }

  // Enable non-blocking
  if (ok)
  {
    u_long nonblocking = 1;
    ok = ioctlsocket(sock, FIONBIO, &nonblocking) == 0;
  }

  // On failure, release the socket (if one was created) and always normalize the handle
  // to 0 so INVALID_SOCKET never leaks out to callers that test `sock != 0`.
  if (!ok)
  {
    if (sock != INVALID_SOCKET)
    {
      closesocket(sock);
    }
    sock = 0;
  }

  result.sock = sock;
  result.addr_size = ss_sizeof;
  result.addr = ss;
  return result;
}
// Recovers the pipe pointer stored in the handle's `v` field.
NET_W32_Pipe *NET_W32_PipeFromHandle(NET_PipeHandle pipe_handle)
{
  NET_W32_Pipe *pipe = (NET_W32_Pipe *)pipe_handle.v;
  return pipe;
}
NET_Key NET_W32_KeyFromAddress(struct sockaddr_in6 addr)
{
NET_Key result = Zi;
CopyBytes(&result, &addr, sizeof(addr));
return result;
}
// Unpacks a NET_Key back into a sockaddr_in6 by copying its leading bytes.
struct sockaddr_in6 NET_W32_AddressFromKey(NET_Key key)
{
  struct sockaddr_in6 addr = Zi;
  CopyBytes(&addr, &key, sizeof(struct sockaddr_in6));
  return addr;
}
// Wakes the worker by sending a one-byte datagram from the wake-send socket to the
// address the wake-receive socket is bound to. Best-effort: a failed send is ignored.
void NET_W32_SignalWorker(void)
{
  // The address length must describe the destination (receive) address, so it is taken
  // from the same socket record as the address itself.
  sendto(
    NET_W32.wake_send_sock.sock,
    (char *)"1",
    1,
    0,
    (struct sockaddr *)&NET_W32.wake_recv_sock.addr,
    NET_W32.wake_recv_sock.addr_size
  );
}
// Looks up the peer for `key` in the pipe's hash bins, creating it if it does not exist yet.
//
// A new peer is taken from the global free list when possible (keeping its previously
// allocated msg_fragment buffer), otherwise allocated from the permanent arena. It is then
// linked into both the pipe's peer list and the matching bin.
//
// Returns the existing or newly created peer.
NET_W32_Peer *NET_W32_TouchPeerFromKey(NET_W32_Pipe *pipe, NET_Key key)
{
  // TODO: Address challenge on first receive
  u64 hash = NET_HashFromKey(key);
  NET_W32_PeerBin *bin = &pipe->peer_bins[hash % pipe->peer_bins_count];

  // Search the bin. The hash alone is not proof of identity: two different addresses may
  // collide, so the full key is compared as well.
  NET_W32_Peer *peer = bin->first;
  for (; peer; peer = peer->next_in_bin)
  {
    if (peer->hash == hash && NET_MatchKey(peer->key, key))
    {
      break;
    }
  }

  if (!peer)
  {
    peer = NET_W32.first_free_peer;
    if (peer)
    {
      // Recycle: zero the struct but keep the fragment buffer allocation
      SllStackPop(NET_W32.first_free_peer);
      {
        String old_msg_fragment = peer->msg_fragment;
        ZeroStruct(peer);
        peer->msg_fragment = old_msg_fragment;
        peer->msg_fragment.len = 0;
      }
    }
    else
    {
      Arena *perm = PermArena();
      peer = PushStruct(perm, NET_W32_Peer);
      peer->msg_fragment.text = PushStructsNoZero(perm, u8, NET_PacketSize);
    }
    peer->hash = hash;
    peer->key = key;
    DllQueuePush(pipe->first_peer, pipe->last_peer, peer);
    DllQueuePushNP(bin->first, bin->last, peer, next_in_bin, prev_in_bin);
  }
  return peer;
}
// Computes the packet checksum: Adler-32 over everything after the first 4 bytes
// (which hold the checksum itself), XORed with the protocol magic.
// Packets of 4 bytes or fewer have an Adler value of 0, so the result is just the magic.
// https://en.wikipedia.org/wiki/Adler-32
u32 NET_W32_ChecksumFromPacketString(String str)
{
  u32 adler = 0;
  if (str.len > 4)
  {
    u32 low_sum = 1;
    u32 high_sum = 0;
    // Start at offset 4 to skip the checksum field
    for (u64 pos = 4; pos < str.len; ++pos)
    {
      low_sum = (low_sum + str.text[pos]) % 65521;
      high_sum = (high_sum + low_sum) % 65521;
    }
    adler = (high_sum << 16) | low_sum;
  }
  return adler ^ NET_W32_ProtocolMagic;
}
////////////////////////////////////////////////////////////
//~ @hookimpl Net ops
// Builds a key from textual host and port.
//
// The port is parsed as an integer; values outside [0, 65536) become 0. The host is parsed
// as an IPv6 literal first, then as an IPv4 literal, which is stored as an IPv4-mapped
// IPv6 address (::ffff:a.b.c.d). If neither parse succeeds, the address bytes are whatever
// the failed IPv6 parse left in the zero-initialized struct.
NET_Key NET_KeyFromString(String host, String port)
{
  TempArena scratch = BeginScratchNoConflict();

  char *host_cstr = CstrFromString(scratch.arena, host);

  // Parse + clamp port
  i64 port_num = CR_IntFromString(port);
  b32 port_in_range = (port_num >= 0 && port_num < Kibi(64));
  if (!port_in_range)
  {
    port_num = 0;
  }

  struct sockaddr_in6 sin6 = Zi;
  sin6.sin6_family = AF_INET6;
  sin6.sin6_port = htons(port_num);

  // Parse host: v6 literal, falling back to v4-mapped
  b32 parsed_v6 = (InetPtonA(AF_INET6, host_cstr, &sin6.sin6_addr) == 1);
  if (!parsed_v6)
  {
    IN_ADDR v4 = Zi;
    b32 parsed_v4 = (InetPtonA(AF_INET, host_cstr, &v4) == 1);
    if (parsed_v4)
    {
      ZeroStruct(&sin6.sin6_addr);
      sin6.sin6_addr.u.Byte[10] = 0xFF;
      sin6.sin6_addr.u.Byte[11] = 0xFF;
      CopyBytes(&sin6.sin6_addr.u.Byte[12], &v4, 4);
    }
  }

  NET_Key key = NET_W32_KeyFromAddress(sin6);
  EndScratch(scratch);
  return key;
}
// Formats a key as text into `arena`.
// IPv4-mapped addresses are written as "a.b.c.d:port", everything else as "[v6]:port".
// Returns a zero String if the address cannot be converted to text.
String NET_StringFromKey(Arena *arena, NET_Key key)
{
  String text = Zi;
  struct sockaddr_in6 sin6 = NET_W32_AddressFromKey(key);
  u32 port = ntohs(sin6.sin6_port);
  char addr_chars[INET6_ADDRSTRLEN];

  if (!IN6_IS_ADDR_V4MAPPED(&sin6.sin6_addr))
  {
    // Plain IPv6
    if (InetNtopA(AF_INET6, &sin6.sin6_addr, addr_chars, (DWORD)sizeof(addr_chars)))
    {
      String host = StringFromCstr(addr_chars, countof(addr_chars));
      text = StringF(arena, "[%F]:%F", FmtString(host), FmtUint(port));
    }
  }
  else
  {
    // IPv4-mapped: the v4 address lives in the last 4 bytes
    struct in_addr v4 = Zi;
    CopyBytes(&v4, &sin6.sin6_addr.s6_addr[12], sizeof(v4));
    if (InetNtopA(AF_INET, &v4, addr_chars, (DWORD)sizeof(addr_chars)))
    {
      String host = StringFromCstr(addr_chars, countof(addr_chars));
      text = StringF(arena, "%F:%F", FmtString(host), FmtUint(port));
    }
  }
  return text;
}
// Returns the result of MatchStruct on the two keys (whole-struct comparison).
b32 NET_MatchKey(NET_Key a, NET_Key b)
{
  b32 is_match = MatchStruct(&a, &b);
  return is_match;
}
// Hashes the key by running HashString over the key struct's bytes.
u64 NET_HashFromKey(NET_Key key)
{
  return HashString(StringFromStruct(&key));
}
NET_PipeHandle NET_AcquirePipe(void)
{
Arena *perm = PermArena();
NET_W32_Pipe *pipe = PushStruct(perm, NET_W32_Pipe);
{
for (i64 idx = 0; idx < countof(pipe->cmd_buffs); ++idx)
{
NET_W32_CmdBuff *cmd_buff = &pipe->cmd_buffs[idx];
cmd_buff->arena = AcquireArena(Gibi(64));
}
for (i64 idx = 0; idx < countof(pipe->msg_buffs); ++idx)
{
NET_W32_MsgBuff *msg_buff = &pipe->msg_buffs[idx];
msg_buff->arena = AcquireArena(Gibi(64));
}
pipe->peer_bins_count = Kibi(1);
pipe->peer_bins = PushStructs(perm, NET_W32_PeerBin, pipe->peer_bins_count);
}
{
LockTicketMutex(&NET_W32.pipes_tm);
{
++NET_W32.pipes_count;
DllQueuePush(NET_W32.first_pipe, NET_W32.last_pipe, pipe);
}
UnlockTicketMutex(&NET_W32.pipes_tm);
}
NET_W32_SignalWorker();
return (NET_PipeHandle) { .v = (u64) pipe };
}
// Records the port the pipe should be bound to; the worker performs the actual bind.
// A `port` of 0 is stored as all-ones, which the worker treats as "ephemeral port"
// (a stored value of 0 means "no bind requested").
void NET_Bind(NET_PipeHandle pipe_handle, u64 port)
{
  NET_W32_Pipe *pipe = NET_W32_PipeFromHandle(pipe_handle);
  u64 stored_port = (port != 0) ? port : 0xFFFFFFFFFFFFFFFFull;
  Atomic64Set(&pipe->desired_port, stored_port);
  // FIXME: Signal here if ports don't match
}
// Swaps the pipe's message buffers and returns the messages accumulated in the buffer
// that was the back buffer until now. The new back buffer is reset and emptied under the
// lock. `arena` is not used by this implementation.
NET_MsgList NET_Swap(Arena *arena, NET_PipeHandle pipe_handle)
{
  NET_W32_Pipe *pipe = NET_W32_PipeFromHandle(pipe_handle);
  NET_W32_MsgBuff *filled = 0;

  LockTicketMutex(&pipe->back_msg_buff_seq_tm);
  {
    // Current back buffer becomes the result
    filled = &pipe->msg_buffs[pipe->back_msg_buff_seq % countof(pipe->msg_buffs)];

    // Advance and clear the buffer the worker will write into next
    ++pipe->back_msg_buff_seq;
    NET_W32_MsgBuff *next_back = &pipe->msg_buffs[pipe->back_msg_buff_seq % countof(pipe->msg_buffs)];
    ResetArena(next_back->arena);
    ZeroStruct(&next_back->msgs);
  }
  UnlockTicketMutex(&pipe->back_msg_buff_seq_tm);

  return filled->msgs;
}
// Queues `data` for delivery to `dst` by appending a command (with its own copy of the
// data) to the pipe's current back command buffer, then signals the worker.
void NET_Push(NET_PipeHandle pipe_handle, NET_Key dst, String data, b32 burst)
{
  NET_W32_Pipe *pipe = NET_W32_PipeFromHandle(pipe_handle);

  LockTicketMutex(&pipe->back_cmd_buff_seq_tm);
  {
    NET_W32_CmdBuff *back = &pipe->cmd_buffs[pipe->back_cmd_buff_seq % countof(pipe->cmd_buffs)];
    Arena *cmd_arena = back->arena;

    NET_W32_Cmd *queued = PushStruct(cmd_arena, NET_W32_Cmd);
    queued->key = dst;
    queued->data = PushString(cmd_arena, data);
    queued->burst = burst;

    SllQueuePush(back->cmds.first, back->cmds.last, queued);
    back->cmds.count += 1;
  }
  UnlockTicketMutex(&pipe->back_cmd_buff_seq_tm);

  NET_W32_SignalWorker();
}
// Snapshots the pipe's sent/received byte counters with atomic fetches.
NET_PipeStatistics NET_StatsFromPipe(NET_PipeHandle pipe_handle)
{
  NET_W32_Pipe *pipe = NET_W32_PipeFromHandle(pipe_handle);
  NET_PipeStatistics stats = Zi;
  stats.total_bytes_sent = Atomic64Fetch(&pipe->total_bytes_sent);
  stats.total_bytes_received = Atomic64Fetch(&pipe->total_bytes_received);
  return stats;
}
////////////////////////////////////////////////////////////
//~ Worker
// Worker entry point. Loops forever; each iteration:
//   1. Waits (WSAPoll) on every bound pipe socket plus the wake socket, with a timeout.
//   2. For each pipe: (re)binds its UDP socket if a port was requested, reads and validates
//      incoming packets, assembles completed messages into the back message buffer,
//      splits queued commands into outgoing packets, sends heartbeats, and (re)sends
//      unacknowledged message packets.
//
// `lane` is not used by this implementation.
void NET_W32_TickForever(WaveLaneCtx *lane)
{
  Arena *perm = PermArena();
  for (;;)
  {
    TempArena scratch = BeginScratchNoConflict();
    i64 heartbeat_threshold_ns = NsFromSeconds(0.250);
    i64 passive_run_threshold_ns = NsFromSeconds(0.250);
    // TODO: Base this on peer's latency w/ rolling backoff
    i64 msg_resend_threshold_ns = NsFromSeconds(0.250);

    //////////////////////////////
    //- Wait
    {
      // Build fd list containing every bound pipe's socket + the worker's signal socket
      i64 fds_count = 0;
      WSAPOLLFD *fds = 0;
      {
        LockTicketMutex(&NET_W32.pipes_tm);
        {
          for (NET_W32_Pipe *pipe = NET_W32.first_pipe; pipe; pipe = pipe->next)
          {
            if (pipe->udp)
            {
              fds_count += 1;
            }
          }
          if (NET_W32.wake_recv_sock.sock != 0)
          {
            fds_count += 1;
          }
          fds = PushStructsNoZero(scratch.arena, WSAPOLLFD, fds_count);
          {
            i64 fd_idx = 0;
            for (NET_W32_Pipe *pipe = NET_W32.first_pipe; pipe; pipe = pipe->next)
            {
              if (pipe->udp)
              {
                fds[fd_idx].fd = pipe->udp;
                fds[fd_idx].events = POLLRDNORM;
                fd_idx += 1;
              }
            }
            if (NET_W32.wake_recv_sock.sock != 0)
            {
              fds[fd_idx].fd = NET_W32.wake_recv_sock.sock;
              fds[fd_idx].events = POLLRDNORM;
              fd_idx += 1;
            }
          }
        }
        UnlockTicketMutex(&NET_W32.pipes_tm);
      }

      // Wait
      i32 timeout_ms = MsFromNs(passive_run_threshold_ns);
      WSAPoll(fds, fds_count, timeout_ms);

      // Drain wake recv sock (non-blocking, so recv returns < 0 once it is empty)
      {
        i32 len = 0;
        while (len >= 0)
        {
          u8 buff[Kibi(2)];
          len = recv(NET_W32.wake_recv_sock.sock, (char *)buff, countof(buff), 0);
        }
      }
    }

    //////////////////////////////
    //- Process pipes

    // Snapshot the pipe list so the lock is not held while processing
    i64 pipes_count = 0;
    NET_W32_Pipe **pipes = 0;
    {
      LockTicketMutex(&NET_W32.pipes_tm);
      {
        pipes = PushStructsNoZero(scratch.arena, NET_W32_Pipe *, NET_W32.pipes_count);
        for (NET_W32_Pipe *pipe = NET_W32.first_pipe; pipe; pipe = pipe->next)
        {
          pipes[pipes_count] = pipe;
          pipes_count += 1;
        }
      }
      UnlockTicketMutex(&NET_W32.pipes_tm);
    }

    for (i64 pipe_idx = 0; pipe_idx < pipes_count; ++pipe_idx)
    {
      NET_W32_Pipe *pipe = pipes[pipe_idx];

      //////////////////////////////
      //- Reset frame data
      {
        pipe->num_msg_packets_received_this_frame = 0;
        for (NET_W32_Peer *peer = pipe->first_peer; peer; peer = peer->next)
        {
          peer->num_msg_packets_received_this_frame = 0;
        }
      }

      //////////////////////////////
      //- Bind
      {
        u64 desired_port = Atomic64Fetch(&pipe->desired_port);
        if (desired_port != 0)
        {
          // Values outside the 16-bit port range request an ephemeral port (see NET_Bind)
          b32 is_ephemeral = desired_port >= Kibi(64);
          if (is_ephemeral)
          {
            desired_port = 0;
          }
          // FIXME: Retry on timeout
          if (!pipe->udp || (!is_ephemeral && pipe->bound_port != desired_port))
          {
            b32 ok = 1;
            String port_str = StringF(scratch.arena, "%F", FmtUint(desired_port));
            char *port_cstr = CstrFromString(scratch.arena, port_str);
            if (pipe->udp)
            {
              closesocket(pipe->udp);
              pipe->bound_port = 0;
              pipe->udp = 0;
            }

            //- Init bind address
            struct addrinfo hints = Zi;
            hints.ai_family = AF_INET6;
            hints.ai_socktype = SOCK_DGRAM;
            hints.ai_protocol= IPPROTO_UDP;
            hints.ai_flags = AI_PASSIVE | AI_NUMERICSERV;
            struct addrinfo *ai = 0;
            if (ok)
            {
              ok = getaddrinfo(0, port_cstr, &hints, &ai) == 0;
            }

            //- Create udp socket
            SOCKET sock = 0;
            if (ok)
            {
              sock = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
              ok = sock != INVALID_SOCKET;
            }

            //- Enable address reuse
            if (ok)
            {
              b32 reuse = 1;
              ok = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(reuse)) == 0;
            }

            //- Enable dual stack
            if (ok)
            {
              DWORD v6_only = 0;
              ok = setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&v6_only, sizeof(v6_only)) == 0;
            }

            //- Set buffer sizes (best-effort: failures here do not fail the bind)
            if (ok)
            {
              i32 rcvbuf_min = Mebi(1);
              i32 sndbuf_min = Mebi(1);
              {
                // Query the *receive* buffer size before deciding whether to grow it
                i32 rcvbuf = 0;
                i32 rcvbuf_sz = sizeof(rcvbuf);
                if (getsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char *)&rcvbuf, &rcvbuf_sz) == 0)
                {
                  if (rcvbuf < rcvbuf_min)
                  {
                    setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char *)&rcvbuf_min, sizeof(rcvbuf_min));
                  }
                }
              }
              {
                i32 sndbuf = 0;
                i32 sndbuf_sz = sizeof(sndbuf);
                if (getsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf, &sndbuf_sz) == 0)
                {
                  if (sndbuf < sndbuf_min)
                  {
                    setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf_min, sizeof(sndbuf_min));
                  }
                }
              }
            }

            //- Bind
            if (ok)
            {
              ok = bind(sock, ai->ai_addr, (i32)ai->ai_addrlen) == 0;
            }

            //- Enable non-blocking
            if (ok)
            {
              u_long nonblocking = 1;
              ok = ioctlsocket(sock, FIONBIO, &nonblocking) == 0;
            }

            //- Fetch bound port
            u64 bound_port = 0;
            {
              struct sockaddr_storage ss = Zi;
              if (ok)
              {
                i32 ss_sizeof = sizeof(ss);
                ok = getsockname(sock, (struct sockaddr *)&ss, &ss_sizeof) != SOCKET_ERROR;
              }
              if (ok)
              {
                if (ss.ss_family == AF_INET)
                {
                  struct sockaddr_in *a = (struct sockaddr_in *)&ss;
                  bound_port = ntohs(a->sin_port);
                }
                else if (ss.ss_family == AF_INET6)
                {
                  struct sockaddr_in6 *a6 = (struct sockaddr_in6 *)&ss;
                  bound_port = ntohs(a6->sin6_port);
                }
                else
                {
                  ok = 0;
                }
              }
            }

            //- Finalize
            if (ok)
            {
              pipe->bound_port = bound_port;
              pipe->udp = sock;
            }
            else
            {
              // Only close a socket that was actually created: `sock` is still 0 when
              // getaddrinfo failed, and INVALID_SOCKET when socket() failed.
              if (sock != 0 && sock != INVALID_SOCKET)
              {
                closesocket(sock);
              }
            }
            if (ai)
            {
              freeaddrinfo(ai);
            }
          }
        }
      }

      //////////////////////////////
      //- Read socket
      // TODO: Rate limit
      // TODO: Per-frame packet limit to ensure other pipes are still serviced during load
      {
        i32 len = 0;
        // Skip entirely while the pipe has no socket; otherwise read until recvfrom
        // reports an error (which includes "would block" on the non-blocking socket).
        while (pipe->udp && len >= 0)
        {
          u8 buff[Kibi(2)];
          struct sockaddr_in6 addr = Zi;
          i32 addr_sz = sizeof(addr);
          len = recvfrom(pipe->udp, (char *)buff, countof(buff), 0, (struct sockaddr *)&addr, &addr_sz);

          // Only count real payload; a negative return is an error code, not a byte count
          if (len > 0)
          {
            Atomic64FetchAdd(&pipe->total_bytes_received, len);
          }

          // Compare as signed so a negative `len` can never pass the lower bound
          if ((i64)len >= (i64)sizeof(NET_W32_PacketHeader) && (i64)len <= (i64)(sizeof(NET_W32_PacketHeader) + NET_PacketSize))
          {
            NET_W32_PacketHeader header = Zi;
            CopyBytes(&header, buff, sizeof(header));
            u32 checksum = NET_W32_ChecksumFromPacketString(STRING(len, buff));
            if (header.checksum == checksum)
            {
              NET_Key key = NET_W32_KeyFromAddress(addr);
              String src_data = Zi;
              src_data.text = buff + sizeof(header);
              src_data.len = len - sizeof(header);

              //- Fetch peer
              NET_W32_Peer *peer = NET_W32_TouchPeerFromKey(pipe, key);
              peer->key = key;
              peer->last_packet_received_ns = TimeNs();

              // Merge the remote's view of what it has received from us
              if (header.bottom_ack == peer->remote_bottom_ack)
              {
                peer->remote_ack_bits |= header.ack_bits;
              }
              else if (header.bottom_ack > peer->remote_bottom_ack)
              {
                peer->remote_bottom_ack = header.bottom_ack;
                peer->remote_ack_bits = header.ack_bits;
              }

              //- Read msg packet
              if (!(header.flags & NET_W32_PacketFlag_Heartbeat))
              {
                // Update our stats
                b32 is_sequential = 0;
                b32 should_process = 0;
                if (header.seq == peer->bottom_ack + 1)
                {
                  is_sequential = 1;
                  should_process = 1;
                }
                else if (header.seq > peer->bottom_ack + 1 && header.seq <= peer->bottom_ack + 65)
                {
                  // Out-of-order packet inside the ack window: process only if not seen yet
                  u64 ack_bit = (u64)1 << (header.seq - peer->bottom_ack - 2);
                  should_process = !(peer->ack_bits & ack_bit);
                  peer->ack_bits |= ack_bit;
                }
                peer->num_msg_packets_received_this_frame += 1;
                pipe->num_msg_packets_received_this_frame += 1;

                // Process packet
                if (should_process)
                {
                  NET_W32_Packet *packet = NET_W32.first_free_packet;
                  if (packet)
                  {
                    SllStackPop(NET_W32.first_free_packet);
                    String old_packet_data = packet->data;
                    ZeroStruct(packet);
                    packet->data = old_packet_data;
                    packet->data.len = 0;
                  }
                  else
                  {
                    packet = PushStruct(perm, NET_W32_Packet);
                    packet->data.text = PushStructsNoZero(perm, u8, NET_PacketSize);
                  }
                  packet->flags = header.flags;
                  packet->seq = header.seq;
                  packet->msg_seq = header.msg_seq;
                  CopyBytes(packet->data.text, src_data.text, src_data.len);
                  packet->data.len = src_data.len;

                  // Insert packet (fragment list is kept sorted by seq)
                  {
                    NET_W32_Packet *left = peer->last_frag_packet;
                    for (; left; left = left->prev)
                    {
                      if (left->seq < packet->seq)
                      {
                        break;
                      }
                    }
                    DllQueueInsert(peer->first_frag_packet, peer->last_frag_packet, left, packet);
                  }

                  // Transfer fragmented -> contiguous packets
                  if (is_sequential)
                  {
                    NET_W32_Packet *contig_start = peer->first_frag_packet;
                    NET_W32_Packet *contig_end = packet;
                    for (NET_W32_Packet *tmp = packet->next; tmp; tmp = tmp->next)
                    {
                      if (tmp->seq == tmp->prev->seq + 1)
                      {
                        contig_end = tmp;
                      }
                      else
                      {
                        break;
                      }
                    }
                    if (peer->last_cont_packet)
                    {
                      peer->last_cont_packet->next = contig_start;
                    }
                    else
                    {
                      peer->first_cont_packet = contig_start;
                    }
                    contig_start->prev = peer->last_cont_packet;
                    peer->last_cont_packet = contig_end;
                    if (contig_end->next)
                    {
                      contig_end->next->prev = 0;
                    }
                    else
                    {
                      peer->last_frag_packet = 0;
                    }
                    peer->first_frag_packet = contig_end->next;
                    contig_end->next = 0;

                    // Slide the ack window forward; a shift of 64 or more would be
                    // undefined, so the bits are simply cleared in that case
                    i64 diff = contig_end->seq - peer->bottom_ack;
                    if (diff <= 63)
                    {
                      peer->ack_bits >>= diff;
                    }
                    else
                    {
                      peer->ack_bits = 0;
                    }
                    peer->bottom_ack = contig_end->seq;
                  }
                }
                // LogDebugF(
                // "Received msg packet. seq: %F, msg seq: %F, new bottom ack: %F, new ack bits: %F, should_process: %F, data: \"%F\"",
                // FmtSint(header.seq),
                // FmtSint(header.msg_seq),
                // FmtSint(peer->bottom_ack),
                // FmtUint(peer->ack_bits),
                // FmtUint(should_process),
                // FmtString(src_data)
                // );
              }
            }
          }
        }
      }

      //////////////////////////////
      //- Assemble read messages
      // TODO: Maximum message size
      if (pipe->num_msg_packets_received_this_frame > 0)
      {
        LockTicketMutex(&pipe->back_msg_buff_seq_tm);
        {
          NET_W32_MsgBuff *msg_buff = &pipe->msg_buffs[pipe->back_msg_buff_seq % countof(pipe->msg_buffs)];
          String msg_data = Zi;
          msg_data.text = ArenaNext(msg_buff->arena, u8);
          for (NET_W32_Peer *peer = pipe->first_peer; peer; peer = peer->next)
          {
            NET_W32_Packet *first_msg_packet = peer->first_cont_packet;
            for (NET_W32_Packet *packet = peer->first_cont_packet; packet;)
            {
              NET_W32_Packet *next = packet->next;
              {
                // A packet's message is complete when a later message has already started
                // in the contiguous list, or when the list's last packet carries the
                // end-of-message flag. The flag must be tested with `&`.
                b32 msg_has_end = 0;
                if (packet->msg_seq != peer->last_cont_packet->msg_seq || !!(peer->last_cont_packet->flags & NET_W32_PacketFlag_EndMsg))
                {
                  msg_has_end = 1;
                }
                if (msg_has_end)
                {
                  msg_data.len += packet->data.len;
                  PushString(msg_buff->arena, packet->data);
                  b32 is_end_of_msg = !!(packet->flags & NET_W32_PacketFlag_EndMsg) || (next && next->msg_seq != packet->msg_seq);
                  if (is_end_of_msg)
                  {
                    NET_Msg *msg = PushStruct(msg_buff->arena, NET_Msg);
                    msg->sender = peer->key;
                    msg->data = msg_data;
                    DllQueuePush(msg_buff->msgs.first, msg_buff->msgs.last, msg);
                    ++msg_buff->msgs.count;
                    {
                      // Unlink [first_msg_packet, packet] and push the whole chain onto
                      // the free list
                      peer->first_cont_packet = next;
                      if (next)
                      {
                        next->prev = 0;
                      }
                      else
                      {
                        peer->last_cont_packet = 0;
                      }
                      packet->next = NET_W32.first_free_packet;
                      NET_W32.first_free_packet = first_msg_packet;
                    }
                    first_msg_packet = next;
                    msg_data.text = ArenaNext(msg_buff->arena, u8);
                    msg_data.len = 0;
                    // LogSuccessF(
                    // "Assembled msg with msg seq: %F, data (%F bytes)",
                    // FmtSint(packet->msg_seq),
                    // FmtUint(msg->data.len)
                    // );
                  }
                }
                else
                {
                  // Trailing message is still incomplete; leave its packets queued
                  break;
                }
              }
              packet = next;
            }
          }
        }
        UnlockTicketMutex(&pipe->back_msg_buff_seq_tm);
      }

      //////////////////////////////
      //- Queue message packets
      {
        // Swap cmd buff
        NET_W32_CmdBuff *cmd_buff = 0;
        {
          LockTicketMutex(&pipe->back_cmd_buff_seq_tm);
          {
            cmd_buff = &pipe->cmd_buffs[pipe->back_cmd_buff_seq % countof(pipe->cmd_buffs)];
            pipe->back_cmd_buff_seq += 1;
            NET_W32_CmdBuff *back_cmd_buff = &pipe->cmd_buffs[pipe->back_cmd_buff_seq % countof(pipe->cmd_buffs)];
            ResetArena(back_cmd_buff->arena);
            ZeroStruct(&back_cmd_buff->cmds);
          }
          UnlockTicketMutex(&pipe->back_cmd_buff_seq_tm);
        }

        for (NET_W32_Cmd *cmd = cmd_buff->cmds.first; cmd; cmd = cmd->next)
        {
          NET_Key key = cmd->key;
          NET_W32_Peer *peer = NET_W32_TouchPeerFromKey(pipe, key);
          String src_data = cmd->data;
          i64 src_pos = 0;
          i64 msg_seq = ++peer->msg_seq;
          // LogDebugF("Queued msg with seq %F", FmtSint(msg_seq));
          // TODO: Burst packets

          // Split the message into packets of at most NET_PacketSize bytes
          b32 is_msg_end = 0;
          while (!is_msg_end)
          {
            i64 copy_len = MinI64(NET_PacketSize - (i64)peer->msg_fragment.len, src_data.len - src_pos);
            CopyBytes(peer->msg_fragment.text + peer->msg_fragment.len, src_data.text + src_pos, copy_len);
            src_pos += copy_len;
            peer->msg_fragment.len += copy_len;

            // Push packet
            is_msg_end = src_pos >= (i64)src_data.len;
            if (peer->msg_fragment.len == NET_PacketSize || is_msg_end)
            {
              NET_W32_Packet *packet = NET_W32.first_free_packet;
              if (packet)
              {
                SllStackPop(NET_W32.first_free_packet);
                String old_packet_data = packet->data;
                ZeroStruct(packet);
                packet->data = old_packet_data;
                packet->data.len = 0;
              }
              else
              {
                packet = PushStruct(perm, NET_W32_Packet);
                packet->data.text = PushStructsNoZero(perm, u8, NET_PacketSize);
              }
              packet->seq = ++peer->seq;
              packet->msg_seq = msg_seq;
              CopyBytes(packet->data.text, peer->msg_fragment.text, peer->msg_fragment.len);
              packet->data.len = peer->msg_fragment.len;
              if (is_msg_end)
              {
                packet->flags |= NET_W32_PacketFlag_EndMsg;
              }
              DllQueuePush(peer->first_remote_packet, peer->last_remote_packet, packet);
              peer->msg_fragment.len = 0;
            }
          }
        }
      }

      //////////////////////////////
      //- Send heartbeat packets
      for (NET_W32_Peer *peer = pipe->first_peer; peer; peer = peer->next)
      {
        b32 should_send_heartbeat = 0;
        // Send heartbeat packet if we received any packets this frame
        if (peer->num_msg_packets_received_this_frame > 0)
        {
          should_send_heartbeat = 1;
        }
        // Send heartbeat if we haven't sent any packets in a while
        i64 now_ns = TimeNs();
        if (now_ns - peer->last_packet_sent_ns > heartbeat_threshold_ns)
        {
          should_send_heartbeat = 1;
        }
        if (should_send_heartbeat)
        {
          struct sockaddr_in6 addr = NET_W32_AddressFromKey(peer->key);
          i64 buff_len = 0;
          u8 buff[Kibi(2)];
          NET_W32_PacketHeader header = Zi;
          {
            header.checksum = 0;
            header.flags = NET_W32_PacketFlag_Heartbeat;
            header.bottom_ack = peer->bottom_ack;
            header.ack_bits = peer->ack_bits;
          }
          CopyBytes(buff, &header, sizeof(header));
          buff_len += sizeof(header);
          // The checksum skips its own field, so it can be patched in afterwards
          u32 checksum = NET_W32_ChecksumFromPacketString(STRING(buff_len, buff));
          CopyBytes(buff, &checksum, sizeof(checksum));
          i32 sent_len = sendto(
            pipe->udp,
            (char *)buff,
            buff_len,
            0,
            (struct sockaddr *)&addr,
            sizeof(addr)
          );
          // Only count bytes the socket actually accepted
          if (sent_len > 0)
          {
            Atomic64FetchAdd(&pipe->total_bytes_sent, sent_len);
          }
          peer->last_packet_sent_ns = now_ns;
        }
      }

      //////////////////////////////
      //- Send message packets
      // TODO: Rate limit
      for (NET_W32_Peer *peer = pipe->first_peer; peer; peer = peer->next)
      {
        // bottom_ack represents the highest contiguous sequence acknowledgement, meaning sequences in range [0, bottom_ack] are always acked.
        // This means bottom_ack + 1 is never acked.
        // Ack bits represent acks for sequences in range [bottom_ack + 2, bottom_ack + 65]
        i64 bottom_ack = peer->remote_bottom_ack;
        u64 ack_bits = peer->remote_ack_bits;
        NET_Key key = peer->key;
        struct sockaddr_in6 addr = NET_W32_AddressFromKey(key);
        // TODO: Maybe send more than just bottom_ack + 65 packets at a time.
        for (NET_W32_Packet *packet = peer->first_remote_packet; packet && packet->seq <= bottom_ack + 65;)
        {
          NET_W32_Packet *next = packet->next;
          {
            i64 seq = packet->seq;
            b32 is_acked = 0;
            {
              if (seq <= bottom_ack)
              {
                is_acked = 1;
              }
              else if (seq > bottom_ack + 1 && packet->seq <= bottom_ack + 65)
              {
                u64 ack_bit = (u64)1 << (seq - 2 - bottom_ack);
                is_acked = !!(ack_bits & ack_bit);
              }
            }
            if (is_acked)
            {
              // Prune acked packet
              DllQueueRemove(peer->first_remote_packet, peer->last_remote_packet, packet);
              SllStackPush(NET_W32.first_free_packet, packet);
            }
            else
            {
              // Transmit unacked packet (first send, or resend after the threshold)
              i64 now_ns = TimeNs();
              if (packet->last_sent_ns == 0 || (now_ns - packet->last_sent_ns > msg_resend_threshold_ns))
              {
                peer->last_packet_sent_ns = now_ns;
                packet->last_sent_ns = now_ns;
                i64 buff_len = 0;
                u8 buff[Kibi(2)];
                NET_W32_PacketHeader header = Zi;
                {
                  header.checksum = 0;
                  header.flags = packet->flags;
                  header.seq = packet->seq;
                  header.msg_seq = packet->msg_seq;
                  header.bottom_ack = peer->bottom_ack;
                  header.ack_bits = peer->ack_bits;
                }
                CopyBytes(buff, &header, sizeof(header));
                buff_len += sizeof(header);
                CopyBytes(buff + buff_len, packet->data.text, packet->data.len);
                buff_len += packet->data.len;
                u32 checksum = NET_W32_ChecksumFromPacketString(STRING(buff_len, buff));
                CopyBytes(buff, &checksum, sizeof(checksum));
                i32 sent_len = sendto(
                  pipe->udp,
                  (char *)buff,
                  buff_len,
                  0,
                  (struct sockaddr *)&addr,
                  sizeof(addr)
                );
                // Only count bytes the socket actually accepted
                if (sent_len > 0)
                {
                  Atomic64FetchAdd(&pipe->total_bytes_sent, sent_len);
                }
                // LogDebugF(
                // "Sent msg packet. seq: %F, msg seq: %F, data: \"%F\"",
                // FmtSint(header.seq),
                // FmtSint(header.msg_seq),
                // FmtString(packet->data)
                // );
              }
            }
          }
          packet = next;
        }
      }
    }
    EndScratch(scratch);
  }
}