working networking

This commit is contained in:
jacob 2026-01-15 23:22:11 -06:00
parent 418465408f
commit 9912e0bfdd
7 changed files with 772 additions and 387 deletions

View File

@ -16,7 +16,20 @@ void SetMemoryReadWrite(void *address, u64 size);
////////////////////////////////////////////////////////////
//~ Memory ops
Inline b32 MatchBytesZero(u8 *v, u64 size)
{
for (u64 idx = 0; idx < size; ++idx)
{
if (v[idx] != 0)
{
return 0;
}
}
return 1;
}
//- Wrappers
#define MatchStructZero(ptr) MatchBytesZero((u8 *)(ptr), sizeof(*ptr))
#define MatchBytes(p1, p2, n) (CmpBytes((p1), (p2), (n)) == 0)
#define MatchStruct(p1, p2) MatchBytes((p1), (p2), sizeof(*p1))
#define ZeroBytes(ptr, count) SetBytes((ptr), 0, (count))
@ -26,6 +39,7 @@ void SetMemoryReadWrite(void *address, u64 size);
#define CopyStructs(ptr_dst, ptr_src, n) CopyBytes((ptr_dst), (ptr_src), sizeof(*(ptr_dst)) * (n))
#define CopyStruct(ptr_dst, ptr_src) CopyStructs((ptr_dst), (ptr_src), 1)
//- Stubs
#define CopyBytes(dst, src, count) memcpy(dst, src, count)
#define SetBytes(dst, c, count) memset(dst, c, count)
#define CmpBytes(p1, p2, count) memcmp(p1, p2, count)

View File

@ -1,3 +1,6 @@
// NOTE: Burst messages with lengths exceeding packet size will degrade
// into sequenced messages
#define NET_PacketSize 1024
////////////////////////////////////////////////////////////
@ -12,11 +15,11 @@ Struct(NET_PipeHandle) { u64 v; };
Struct(NET_Key)
{
u64 v;
u8 addr[32];
};
#define NET_NilKey ((NET_Key) { 0 })
#define NET_IsKeyNil(k) ((k).v == 0)
#define NET_IsKeyNil(k) (MatchStructZero(&k))
////////////////////////////////////////////////////////////
//~ Message types
@ -26,7 +29,7 @@ Struct(NET_Msg)
NET_Msg *next;
NET_Msg *prev;
NET_Key src;
NET_Key sender;
String data;
};
@ -48,7 +51,7 @@ void NET_Bootstrap(void);
NET_PipeHandle NET_AcquirePipe(void);
void NET_Bind(NET_PipeHandle pipe, u64 port);
u64 NET_BoundPortFromPipe(NET_PipeHandle pipe_handle);
NET_Key NET_KeyFromString(String host, String port);
void NET_Push(NET_PipeHandle pipe_handle, NET_Key dst, String data, b32 unreliable);
NET_MsgList NET_Pop(Arena *arena, NET_PipeHandle pipe);
NET_MsgList NET_Swap(Arena *arena, NET_PipeHandle pipe);
void NET_Push(NET_PipeHandle pipe_handle, NET_Key dst, String data, b32 burst);

View File

@ -23,6 +23,72 @@ NET_W32_Pipe *NET_W32_PipeFromHandle(NET_PipeHandle pipe_handle)
return (NET_W32_Pipe *)pipe_handle.v;
}
NET_Key NET_W32_KeyFromAddress(struct sockaddr_in6 addr)
{
NET_Key result = Zi;
CopyBytes(&result, &addr, sizeof(addr));
return result;
}
struct sockaddr_in6 NET_W32_AddressFromKey(NET_Key key)
{
struct sockaddr_in6 result = Zi;
CopyBytes(&result, &key, sizeof(result));
return result;
}
u64 NET_W32_HashFromKey(NET_Key key)
{
u64 result = HashString(StringFromStruct(&key));
return result;
}
void NET_W32_SignalWorker(void)
{
// TODO
}
NET_W32_Peer *NET_W32_TouchPeerFromKey(NET_W32_Pipe *pipe, NET_Key key)
{
// TODO: Address challenge on first receive
NET_W32_Peer *peer = 0;
u64 hash = NET_W32_HashFromKey(key);
NET_W32_PeerBin *bin = &pipe->peer_bins[hash % pipe->peer_bins_count];
peer = bin->first;
for (; peer; peer = peer->next_in_bin)
{
if (peer->hash == hash)
{
break;
}
}
if (!peer)
{
peer = NET_W32.first_free_peer;
if (peer)
{
SllStackPop(NET_W32.first_free_peer);
{
String old_msg_fragment = peer->fragment;
ZeroStruct(peer);
peer->fragment = old_msg_fragment;
peer->fragment.len = 0;
}
}
else
{
Arena *perm = PermArena();
peer = PushStruct(perm, NET_W32_Peer);
peer->fragment.text = PushStructsNoZero(perm, u8, NET_PacketSize);
}
peer->hash = hash;
peer->key = key;
DllQueuePush(pipe->first_peer, pipe->last_peer, peer);
DllQueuePushNP(bin->first, bin->last, peer, next_in_bin, prev_in_bin);
}
return peer;
}
////////////////////////////////////////////////////////////
//~ @hookimpl Net ops
@ -30,20 +96,182 @@ NET_PipeHandle NET_AcquirePipe(void)
{
Arena *perm = PermArena();
NET_W32_Pipe *pipe = PushStruct(perm, NET_W32_Pipe);
{
for (i64 idx = 0; idx < countof(pipe->cmd_buffs); ++idx)
{
NET_W32_CmdBuff *cmd_buff = &pipe->cmd_buffs[idx];
cmd_buff->arena = AcquireArena(Gibi(64));
}
for (i64 idx = 0; idx < countof(pipe->msg_buffs); ++idx)
{
NET_W32_MsgBuff *msg_buff = &pipe->msg_buffs[idx];
msg_buff->arena = AcquireArena(Gibi(64));
}
pipe->peer_bins_count = Kibi(1);
pipe->peer_bins = PushStructs(perm, NET_W32_PeerBin, pipe->peer_bins_count);
}
{
LockTicketMutex(&NET_W32.pipes_tm);
{
++NET_W32.pipes_count;
DllQueuePush(NET_W32.first_pipe, NET_W32.last_pipe, pipe);
}
UnlockTicketMutex(&NET_W32.pipes_tm);
}
return (NET_PipeHandle) { .v = (u64) pipe };
}
void NET_Bind(NET_PipeHandle pipe_handle, u64 port)
{
TempArena scratch = BeginScratchNoConflict();
NET_W32_Pipe *pipe = NET_W32_PipeFromHandle(pipe_handle);
if (port == 0)
{
Atomic64Set(&pipe->desired_port, 0xFFFFFFFFFFFFFFFFull);
}
else
{
Atomic64Set(&pipe->desired_port, port);
}
}
NET_Key NET_KeyFromString(String host, String port)
{
NET_Key result = Zi;
TempArena scratch = BeginScratchNoConflict();
{
struct addrinfo hints = Zi;
hints.ai_family = AF_INET6;
hints.ai_socktype = SOCK_DGRAM;
hints.ai_protocol = IPPROTO_UDP;
hints.ai_flags = AI_V4MAPPED | AI_ADDRCONFIG;
char *host_cstr = CstrFromString(scratch.arena, host);
char *port_cstr = CstrFromString(scratch.arena, port);
struct sockaddr_in6 addr = Zi;
{
struct addrinfo *first_ai = 0;
if (getaddrinfo(host_cstr, port_cstr, &hints, &first_ai) == 0)
{
for (struct addrinfo *ai = first_ai; ai; ai = ai->ai_next)
{
if (ai->ai_family == AF_INET6 && ai->ai_addrlen >= sizeof(addr))
{
CopyBytes(&addr, ai->ai_addr, sizeof(addr));
break;
}
}
}
if (first_ai)
{
freeaddrinfo(first_ai);
}
}
result = NET_W32_KeyFromAddress(addr);
}
EndScratch(scratch);
return result;
}
NET_MsgList NET_Swap(Arena *arena, NET_PipeHandle pipe_handle)
{
NET_W32_Pipe *pipe = NET_W32_PipeFromHandle(pipe_handle);
NET_W32_MsgBuff *msg_buff = 0;
{
LockTicketMutex(&pipe->back_msg_buff_seq_tm);
{
msg_buff = &pipe->msg_buffs[pipe->back_msg_buff_seq % countof(pipe->msg_buffs)];
pipe->back_msg_buff_seq += 1;
NET_W32_MsgBuff *back_msg_buff = &pipe->msg_buffs[pipe->back_msg_buff_seq % countof(pipe->msg_buffs)];
ResetArena(back_msg_buff->arena);
ZeroStruct(&back_msg_buff->msgs);
}
UnlockTicketMutex(&pipe->back_msg_buff_seq_tm);
}
return msg_buff->msgs;
}
void NET_Push(NET_PipeHandle pipe_handle, NET_Key dst, String data, b32 burst)
{
NET_W32_Pipe *pipe = NET_W32_PipeFromHandle(pipe_handle);
LockTicketMutex(&pipe->back_cmd_buff_seq_tm);
{
NET_W32_CmdBuff *cmd_buff = &pipe->cmd_buffs[pipe->back_cmd_buff_seq % countof(pipe->cmd_buffs)];
NET_W32_Cmd *cmd = PushStruct(cmd_buff->arena, NET_W32_Cmd);
cmd->key = dst;
cmd->data = PushString(cmd_buff->arena, data);
cmd->burst = burst;
SllQueuePush(cmd_buff->cmds.first, cmd_buff->cmds.last, cmd);
++cmd_buff->cmds.count;
}
UnlockTicketMutex(&pipe->back_cmd_buff_seq_tm);
NET_W32_SignalWorker();
}
////////////////////////////////////////////////////////////
//~ Worker
void NET_W32_TickForever(WaveLaneCtx *lane)
{
Arena *perm = PermArena();
for (;;)
{
TempArena scratch = BeginScratchNoConflict();
u32 magic = 0xde8c590b;
i64 heartbeat_threshold_ns = NsFromSeconds(0.250);
// TODO: Block until send/recv/signal
//////////////////////////////
//- Pop pipes
i64 pipes_count = 0;
NET_W32_Pipe **pipes = 0;
{
LockTicketMutex(&NET_W32.pipes_tm);
{
pipes = PushStructsNoZero(scratch.arena, NET_W32_Pipe *, NET_W32.pipes_count);
for (NET_W32_Pipe *pipe = NET_W32.first_pipe; pipe; pipe = pipe->next)
{
pipes[pipes_count] = pipe;
pipes_count += 1;
}
}
UnlockTicketMutex(&NET_W32.pipes_tm);
}
for (i64 pipe_idx = 0; pipe_idx < pipes_count; ++pipe_idx)
{
NET_W32_Pipe *pipe = pipes[pipe_idx];
//////////////////////////////
//- Reset peer data
for (NET_W32_Peer *peer = pipe->first_peer; peer; peer = peer->next)
{
peer->num_msg_packets_received_this_frame = 0;
peer->num_msg_packets_sent_this_frame = 0;
}
//////////////////////////////
//- Bind
{
u64 desired_port = Atomic64Fetch(&pipe->desired_port);
if (desired_port != 0)
{
b32 is_ephemeral = desired_port >= Kibi(64);
if (is_ephemeral)
{
desired_port = 0;
}
b32 is_ephemeral = port == 0;
// FIXME: Retry on timeout
if (!pipe->udp || (!is_ephemeral && pipe->bound_port != port))
if (!pipe->udp || (!is_ephemeral && pipe->bound_port != desired_port))
{
b32 ok = 1;
String port_str = StringF(scratch.arena, "%F", FmtUint(port));
String port_str = StringF(scratch.arena, "%F", FmtUint(desired_port));
char *port_cstr = CstrFromString(scratch.arena, port_str);
if (pipe->udp)
@ -176,108 +404,9 @@ void NET_Bind(NET_PipeHandle pipe_handle, u64 port)
freeaddrinfo(ai);
}
}
EndScratch(scratch);
}
u64 NET_BoundPortFromPipe(NET_PipeHandle pipe_handle)
{
// TODO: Instead maybe return "BindingStatus", which includes port + binding error/progress
NET_W32_Pipe *pipe = NET_W32_PipeFromHandle(pipe_handle);
return pipe->bound_port;
}
NET_Key NET_KeyFromString(String str)
{
NET_Key result = Zi;
return result;
}
void NET_Push(NET_PipeHandle pipe_handle, NET_Key dst, String data, b32 unreliable)
{
// NET_W32_Pipe *pipe = NET_W32_PipeFromHandle(pipe_handle);
// if (!pipe->udp)
// {
// // Bind to ephemeral port if not bound
// NET_Bind(pipe_handle, 0);
// }
// if (pipe->udp)
// {
// }
}
NET_MsgList NET_Pop(Arena *arena, NET_PipeHandle pipe_handle)
{
NET_MsgList result = Zi;
return result;
}
////////////////////////////////////////////////////////////
//~ Worker
void NET_W32_TickForever(WaveLaneCtx *lane)
{
Arena *perm = PermArena();
NET_W32_Pipe *first_pipe = 0;
NET_W32_Pipe *last_pipe = 0;
NET_W32_Peer *first_free_peer = 0;
Enum(PacketFlag)
{
PacketFlag_None,
};
Struct(PacketHeader)
{
u32 magic;
PacketFlag flags;
i64 seq;
i64 msg_seq;
i64 bottom_ack;
u64 ack_bits;
};
for (;;)
{
TempArena scratch = BeginScratchNoConflict();
u32 magic = 0xde8c590b;
// TODO: Block until send/recv/signal
for (NET_W32_Pipe *pipe = first_pipe; pipe; pipe = pipe->next)
{
//////////////////////////////
//- Pop cmds
NET_W32_CmdList pipe_cmds = Zi;
//////////////////////////////
//- Assemble writes
// for (NET_W32_Cmd *cmd = pipe_cmds.first; cmd; cmd = cmd->next)
// {
// if (pipe->udp)
// {
// NET_Key key = cmd->dst_key;
// NET_W32_Routing routing = NET_W32_RoutingFromKey(key);
// struct sockaddr_in6 *addr = &routing->addr;
// sendto(
// pipe->udp,
// packet.text,
// packet.len,
// 0,
// (struct sockaddr *)addr,
// sizeof(*addr)
// );
// }
// }
//////////////////////////////
//- Read socket
@ -288,55 +417,39 @@ void NET_W32_TickForever(WaveLaneCtx *lane)
while (len >= 0)
{
u8 buff[Kibi(2)];
struct sockaddr_in6 addr = peer->addr;
struct sockaddr_in6 addr = Zi;
i32 addr_sz = sizeof(addr);
len = recvfrom(pipe->udp, buff, countof(buff), 0, (struct sockaddr *)&sockaddr_in6, &addr_sz);
len = recvfrom(pipe->udp, (char *)buff, countof(buff), 0, (struct sockaddr *)&addr, &addr_sz);
if (
len >= sizeof(PacketHeader) &&
len <= sizeof(PacketHeader) + NET_PacketSize &&
len >= sizeof(NET_W32_PacketHeader) &&
len <= sizeof(NET_W32_PacketHeader) + NET_PacketSize &&
MatchBytes(buff, &magic, sizeof(magic))
)
{
NET_Key key = NET_W32_KeyFromAddress(addr);
PacketHeader header = Zi;
NET_W32_PacketHeader header = Zi;
CopyBytes(&header, buff, sizeof(header));
String src_data = Zi;
src_data.text = buff + sizeof(header);
src_data.len = len - sizeof(header);
// if (!(header.flags & NET_W32_PacketFlag_Heartbeat))
// {
// LogDebugF(
// "Received msg. packet seq: %F, msg seq: %F, data: \"%F\"",
// FmtSint(header.seq),
// FmtSint(header.msg_seq),
// FmtString(src_data)
// );
// }
//- Fetch peer
// TODO: Address challenge on first receive
NET_W32_Peer *peer = 0;
{
u64 hash = NET_HashFromKey(key);
NET_W32_PeerBin *bin = &pipe->peer_bins[hash % pipe->peer_bins_count];
NET_W32_Peer *peer = bin->first;
for (; peer; peer = peer->next_in_bin)
{
if (NET_MatchKey(peer->key, key))
{
break;
}
}
if (!peer)
{
peer = first_free_peer;
if (peer)
{
SllStackPop(first_free_peer);
ZeroStruct(peer);
}
else
{
peer = PushStruct(perm, NET_W32_Peer);
}
NET_W32_Peer *peer = NET_W32_TouchPeerFromKey(pipe, key);
peer->key = key;
DllQueuePush(pipe->first_peer, pipe->last_peer, pipe);
DllQueuePushNP(bin->first, bin->last, pipe, next_in_bin, prev_in_bin);
}
}
//- Read packet
{
// Update remote acks
{
// Update remote stats
if (header.bottom_ack == peer->remote_bottom_ack)
{
peer->remote_ack_bits |= header.ack_bits;
@ -346,9 +459,8 @@ void NET_W32_TickForever(WaveLaneCtx *lane)
peer->remote_bottom_ack = header.bottom_ack;
peer->remote_ack_bits = header.ack_bits;
}
}
// Update our acks
// Update our stats
b32 is_sequential = 0;
b32 should_process = 0;
if (header.seq == peer->bottom_ack + 1)
@ -360,17 +472,23 @@ void NET_W32_TickForever(WaveLaneCtx *lane)
}
else if (header.seq > peer->bottom_ack + 1 && header.seq < peer->bottom_ack + 65)
{
u64 ack_bit = 1 << (header.seq - 2 - peer->bottom_ack);
u64 ack_bit = (u64)1 << (header.seq - 2 - peer->bottom_ack);
should_process = !!(peer->ack_bits & ack_bit);
peer->ack_bits |= ack_bit;
}
if (!(header.flags & NET_W32_PacketFlag_Heartbeat))
{
peer->num_msg_packets_received_this_frame += 1;
}
peer->last_packet_received_ns = TimeNs();
// Process packet
if (should_process)
{
NET_W32_Packet *packet = first_free_packet;
NET_W32_Packet *packet = NET_W32.first_free_packet;
if (packet)
{
SllStackPop(NET_W32.first_free_packet);
String old_packet_data = packet->data;
ZeroStruct(packet);
packet->data = old_packet_data;
@ -384,42 +502,40 @@ void NET_W32_TickForever(WaveLaneCtx *lane)
packet->flags = header.flags;
packet->seq = header.seq;
packet->msg_seq = header.msg_seq;
CopyBytes(packet->data.text, src_data.text, src_data.len);
packet->data.len = src_data.len;
// Insert packet
// TODO: Ring buffer
// if (is_sequential)
// {
// DllQueuePush(peer->first_msg_packet, peer->last_msg_packet, packet);
// for (NET_W32_Packet *tmp = peer->first_packet; tmp;)
// {
// NET_W32_Packet *next = tmp->next;
// if (tmp->seq == peer->last_msg_packet->seq + 1)
// {
// DllQueueRemove(peer->first_packet,
// }
// else
// {
// break;
// }
// tmp = next;
// }
// }
// else
// {
{
NET_W32_Packet *left = peer->last_packet;
NET_W32_Packet *left = peer->last_fragmented_packet;
for (; left; left = left->prev)
{
if (left < packet->seq)
if (left->seq < packet->seq)
{
break;
}
}
DllQueueInsert(peer->first_unsequential_packet, peer->last_unsequential_packet, left, packet);
DllQueueInsert(peer->first_fragmented_packet, peer->last_fragmented_packet, left, packet);
}
// Transfer fragmented -> contiguous packets
if (is_sequential)
{
if (peer->last_contiguous_packet)
{
peer->last_contiguous_packet->next = peer->first_fragmented_packet;
}
else
{
peer->first_contiguous_packet = peer->first_fragmented_packet;
}
peer->first_fragmented_packet->prev = peer->last_contiguous_packet;
peer->last_contiguous_packet = packet;
peer->first_fragmented_packet = packet->next;
if (peer->last_fragmented_packet == packet)
{
peer->last_fragmented_packet = 0;
}
}
}
}
@ -428,24 +544,144 @@ void NET_W32_TickForever(WaveLaneCtx *lane)
}
//////////////////////////////
//- Assemble messages
//- Assemble read messages
// TODO: Maximum message size
{
LockTicketMutex(&pipe->back_msg_buff_seq_tm);
{
NET_W32_MsgBuff *msg_buff = &pipe->msg_buffs[pipe->back_msg_buff_seq % countof(pipe->msg_buffs)];
String msg_data = Zi;
msg_data.text = ArenaNext(msg_buff->arena, u8);
for (NET_W32_Peer *peer = pipe->first_peer; peer; peer = peer->next)
{
for (NET_W32_Packet *packet = pipe->first_packet; packet;)
NET_W32_Packet *first_msg_packet = peer->first_contiguous_packet;
for (NET_W32_Packet *packet = peer->first_contiguous_packet; packet;)
{
NET_W32_Packet *next = packet->next;
{
if (packet->
b32 msg_has_end = 0;
if (packet->msg_seq != peer->last_contiguous_packet->msg_seq || !!(peer->last_contiguous_packet->flags | NET_W32_PacketFlag_EndMsg))
{
msg_has_end = 1;
}
if (msg_has_end)
{
msg_data.len += packet->data.len;
PushString(msg_buff->arena, packet->data);
b32 is_end_of_msg = !!(packet->flags & NET_W32_PacketFlag_EndMsg) || (next && next->msg_seq != packet->msg_seq);
if (is_end_of_msg)
{
NET_Msg *msg = PushStruct(msg_buff->arena, NET_Msg);
msg->sender = peer->key;
msg->data = msg_data;
DllQueuePush(msg_buff->msgs.first, msg_buff->msgs.last, msg);
++msg_buff->msgs.count;
{
peer->first_contiguous_packet = next;
if (next)
{
next->prev = 0;
}
else
{
peer->last_contiguous_packet = 0;
}
packet->next = NET_W32.first_free_packet;
NET_W32.first_free_packet = first_msg_packet;
}
first_msg_packet = next;
msg_data.text = ArenaNext(msg_buff->arena, u8);
msg_data.len = 0;
}
}
else
{
break;
}
}
packet = next;
}
}
}
UnlockTicketMutex(&pipe->back_msg_buff_seq_tm);
}
//////////////////////////////
//- Write peers
//- Queue message packets
{
// Swap cmd buff
NET_W32_CmdBuff *cmd_buff = 0;
{
LockTicketMutex(&pipe->back_cmd_buff_seq_tm);
{
cmd_buff = &pipe->cmd_buffs[pipe->back_cmd_buff_seq % countof(pipe->cmd_buffs)];
pipe->back_cmd_buff_seq += 1;
NET_W32_CmdBuff *back_cmd_buff = &pipe->cmd_buffs[pipe->back_cmd_buff_seq % countof(pipe->cmd_buffs)];
ResetArena(back_cmd_buff->arena);
ZeroStruct(&back_cmd_buff->cmds);
}
UnlockTicketMutex(&pipe->back_cmd_buff_seq_tm);
}
for (NET_W32_Cmd *cmd = cmd_buff->cmds.first; cmd; cmd = cmd->next)
{
NET_Key key = cmd->key;
NET_W32_Peer *peer = NET_W32_TouchPeerFromKey(pipe, key);
String src_data = cmd->data;
i64 src_pos = 0;
i64 msg_seq = ++peer->msg_seq;
// TODO: Burst packets
while (src_pos < (i64)src_data.len)
{
i64 copy_len = MinI64(NET_PacketSize - (i64)peer->fragment.len, src_data.len - src_pos);
CopyBytes(peer->fragment.text + peer->fragment.len, src_data.text + src_pos, copy_len);
src_pos += copy_len;
peer->fragment.len += copy_len;
// Push packet
b32 is_msg_end = src_pos >= (i64)src_data.len;
if (peer->fragment.len == NET_PacketSize || is_msg_end)
{
NET_W32_Packet *packet = NET_W32.first_free_packet;
if (packet)
{
SllStackPop(NET_W32.first_free_packet);
String old_packet_data = packet->data;
ZeroStruct(packet);
packet->data = old_packet_data;
packet->data.len = 0;
}
else
{
packet = PushStruct(perm, NET_W32_Packet);
packet->data.text = PushStructsNoZero(perm, u8, NET_PacketSize);
}
packet->seq = ++peer->seq;
packet->msg_seq = msg_seq;
CopyBytes(packet->data.text, peer->fragment.text, peer->fragment.len);
packet->data.len = peer->fragment.len;
if (is_msg_end)
{
packet->flags |= NET_W32_PacketFlag_EndMsg;
}
DllQueuePush(peer->first_remote_packet, peer->last_remote_packet, packet);
peer->fragment.len = 0;
}
}
}
}
//////////////////////////////
//- Send message packets
// TODO: Rate limit
@ -457,7 +693,8 @@ void NET_W32_TickForever(WaveLaneCtx *lane)
i64 bottom_ack = peer->remote_bottom_ack;
u64 ack_bits = peer->remote_ack_bits;
struct sockaddr_in6 addr = peer->addr;
NET_Key key = peer->key;
struct sockaddr_in6 addr = NET_W32_AddressFromKey(key);
// TODO: Maybe send more than just bottom_ack + 65 packets at a time.
for (NET_W32_Packet *packet = peer->first_remote_packet; packet && packet->seq < bottom_ack + 65;)
@ -473,18 +710,21 @@ void NET_W32_TickForever(WaveLaneCtx *lane)
}
else if (seq > bottom_ack + 1 && packet->seq < bottom_ack + 65)
{
u64 ack_bit = 1 << (seq - 2 - bottom_ack);
u64 ack_bit = (u64)1 << (seq - 2 - bottom_ack);
is_acked = !!(ack_bits & ack_bit);
}
}
if (is_acked)
{
// Prune acked packet
DllQueueRemove(peer->first_remote_packet, peer->last_packet, packet);
SllStackPush(first_free_packet, packet);
DllQueueRemove(peer->first_remote_packet, peer->last_remote_packet, packet);
SllStackPush(NET_W32.first_free_packet, packet);
}
else
{
peer->num_msg_packets_sent_this_frame += 1;
peer->last_packet_sent_ns = TimeNs();
// Transmit unacked packet
// FIXME: Rate limit, don't send if we've already sent in the last second.
// NOTE: If we do this we should probably put the net worker on something like a 1-second auto-run timer
@ -492,22 +732,24 @@ void NET_W32_TickForever(WaveLaneCtx *lane)
i64 buff_len = 0;
u8 buff[Kibi(2)];
// {
// i64 seq = packet->seq;
// i64 top_ack = 0;
// i64 ack_bits = 0;
// CopyBytes(buff + buff_len, &magic, 4);
// buff_len += 4;
// CopyBytes(buff + buff_len, &seq, 8);
// buff_len += 8;
// CopyBytes(buff + buff_len, &top_bits, 8);
// buff_len += 8;
// CopyBytes(buff + buff_len, &ack_bits, 8);
// }
NET_W32_PacketHeader header = Zi;
{
header.magic = magic;
header.flags = packet->flags;
header.seq = packet->seq;
header.msg_seq = packet->msg_seq;
header.bottom_ack = peer->bottom_ack;
header.ack_bits = peer->ack_bits;
}
CopyBytes(buff, &header, sizeof(header));
buff_len += sizeof(header);
CopyBytes(buff + buff_len, packet->data.text, packet->data.len);
buff_len += packet->data.len;
sendto(
pipe->udp,
buff,
(char *)buff,
buff_len,
0,
(struct sockaddr *)&addr,
@ -518,91 +760,56 @@ void NET_W32_TickForever(WaveLaneCtx *lane)
packet = next;
}
}
//////////////////////////////
//- Send heartbeats
for (NET_W32_Peer *peer = pipe->first_peer; peer; peer = peer->next)
{
b32 should_send_heartbeat = 0;
// If we received a message from a peer this frame but did not send any
// back, we should send a heartbeat packet so that the peer can still
// receive up to date ack info
if (peer->num_msg_packets_received_this_frame > 0 && peer->num_msg_packets_sent_this_frame == 0)
{
should_send_heartbeat = 1;
}
// Send heartbeat if we haven't sent any packets in a while
i64 now_ns = TimeNs();
if (now_ns - peer->last_packet_sent_ns > heartbeat_threshold_ns)
{
should_send_heartbeat = 1;
}
if (should_send_heartbeat)
{
struct sockaddr_in6 addr = NET_W32_AddressFromKey(peer->key);
//////////////////////////////
//- Pop cmds
// NET_W32_Cmd *first_cmd = 0;
// NET_W32_Cmd *last_cmd = 0;
//////////////////////////////
//- Create pipes from cmds
// for (NET_W32_Cmd *cmd = first_cmd; cmd; cmd = cmd->next)
// {
// NET_Key key = cmd->dst_key;
// NET_W32_Routing routing = NET_W32_RoutingFromKey(key);
// u64 port = routing.recv_port;
// struct sockaddr *addr = NET_W32_SockAddressFromKey(key);
// }
// NET_W32_Cmd *first_cmd = 0;
// NET_W32_Cmd *last_cmd = 0;
// {
// Lock lock = LockE(&NET_W32.pipes_mutex);
// {
// }
// Unlock(&lock);
// }
//////////////////////////////
//- Receive messages
//////////////////////////////
//- Send messages
// {
// for (NET_W32_Pipe *pipe = first_pipe; pipe; pipe = pipe->next)
// {
// if (pipe->udp)
// {
// sendto(
// pipe->udp,
// packet.text,
// packet.len,
// );
// }
// }
// }
i64 buff_len = 0;
u8 buff[Kibi(2)];
NET_W32_PacketHeader header = Zi;
{
header.magic = magic;
header.flags = NET_W32_PacketFlag_Heartbeat;
header.bottom_ack = peer->bottom_ack;
header.ack_bits = peer->ack_bits;
}
CopyBytes(buff, &header, sizeof(header));
buff_len += sizeof(header);
sendto(
pipe->udp,
(char *)buff,
buff_len,
0,
(struct sockaddr *)&addr,
sizeof(addr)
);
peer->last_packet_sent_ns = now_ns;
}
}
}
EndScratch(scratch);
}

View File

@ -1,8 +1,141 @@
////////////////////////////////////////////////////////////
//~ Command buff types
Struct(NET_W32_Cmd)
{
NET_W32_Cmd *next;
NET_Key key;
String data;
b32 burst;
};
Struct(NET_W32_CmdList)
{
i64 count;
NET_W32_Cmd *first;
NET_W32_Cmd *last;
};
Struct(NET_W32_CmdBuff)
{
Arena *arena;
NET_W32_CmdList cmds;
};
////////////////////////////////////////////////////////////
//~ Message buff types
Struct(NET_W32_MsgBuff)
{
Arena *arena;
NET_MsgList msgs;
};
////////////////////////////////////////////////////////////
//~ Packet types
Enum(NET_W32_PacketFlag)
{
NET_W32_PacketFlag_None = 0,
NET_W32_PacketFlag_EndMsg = (1 << 0),
NET_W32_PacketFlag_Heartbeat = (1 << 2),
// NET_W32_PacketFlag_Burst = (1 << 3),
};
Struct(NET_W32_PacketHeader)
{
u32 magic;
NET_W32_PacketFlag flags;
i64 seq;
i64 msg_seq;
i64 bottom_ack;
u64 ack_bits;
};
Struct(NET_W32_Packet)
{
NET_W32_Packet *next;
NET_W32_Packet *prev;
NET_W32_PacketFlag flags;
i64 seq;
i64 msg_seq;
String data;
};
////////////////////////////////////////////////////////////
//~ Peer types
Struct(NET_W32_Peer)
{
NET_W32_Peer *next;
NET_W32_Peer *prev;
NET_W32_Peer *next_in_bin;
NET_W32_Peer *prev_in_bin;
u64 hash;
NET_Key key;
i64 remote_bottom_ack;
i64 remote_ack_bits;
i64 bottom_ack;
i64 ack_bits;
NET_W32_Packet *first_remote_packet;
NET_W32_Packet *last_remote_packet;
NET_W32_Packet *first_fragmented_packet;
NET_W32_Packet *last_fragmented_packet;
NET_W32_Packet *first_contiguous_packet;
NET_W32_Packet *last_contiguous_packet;
i64 seq;
i64 msg_seq;
String fragment;
i64 last_packet_received_ns;
i64 last_packet_sent_ns;
i64 num_msg_packets_received_this_frame;
i64 num_msg_packets_sent_this_frame;
};
Struct(NET_W32_PeerBin)
{
NET_W32_Peer *first;
NET_W32_Peer *last;
};
////////////////////////////////////////////////////////////
//~ Pipe types
Struct(NET_W32_Pipe)
{
//- Shared data
NET_W32_Pipe *next;
NET_W32_Pipe *prev;
Atomic64 desired_port; // >64k means ephemeral
TicketMutex back_cmd_buff_seq_tm;
i64 back_cmd_buff_seq;
NET_W32_CmdBuff cmd_buffs[2];
TicketMutex back_msg_buff_seq_tm;
i64 back_msg_buff_seq;
NET_W32_MsgBuff msg_buffs[2];
//- Worker data
NET_W32_Peer *first_peer;
NET_W32_Peer *last_peer;
i64 peer_bins_count;
NET_W32_PeerBin *peer_bins;
u64 bound_port;
SOCKET udp;
};
@ -12,7 +145,13 @@ Struct(NET_W32_Pipe)
Struct(NET_W32_Ctx)
{
i32 _;
TicketMutex pipes_tm;
i64 pipes_count;
NET_W32_Pipe *first_pipe;
NET_W32_Pipe *last_pipe;
NET_W32_Peer *first_free_peer;
NET_W32_Packet *first_free_packet;
};
extern NET_W32_Ctx NET_W32;
@ -21,6 +160,11 @@ extern NET_W32_Ctx NET_W32;
//~ Helpers
NET_W32_Pipe *NET_W32_PipeFromHandle(NET_PipeHandle pipe_handle);
NET_Key NET_W32_KeyFromAddress(struct sockaddr_in6 addr);
struct sockaddr_in6 NET_W32_AddressFromKey(NET_Key key);
u64 NET_W32_HashFromKey(NET_Key key);
void NET_W32_SignalWorker(void);
NET_W32_Peer *NET_W32_TouchPeerFromKey(NET_W32_Pipe *pipe, NET_Key key);
////////////////////////////////////////////////////////////
//~ Worker

View File

@ -327,7 +327,7 @@ Struct(P_Msg)
//- In
P_Key dst_user; // If dst is 0, then cmd is meant to be broadcast
b32 unreliable;
b32 burst;
//- Out

View File

@ -130,7 +130,7 @@ void S_TickForever(WaveLaneCtx *lane)
P_MsgList in_msgs = Zi;
{
NET_MsgList net_msgs = NET_Pop(frame_arena, net_pipe);
NET_MsgList net_msgs = NET_Swap(frame_arena, net_pipe);
for (NET_Msg *net_msg = net_msgs.first; net_msg; net_msg = net_msg->next)
{
String packed = net_msg->data;
@ -430,7 +430,7 @@ void S_TickForever(WaveLaneCtx *lane)
// Arena *msgs_arena = P_tl.out_msgs_arena;
// P_Msg *msg = P_PushMsg(P_MsgKind_SimSnapshot, Zstr);
// msg->dst_user = user->key;
// msg->unreliable = 1;
// msg->burst = 1;
// P_SimSnapshot *snapshot = &msg->sim_snapshot;
// {
// snapshot->world_seed = world->seed;
@ -486,7 +486,7 @@ void S_TickForever(WaveLaneCtx *lane)
{
if (user->is_user)
{
NET_Push(net_pipe, user->net, packed, msg->unreliable);
NET_Push(net_pipe, user->net, packed, msg->burst);
}
}
}
@ -496,7 +496,7 @@ void S_TickForever(WaveLaneCtx *lane)
if (!NET_IsKeyNil(user->net))
{
String packed = P_PackMessage(frame_arena, msg);
NET_Push(net_pipe, user->net, packed, msg->unreliable);
NET_Push(net_pipe, user->net, packed, msg->burst);
}
}
}

View File

@ -346,7 +346,6 @@ void V_TickForever(WaveLaneCtx *lane)
const f32 meters_per_draw_width = 18;
NET_PipeHandle net_pipe = NET_AcquirePipe();
NET_Bind(net_pipe, 0);
//////////////////////////////
//- Init vis state
@ -2630,7 +2629,8 @@ void V_TickForever(WaveLaneCtx *lane)
P_MsgList in_msgs = Zi;
{
NET_MsgList net_msgs = NET_Pop(frame->arena, net_pipe);
NET_Bind(net_pipe, 0);
NET_MsgList net_msgs = NET_Swap(frame->arena, net_pipe);
for (NET_Msg *net_msg = net_msgs.first; net_msg; net_msg = net_msg->next)
{
String packed = net_msg->data;
@ -2644,6 +2644,12 @@ void V_TickForever(WaveLaneCtx *lane)
}
}
//////////////////////////////
//- Pop snapshot from sim
@ -3747,11 +3753,22 @@ void V_TickForever(WaveLaneCtx *lane)
//////////////////////////////
//- Send messages
// FIXME: Remove this (testing)
NET_Key server_key = NET_KeyFromString(Lit("127.0.0.1"), Lit("22121"));
NET_Push(net_pipe, server_key, Lit("HELLO!!!!"), 0);
for (P_MsgNode *msg_node = P_tl.out_msgs.first; msg_node; msg_node = msg_node->next)
{
P_Msg *msg = &msg_node->msg;
String packed = P_PackMessage(frame->arena, msg);
NET_Push(net_pipe, user->net, packed, msg->unreliable);
NET_Push(net_pipe, server_key, packed, msg->burst);
}
//////////////////////////////