adler32 checksum for packets
This commit is contained in:
parent
e6f1bb6661
commit
27c337202d
@ -40,6 +40,15 @@ Struct(NET_MsgList)
|
||||
NET_Msg *last;
|
||||
};
|
||||
|
||||
////////////////////////////////////////////////////////////
|
||||
//~ Stat types
|
||||
|
||||
Struct(NET_PipeStatistics)
|
||||
{
|
||||
i64 total_bytes_sent;
|
||||
i64 total_bytes_received;
|
||||
};
|
||||
|
||||
////////////////////////////////////////////////////////////
|
||||
//~ @hookdecl Bootstrap
|
||||
|
||||
@ -48,10 +57,13 @@ void NET_Bootstrap(void);
|
||||
////////////////////////////////////////////////////////////
|
||||
//~ @hookdecl Net ops
|
||||
|
||||
NET_PipeHandle NET_AcquirePipe(void);
|
||||
|
||||
void NET_Bind(NET_PipeHandle pipe, u64 port);
|
||||
NET_Key NET_KeyFromString(String host, String port);
|
||||
|
||||
NET_MsgList NET_Swap(Arena *arena, NET_PipeHandle pipe);
|
||||
NET_PipeHandle NET_AcquirePipe(void);
|
||||
|
||||
void NET_Bind(NET_PipeHandle pipe_handle, u64 port);
|
||||
|
||||
NET_MsgList NET_Swap(Arena *arena, NET_PipeHandle pipe_handle);
|
||||
void NET_Push(NET_PipeHandle pipe_handle, NET_Key dst, String data, b32 burst);
|
||||
|
||||
NET_PipeStatistics NET_StatsFromPipe(NET_PipeHandle pipe_handle);
|
||||
|
||||
@ -146,9 +146,62 @@ NET_W32_Peer *NET_W32_TouchPeerFromKey(NET_W32_Pipe *pipe, NET_Key key)
|
||||
return peer;
|
||||
}
|
||||
|
||||
u32 NET_W32_ChecksumFromString(String str)
|
||||
{
|
||||
// Adler-32
|
||||
// https://en.wikipedia.org/wiki/Adler-32
|
||||
u32 a = 1;
|
||||
u32 b = 0;
|
||||
for (u64 idx = 0; idx < str.len; ++idx)
|
||||
{
|
||||
a = (a + str.text[idx]) % 65521;
|
||||
b = (b + a) % 65521;
|
||||
}
|
||||
return (b << 16) | a;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////
|
||||
//~ @hookimpl Net ops
|
||||
|
||||
NET_Key NET_KeyFromString(String host, String port)
|
||||
{
|
||||
NET_Key result = Zi;
|
||||
TempArena scratch = BeginScratchNoConflict();
|
||||
{
|
||||
struct addrinfo hints = Zi;
|
||||
hints.ai_family = AF_INET6;
|
||||
hints.ai_socktype = SOCK_DGRAM;
|
||||
hints.ai_protocol = IPPROTO_UDP;
|
||||
hints.ai_flags = AI_V4MAPPED | AI_ADDRCONFIG;
|
||||
|
||||
char *host_cstr = CstrFromString(scratch.arena, host);
|
||||
char *port_cstr = CstrFromString(scratch.arena, port);
|
||||
|
||||
struct sockaddr_in6 addr = Zi;
|
||||
{
|
||||
struct addrinfo *first_ai = 0;
|
||||
if (getaddrinfo(host_cstr, port_cstr, &hints, &first_ai) == 0)
|
||||
{
|
||||
for (struct addrinfo *ai = first_ai; ai; ai = ai->ai_next)
|
||||
{
|
||||
if (ai->ai_family == AF_INET6 && ai->ai_addrlen >= sizeof(addr))
|
||||
{
|
||||
CopyBytes(&addr, ai->ai_addr, sizeof(addr));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (first_ai)
|
||||
{
|
||||
freeaddrinfo(first_ai);
|
||||
}
|
||||
}
|
||||
result = NET_W32_KeyFromAddress(addr);
|
||||
}
|
||||
EndScratch(scratch);
|
||||
return result;
|
||||
}
|
||||
|
||||
NET_PipeHandle NET_AcquirePipe(void)
|
||||
{
|
||||
Arena *perm = PermArena();
|
||||
@ -193,45 +246,6 @@ void NET_Bind(NET_PipeHandle pipe_handle, u64 port)
|
||||
// FIXME: Signal here if ports don't match
|
||||
}
|
||||
|
||||
NET_Key NET_KeyFromString(String host, String port)
|
||||
{
|
||||
NET_Key result = Zi;
|
||||
TempArena scratch = BeginScratchNoConflict();
|
||||
{
|
||||
struct addrinfo hints = Zi;
|
||||
hints.ai_family = AF_INET6;
|
||||
hints.ai_socktype = SOCK_DGRAM;
|
||||
hints.ai_protocol = IPPROTO_UDP;
|
||||
hints.ai_flags = AI_V4MAPPED | AI_ADDRCONFIG;
|
||||
|
||||
char *host_cstr = CstrFromString(scratch.arena, host);
|
||||
char *port_cstr = CstrFromString(scratch.arena, port);
|
||||
|
||||
struct sockaddr_in6 addr = Zi;
|
||||
{
|
||||
struct addrinfo *first_ai = 0;
|
||||
if (getaddrinfo(host_cstr, port_cstr, &hints, &first_ai) == 0)
|
||||
{
|
||||
for (struct addrinfo *ai = first_ai; ai; ai = ai->ai_next)
|
||||
{
|
||||
if (ai->ai_family == AF_INET6 && ai->ai_addrlen >= sizeof(addr))
|
||||
{
|
||||
CopyBytes(&addr, ai->ai_addr, sizeof(addr));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (first_ai)
|
||||
{
|
||||
freeaddrinfo(first_ai);
|
||||
}
|
||||
}
|
||||
result = NET_W32_KeyFromAddress(addr);
|
||||
}
|
||||
EndScratch(scratch);
|
||||
return result;
|
||||
}
|
||||
|
||||
NET_MsgList NET_Swap(Arena *arena, NET_PipeHandle pipe_handle)
|
||||
{
|
||||
NET_W32_Pipe *pipe = NET_W32_PipeFromHandle(pipe_handle);
|
||||
@ -267,6 +281,15 @@ void NET_Push(NET_PipeHandle pipe_handle, NET_Key dst, String data, b32 burst)
|
||||
NET_W32_SignalWorker();
|
||||
}
|
||||
|
||||
NET_PipeStatistics NET_StatsFromPipe(NET_PipeHandle pipe_handle)
|
||||
{
|
||||
NET_PipeStatistics result = Zi;
|
||||
NET_W32_Pipe *pipe = NET_W32_PipeFromHandle(pipe_handle);
|
||||
result.total_bytes_sent = Atomic64Fetch(&pipe->total_bytes_sent);
|
||||
result.total_bytes_received = Atomic64Fetch(&pipe->total_bytes_received);
|
||||
return result;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////
|
||||
//~ Worker
|
||||
|
||||
@ -281,7 +304,9 @@ void NET_W32_TickForever(WaveLaneCtx *lane)
|
||||
u32 magic = 0xde8c590b;
|
||||
i64 heartbeat_threshold_ns = NsFromSeconds(0.250);
|
||||
i64 passive_run_threshold_ns = NsFromSeconds(0.250);
|
||||
i64 msg_resend_threshold_ns = NsFromSeconds(0.500);
|
||||
|
||||
// TODO: Base this on peer's latency w/ rolling backoff
|
||||
i64 msg_resend_threshold_ns = NsFromSeconds(0.250);
|
||||
|
||||
//////////////////////////////
|
||||
//- Wait
|
||||
@ -365,11 +390,14 @@ void NET_W32_TickForever(WaveLaneCtx *lane)
|
||||
NET_W32_Pipe *pipe = pipes[pipe_idx];
|
||||
|
||||
//////////////////////////////
|
||||
//- Reset peer data
|
||||
//- Reset frame data
|
||||
|
||||
for (NET_W32_Peer *peer = pipe->first_peer; peer; peer = peer->next)
|
||||
{
|
||||
peer->num_msg_packets_received_this_frame = 0;
|
||||
pipe->num_msg_packets_received_this_frame = 0;
|
||||
for (NET_W32_Peer *peer = pipe->first_peer; peer; peer = peer->next)
|
||||
{
|
||||
peer->num_msg_packets_received_this_frame = 0;
|
||||
}
|
||||
}
|
||||
|
||||
//////////////////////////////
|
||||
@ -538,149 +566,152 @@ void NET_W32_TickForever(WaveLaneCtx *lane)
|
||||
struct sockaddr_in6 addr = Zi;
|
||||
i32 addr_sz = sizeof(addr);
|
||||
len = recvfrom(pipe->udp, (char *)buff, countof(buff), 0, (struct sockaddr *)&addr, &addr_sz);
|
||||
if (
|
||||
len >= sizeof(NET_W32_PacketHeader) &&
|
||||
len <= sizeof(NET_W32_PacketHeader) + NET_PacketSize &&
|
||||
MatchBytes(buff, &magic, sizeof(magic))
|
||||
)
|
||||
Atomic64FetchAdd(&pipe->total_bytes_received, len);
|
||||
if (len >= sizeof(NET_W32_PacketHeader) && len <= sizeof(NET_W32_PacketHeader) + NET_PacketSize)
|
||||
{
|
||||
NET_Key key = NET_W32_KeyFromAddress(addr);
|
||||
NET_W32_PacketHeader header = Zi;
|
||||
CopyBytes(&header, buff, sizeof(header));
|
||||
String src_data = Zi;
|
||||
src_data.text = buff + sizeof(header);
|
||||
src_data.len = len - sizeof(header);
|
||||
CopyBytes(buff, &magic, sizeof(magic));
|
||||
u32 checksum = NET_W32_ChecksumFromString(STRING(len, buff));
|
||||
if (header.checksum == checksum)
|
||||
{
|
||||
NET_Key key = NET_W32_KeyFromAddress(addr);
|
||||
String src_data = Zi;
|
||||
src_data.text = buff + sizeof(header);
|
||||
src_data.len = len - sizeof(header);
|
||||
|
||||
//- Fetch peer
|
||||
NET_W32_Peer *peer = NET_W32_TouchPeerFromKey(pipe, key);
|
||||
peer->key = key;
|
||||
peer->last_packet_received_ns = TimeNs();
|
||||
if (header.bottom_ack == peer->remote_bottom_ack)
|
||||
{
|
||||
peer->remote_ack_bits |= header.ack_bits;
|
||||
}
|
||||
else if (header.bottom_ack > peer->remote_bottom_ack)
|
||||
{
|
||||
peer->remote_bottom_ack = header.bottom_ack;
|
||||
peer->remote_ack_bits = header.ack_bits;
|
||||
}
|
||||
|
||||
//- Read msg packet
|
||||
if (!(header.flags & NET_W32_PacketFlag_Heartbeat))
|
||||
{
|
||||
// Update our stats
|
||||
b32 is_sequential = 0;
|
||||
b32 should_process = 0;
|
||||
if (header.seq == peer->bottom_ack + 1)
|
||||
//- Fetch peer
|
||||
NET_W32_Peer *peer = NET_W32_TouchPeerFromKey(pipe, key);
|
||||
peer->key = key;
|
||||
peer->last_packet_received_ns = TimeNs();
|
||||
if (header.bottom_ack == peer->remote_bottom_ack)
|
||||
{
|
||||
is_sequential = 1;
|
||||
should_process = 1;
|
||||
peer->remote_ack_bits |= header.ack_bits;
|
||||
}
|
||||
else if (header.seq > peer->bottom_ack + 1 && header.seq <= peer->bottom_ack + 65)
|
||||
else if (header.bottom_ack > peer->remote_bottom_ack)
|
||||
{
|
||||
u64 ack_bit = (u64)1 << (header.seq - peer->bottom_ack - 2);
|
||||
should_process = !(peer->ack_bits & ack_bit);
|
||||
peer->ack_bits |= ack_bit;
|
||||
peer->remote_bottom_ack = header.bottom_ack;
|
||||
peer->remote_ack_bits = header.ack_bits;
|
||||
}
|
||||
peer->num_msg_packets_received_this_frame += 1;
|
||||
|
||||
// Process packet
|
||||
if (should_process)
|
||||
//- Read msg packet
|
||||
if (!(header.flags & NET_W32_PacketFlag_Heartbeat))
|
||||
{
|
||||
NET_W32_Packet *packet = NET_W32.first_free_packet;
|
||||
if (packet)
|
||||
// Update our stats
|
||||
b32 is_sequential = 0;
|
||||
b32 should_process = 0;
|
||||
if (header.seq == peer->bottom_ack + 1)
|
||||
{
|
||||
SllStackPop(NET_W32.first_free_packet);
|
||||
String old_packet_data = packet->data;
|
||||
ZeroStruct(packet);
|
||||
packet->data = old_packet_data;
|
||||
packet->data.len = 0;
|
||||
is_sequential = 1;
|
||||
should_process = 1;
|
||||
}
|
||||
else
|
||||
else if (header.seq > peer->bottom_ack + 1 && header.seq <= peer->bottom_ack + 65)
|
||||
{
|
||||
packet = PushStruct(perm, NET_W32_Packet);
|
||||
packet->data.text = PushStructsNoZero(perm, u8, NET_PacketSize);
|
||||
u64 ack_bit = (u64)1 << (header.seq - peer->bottom_ack - 2);
|
||||
should_process = !(peer->ack_bits & ack_bit);
|
||||
peer->ack_bits |= ack_bit;
|
||||
}
|
||||
packet->flags = header.flags;
|
||||
packet->seq = header.seq;
|
||||
packet->msg_seq = header.msg_seq;
|
||||
CopyBytes(packet->data.text, src_data.text, src_data.len);
|
||||
packet->data.len = src_data.len;
|
||||
peer->num_msg_packets_received_this_frame += 1;
|
||||
pipe->num_msg_packets_received_this_frame += 1;
|
||||
|
||||
// Insert packet
|
||||
// Process packet
|
||||
if (should_process)
|
||||
{
|
||||
NET_W32_Packet *left = peer->last_fragmented_packet;
|
||||
for (; left; left = left->prev)
|
||||
NET_W32_Packet *packet = NET_W32.first_free_packet;
|
||||
if (packet)
|
||||
{
|
||||
if (left->seq < packet->seq)
|
||||
{
|
||||
break;
|
||||
}
|
||||
SllStackPop(NET_W32.first_free_packet);
|
||||
String old_packet_data = packet->data;
|
||||
ZeroStruct(packet);
|
||||
packet->data = old_packet_data;
|
||||
packet->data.len = 0;
|
||||
}
|
||||
DllQueueInsert(peer->first_fragmented_packet, peer->last_fragmented_packet, left, packet);
|
||||
}
|
||||
|
||||
// Transfer fragmented -> contiguous packets
|
||||
if (is_sequential)
|
||||
{
|
||||
NET_W32_Packet *contig_start = peer->first_fragmented_packet;
|
||||
NET_W32_Packet *contig_end = packet;
|
||||
for (NET_W32_Packet *tmp = packet->next; tmp; tmp = tmp->next)
|
||||
else
|
||||
{
|
||||
if (tmp->seq == tmp->prev->seq + 1)
|
||||
packet = PushStruct(perm, NET_W32_Packet);
|
||||
packet->data.text = PushStructsNoZero(perm, u8, NET_PacketSize);
|
||||
}
|
||||
packet->flags = header.flags;
|
||||
packet->seq = header.seq;
|
||||
packet->msg_seq = header.msg_seq;
|
||||
CopyBytes(packet->data.text, src_data.text, src_data.len);
|
||||
packet->data.len = src_data.len;
|
||||
|
||||
// Insert packet
|
||||
{
|
||||
NET_W32_Packet *left = peer->last_fragmented_packet;
|
||||
for (; left; left = left->prev)
|
||||
{
|
||||
contig_end = tmp;
|
||||
if (left->seq < packet->seq)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
DllQueueInsert(peer->first_fragmented_packet, peer->last_fragmented_packet, left, packet);
|
||||
}
|
||||
|
||||
// Transfer fragmented -> contiguous packets
|
||||
if (is_sequential)
|
||||
{
|
||||
NET_W32_Packet *contig_start = peer->first_fragmented_packet;
|
||||
NET_W32_Packet *contig_end = packet;
|
||||
for (NET_W32_Packet *tmp = packet->next; tmp; tmp = tmp->next)
|
||||
{
|
||||
if (tmp->seq == tmp->prev->seq + 1)
|
||||
{
|
||||
contig_end = tmp;
|
||||
}
|
||||
else
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (peer->last_contiguous_packet)
|
||||
{
|
||||
peer->last_contiguous_packet->next = contig_start;
|
||||
}
|
||||
else
|
||||
{
|
||||
break;
|
||||
peer->first_contiguous_packet = contig_start;
|
||||
}
|
||||
}
|
||||
contig_start->prev = peer->last_contiguous_packet;
|
||||
peer->last_contiguous_packet = contig_end;
|
||||
|
||||
if (peer->last_contiguous_packet)
|
||||
{
|
||||
peer->last_contiguous_packet->next = contig_start;
|
||||
}
|
||||
else
|
||||
{
|
||||
peer->first_contiguous_packet = contig_start;
|
||||
}
|
||||
contig_start->prev = peer->last_contiguous_packet;
|
||||
peer->last_contiguous_packet = contig_end;
|
||||
if (contig_end->next)
|
||||
{
|
||||
contig_end->next->prev = 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
peer->last_fragmented_packet = 0;
|
||||
}
|
||||
peer->first_fragmented_packet = contig_end->next;
|
||||
contig_end->next = 0;
|
||||
|
||||
if (contig_end->next)
|
||||
{
|
||||
contig_end->next->prev = 0;
|
||||
i64 diff = contig_end->seq - peer->bottom_ack;
|
||||
if (diff <= 63)
|
||||
{
|
||||
peer->ack_bits >>= diff;
|
||||
}
|
||||
else
|
||||
{
|
||||
peer->ack_bits = 0;
|
||||
}
|
||||
peer->bottom_ack = contig_end->seq;
|
||||
}
|
||||
else
|
||||
{
|
||||
peer->last_fragmented_packet = 0;
|
||||
}
|
||||
peer->first_fragmented_packet = contig_end->next;
|
||||
contig_end->next = 0;
|
||||
|
||||
i64 diff = contig_end->seq - peer->bottom_ack;
|
||||
if (diff <= 63)
|
||||
{
|
||||
peer->ack_bits >>= diff;
|
||||
}
|
||||
else
|
||||
{
|
||||
peer->ack_bits = 0;
|
||||
}
|
||||
peer->bottom_ack = contig_end->seq;
|
||||
}
|
||||
}
|
||||
|
||||
LogDebugF(
|
||||
"Received msg packet. seq: %F, msg seq: %F, new bottom ack: %F, new ack bits: %F, should_process: %F, data: \"%F\"",
|
||||
FmtSint(header.seq),
|
||||
FmtSint(header.msg_seq),
|
||||
FmtSint(peer->bottom_ack),
|
||||
FmtUint(peer->ack_bits),
|
||||
FmtUint(should_process),
|
||||
FmtString(src_data)
|
||||
);
|
||||
// LogDebugF(
|
||||
// "Received msg packet. seq: %F, msg seq: %F, new bottom ack: %F, new ack bits: %F, should_process: %F, data: \"%F\"",
|
||||
// FmtSint(header.seq),
|
||||
// FmtSint(header.msg_seq),
|
||||
// FmtSint(peer->bottom_ack),
|
||||
// FmtUint(peer->ack_bits),
|
||||
// FmtUint(should_process),
|
||||
// FmtString(src_data)
|
||||
// );
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -690,8 +721,8 @@ void NET_W32_TickForever(WaveLaneCtx *lane)
|
||||
//- Assemble read messages
|
||||
|
||||
// TODO: Maximum message size
|
||||
// TODO: Don't lock if no msgs received on pipe
|
||||
|
||||
if (pipe->num_msg_packets_received_this_frame > 0)
|
||||
{
|
||||
LockTicketMutex(&pipe->back_msg_buff_seq_tm);
|
||||
{
|
||||
@ -741,11 +772,10 @@ void NET_W32_TickForever(WaveLaneCtx *lane)
|
||||
msg_data.text = ArenaNext(msg_buff->arena, u8);
|
||||
msg_data.len = 0;
|
||||
|
||||
LogDebugF(
|
||||
"Assembled msg with msg seq: %F, data (%F bytes): \"%F\"",
|
||||
LogSuccessF(
|
||||
"Assembled msg with msg seq: %F, data (%F bytes)",
|
||||
FmtSint(packet->msg_seq),
|
||||
FmtUint(msg->data.len),
|
||||
FmtString(msg->data)
|
||||
FmtUint(msg->data.len)
|
||||
);
|
||||
}
|
||||
}
|
||||
@ -786,11 +816,12 @@ void NET_W32_TickForever(WaveLaneCtx *lane)
|
||||
String src_data = cmd->data;
|
||||
i64 src_pos = 0;
|
||||
i64 msg_seq = ++peer->msg_seq;
|
||||
LogDebugF("Queued msg with seq %F", FmtSint(msg_seq));
|
||||
// LogDebugF("Queued msg with seq %F", FmtSint(msg_seq));
|
||||
|
||||
// TODO: Burst packets
|
||||
|
||||
while (src_pos < (i64)src_data.len)
|
||||
b32 is_msg_end = 0;
|
||||
while (!is_msg_end)
|
||||
{
|
||||
i64 copy_len = MinI64(NET_PacketSize - (i64)peer->fragment.len, src_data.len - src_pos);
|
||||
CopyBytes(peer->fragment.text + peer->fragment.len, src_data.text + src_pos, copy_len);
|
||||
@ -798,7 +829,7 @@ void NET_W32_TickForever(WaveLaneCtx *lane)
|
||||
peer->fragment.len += copy_len;
|
||||
|
||||
// Push packet
|
||||
b32 is_msg_end = src_pos >= (i64)src_data.len;
|
||||
is_msg_end = src_pos >= (i64)src_data.len;
|
||||
if (peer->fragment.len == NET_PacketSize || is_msg_end)
|
||||
{
|
||||
NET_W32_Packet *packet = NET_W32.first_free_packet;
|
||||
@ -819,7 +850,6 @@ void NET_W32_TickForever(WaveLaneCtx *lane)
|
||||
packet->msg_seq = msg_seq;
|
||||
CopyBytes(packet->data.text, peer->fragment.text, peer->fragment.len);
|
||||
packet->data.len = peer->fragment.len;
|
||||
|
||||
if (is_msg_end)
|
||||
{
|
||||
packet->flags |= NET_W32_PacketFlag_EndMsg;
|
||||
@ -859,13 +889,16 @@ void NET_W32_TickForever(WaveLaneCtx *lane)
|
||||
u8 buff[Kibi(2)];
|
||||
NET_W32_PacketHeader header = Zi;
|
||||
{
|
||||
header.magic = magic;
|
||||
header.checksum = magic;
|
||||
header.flags = NET_W32_PacketFlag_Heartbeat;
|
||||
header.bottom_ack = peer->bottom_ack;
|
||||
header.ack_bits = peer->ack_bits;
|
||||
}
|
||||
CopyBytes(buff, &header, sizeof(header));
|
||||
buff_len += sizeof(header);
|
||||
|
||||
u32 checksum = NET_W32_ChecksumFromString(STRING(buff_len, buff));
|
||||
CopyBytes(buff, &checksum, sizeof(checksum));
|
||||
sendto(
|
||||
pipe->udp,
|
||||
(char *)buff,
|
||||
@ -874,6 +907,7 @@ void NET_W32_TickForever(WaveLaneCtx *lane)
|
||||
(struct sockaddr *)&addr,
|
||||
sizeof(addr)
|
||||
);
|
||||
Atomic64FetchAdd(&pipe->total_bytes_sent, buff_len);
|
||||
peer->last_packet_sent_ns = now_ns;
|
||||
}
|
||||
}
|
||||
@ -920,22 +954,18 @@ void NET_W32_TickForever(WaveLaneCtx *lane)
|
||||
}
|
||||
else
|
||||
{
|
||||
// Transmit unacked packet
|
||||
i64 now_ns = TimeNs();
|
||||
if (packet->last_sent_ns == 0 || (now_ns - packet->last_sent_ns > msg_resend_threshold_ns))
|
||||
{
|
||||
peer->last_packet_sent_ns = now_ns;
|
||||
packet->last_sent_ns = now_ns;
|
||||
|
||||
// Transmit unacked packet
|
||||
// FIXME: Rate limit, don't send if we've already sent in the last second.
|
||||
// NOTE: If we do this we should probably put the net worker on something like a 1-second auto-run timer
|
||||
// TODO: crc32
|
||||
i64 buff_len = 0;
|
||||
u8 buff[Kibi(2)];
|
||||
|
||||
NET_W32_PacketHeader header = Zi;
|
||||
{
|
||||
header.magic = magic;
|
||||
header.checksum = magic;
|
||||
header.flags = packet->flags;
|
||||
header.seq = packet->seq;
|
||||
header.msg_seq = packet->msg_seq;
|
||||
@ -948,6 +978,8 @@ void NET_W32_TickForever(WaveLaneCtx *lane)
|
||||
CopyBytes(buff + buff_len, packet->data.text, packet->data.len);
|
||||
buff_len += packet->data.len;
|
||||
|
||||
u32 checksum = NET_W32_ChecksumFromString(STRING(buff_len, buff));
|
||||
CopyBytes(buff, &checksum, sizeof(checksum));
|
||||
sendto(
|
||||
pipe->udp,
|
||||
(char *)buff,
|
||||
@ -956,13 +988,14 @@ void NET_W32_TickForever(WaveLaneCtx *lane)
|
||||
(struct sockaddr *)&addr,
|
||||
sizeof(addr)
|
||||
);
|
||||
Atomic64FetchAdd(&pipe->total_bytes_sent, buff_len);
|
||||
|
||||
LogDebugF(
|
||||
"Sent msg packet. seq: %F, msg seq: %F, data: \"%F\"",
|
||||
FmtSint(header.seq),
|
||||
FmtSint(header.msg_seq),
|
||||
FmtString(packet->data)
|
||||
);
|
||||
// LogDebugF(
|
||||
// "Sent msg packet. seq: %F, msg seq: %F, data: \"%F\"",
|
||||
// FmtSint(header.seq),
|
||||
// FmtSint(header.msg_seq),
|
||||
// FmtString(packet->data)
|
||||
// );
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -46,7 +46,7 @@ Enum(NET_W32_PacketFlag)
|
||||
|
||||
Struct(NET_W32_PacketHeader)
|
||||
{
|
||||
u32 magic;
|
||||
u32 checksum;
|
||||
NET_W32_PacketFlag flags;
|
||||
i64 seq;
|
||||
i64 msg_seq;
|
||||
@ -121,6 +121,9 @@ Struct(NET_W32_Pipe)
|
||||
|
||||
Atomic64 desired_port; // >64k means ephemeral
|
||||
|
||||
Atomic64 total_bytes_sent;
|
||||
Atomic64 total_bytes_received;
|
||||
|
||||
TicketMutex back_cmd_buff_seq_tm;
|
||||
i64 back_cmd_buff_seq;
|
||||
NET_W32_CmdBuff cmd_buffs[2];
|
||||
@ -133,6 +136,7 @@ Struct(NET_W32_Pipe)
|
||||
|
||||
NET_W32_Peer *first_peer;
|
||||
NET_W32_Peer *last_peer;
|
||||
i64 num_msg_packets_received_this_frame;
|
||||
|
||||
i64 peer_bins_count;
|
||||
NET_W32_PeerBin *peer_bins;
|
||||
@ -177,6 +181,7 @@ struct sockaddr_in6 NET_W32_AddressFromKey(NET_Key key);
|
||||
u64 NET_W32_HashFromKey(NET_Key key);
|
||||
void NET_W32_SignalWorker(void);
|
||||
NET_W32_Peer *NET_W32_TouchPeerFromKey(NET_W32_Pipe *pipe, NET_Key key);
|
||||
u32 NET_W32_ChecksumFromString(String str);
|
||||
|
||||
////////////////////////////////////////////////////////////
|
||||
//~ Worker
|
||||
|
||||
@ -16,8 +16,7 @@ Readonly P_Frame P_NilFrame = {
|
||||
|
||||
void P_Bootstrap(void)
|
||||
{
|
||||
P.u2s_msgs_arena = AcquireArena(Gibi(64));
|
||||
P.s2u_msgs_arena = AcquireArena(Gibi(64));
|
||||
P.s2u.arena = AcquireArena(Gibi(64));
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////
|
||||
|
||||
28
src/pp/pp.h
28
src/pp/pp.h
@ -284,10 +284,6 @@ Struct(P_SimSnapshot)
|
||||
i64 deltas_count;
|
||||
P_SimDeltaNode *first_delta_node;
|
||||
P_SimDeltaNode *last_delta_node;
|
||||
|
||||
i64 debug_draw_nodes_count;
|
||||
P_DebugDrawNode *first_debug_draw_node;
|
||||
P_DebugDrawNode *last_debug_draw_node;
|
||||
};
|
||||
|
||||
////////////////////////////////////////////////////////////
|
||||
@ -421,20 +417,18 @@ Struct(P_RaycastResult)
|
||||
|
||||
Struct(P_Ctx)
|
||||
{
|
||||
//- User -> sim messages
|
||||
Arena *u2s_msgs_arena;
|
||||
TicketMutex u2s_msgs_mutex;
|
||||
P_MsgList u2s_msgs;
|
||||
// Sim -> User state
|
||||
TicketMutex s2u_tm;
|
||||
struct
|
||||
{
|
||||
i64 gen;
|
||||
Arena *arena;
|
||||
i64 debug_draw_nodes_count;
|
||||
P_DebugDrawNode *first_debug_draw_node;
|
||||
P_DebugDrawNode *last_debug_draw_node;
|
||||
|
||||
//- Sim -> user messages
|
||||
Arena *s2u_msgs_arena;
|
||||
TicketMutex s2u_msgs_mutex;
|
||||
P_MsgList s2u_msgs;
|
||||
|
||||
//- Sim -> user snapshot
|
||||
Arena *s2u_snapshot_arena;
|
||||
TicketMutex s2u_snapshot_mutex;
|
||||
P_SimSnapshot s2u_snapshot;
|
||||
NET_PipeStatistics pipe_stats;
|
||||
} s2u;
|
||||
};
|
||||
|
||||
Struct(P_ThreadLocalCtx)
|
||||
|
||||
@ -614,6 +614,26 @@ void S_TickForever(WaveLaneCtx *lane)
|
||||
}
|
||||
}
|
||||
|
||||
//////////////////////////////
|
||||
//- Publish Sim -> User data
|
||||
|
||||
{
|
||||
LockTicketMutex(&P.s2u_tm);
|
||||
{
|
||||
{
|
||||
i64 old_gen = P.s2u.gen;
|
||||
Arena *old_arena = P.s2u.arena;
|
||||
ZeroStruct(&P.s2u);
|
||||
P.s2u.arena = old_arena;
|
||||
P.s2u.gen = old_gen;
|
||||
}
|
||||
ZeroStruct(P.s2u.arena);
|
||||
P.s2u.gen += 1;
|
||||
P.s2u.pipe_stats = NET_StatsFromPipe(net_pipe);
|
||||
}
|
||||
UnlockTicketMutex(&P.s2u_tm);
|
||||
}
|
||||
|
||||
//////////////////////////////
|
||||
//- End sim frame
|
||||
|
||||
|
||||
@ -350,9 +350,11 @@ void V_TickForever(WaveLaneCtx *lane)
|
||||
//////////////////////////////
|
||||
//- Init vis state
|
||||
|
||||
i64 s2u_gen = 0;
|
||||
Arena *sim_debug_arena = AcquireArena(Gibi(64));
|
||||
P_DebugDrawNode *first_sim_debug_draw_node = 0;
|
||||
P_DebugDrawNode *last_sim_debug_draw_node = 0;
|
||||
NET_PipeStatistics sim_pipe_stats = Zi;
|
||||
|
||||
P_World *sim_world = P_AcquireWorld();
|
||||
P_World *predict_world = P_AcquireWorld();
|
||||
@ -644,6 +646,27 @@ void V_TickForever(WaveLaneCtx *lane)
|
||||
frame->ui_dims.y = MaxF32(frame->ui_dims.y, 64);
|
||||
frame->draw_dims = frame->ui_dims;
|
||||
|
||||
|
||||
//////////////////////////////
|
||||
//- Pop sim -> user data
|
||||
|
||||
{
|
||||
LockTicketMutex(&P.s2u_tm);
|
||||
{
|
||||
if (P.s2u.gen > s2u_gen)
|
||||
{
|
||||
s2u_gen = P.s2u.gen;
|
||||
sim_pipe_stats = P.s2u.pipe_stats;
|
||||
}
|
||||
}
|
||||
UnlockTicketMutex(&P.s2u_tm);
|
||||
}
|
||||
|
||||
//////////////////////////////
|
||||
//- Update pipe stats
|
||||
|
||||
NET_PipeStatistics vis_pipe_stats = NET_StatsFromPipe(net_pipe);
|
||||
|
||||
//////////////////////////////
|
||||
//- Process controller events into vis cmds
|
||||
|
||||
@ -2290,6 +2313,13 @@ void V_TickForever(WaveLaneCtx *lane)
|
||||
UI_BuildLabelF("Hovered ent: %F", P_FmtKey(hovered_ent->key));
|
||||
}
|
||||
UI_BuildSpacer(UI_PIX(padding, 1), Axis_Y);
|
||||
{
|
||||
UI_BuildLabelF("Client send: %F MiB", FmtFloat(CeilF64((f64)vis_pipe_stats.total_bytes_sent / 1024) / 1024, .p = 3));
|
||||
UI_BuildLabelF("Client recv: %F MiB", FmtFloat(CeilF64((f64)vis_pipe_stats.total_bytes_received / 1024) / 1024, .p = 3));
|
||||
UI_BuildLabelF("Server send: %F MiB", FmtFloat(CeilF64((f64)sim_pipe_stats.total_bytes_sent / 1024) / 1024, .p = 3));
|
||||
UI_BuildLabelF("Server recv: %F MiB", FmtFloat(CeilF64((f64)sim_pipe_stats.total_bytes_received / 1024) / 1024, .p = 3));
|
||||
}
|
||||
UI_BuildSpacer(UI_PIX(padding, 1), Axis_Y);
|
||||
{
|
||||
{
|
||||
UI_Push(FontSize, UI_Top(FontSize) * theme.h2);
|
||||
@ -2644,15 +2674,6 @@ void V_TickForever(WaveLaneCtx *lane)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
//////////////////////////////
|
||||
//- Pop snapshot from sim
|
||||
|
||||
// {
|
||||
// LockTicketMutex(&P.s2u_snapshot_mutex);
|
||||
// P_SimSnapshot *src_snapshot = &P.s2u_snapshot;
|
||||
@ -2775,22 +2796,22 @@ void V_TickForever(WaveLaneCtx *lane)
|
||||
}
|
||||
|
||||
//- Update sim debug info
|
||||
{
|
||||
ResetArena(sim_debug_arena);
|
||||
first_sim_debug_draw_node = 0;
|
||||
last_sim_debug_draw_node = 0;
|
||||
{
|
||||
i64 dst_idx = 0;
|
||||
P_DebugDrawNode *dst_nodes = PushStructsNoZero(sim_debug_arena, P_DebugDrawNode, snapshot->debug_draw_nodes_count);
|
||||
for (P_DebugDrawNode *src = snapshot->first_debug_draw_node; src; src = src->next)
|
||||
{
|
||||
P_DebugDrawNode *dst = &dst_nodes[dst_idx];
|
||||
*dst = *src;
|
||||
dst_idx += 1;
|
||||
SllQueuePush(first_sim_debug_draw_node, last_sim_debug_draw_node, dst);
|
||||
}
|
||||
}
|
||||
}
|
||||
// {
|
||||
// ResetArena(sim_debug_arena);
|
||||
// first_sim_debug_draw_node = 0;
|
||||
// last_sim_debug_draw_node = 0;
|
||||
// {
|
||||
// i64 dst_idx = 0;
|
||||
// P_DebugDrawNode *dst_nodes = PushStructsNoZero(sim_debug_arena, P_DebugDrawNode, snapshot->debug_draw_nodes_count);
|
||||
// for (P_DebugDrawNode *src = snapshot->first_debug_draw_node; src; src = src->next)
|
||||
// {
|
||||
// P_DebugDrawNode *dst = &dst_nodes[dst_idx];
|
||||
// *dst = *src;
|
||||
// dst_idx += 1;
|
||||
// SllQueuePush(first_sim_debug_draw_node, last_sim_debug_draw_node, dst);
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
}
|
||||
|
||||
// TODO: Remove this
|
||||
@ -3762,12 +3783,8 @@ void V_TickForever(WaveLaneCtx *lane)
|
||||
|
||||
if (frame->held_buttons[Button_R] && !prev_frame->held_buttons[Button_R])
|
||||
{
|
||||
for (i64 i = 0; i < 100; ++i)
|
||||
{
|
||||
// NET_Push(net_pipe, server_key, Lit("HELLO!!!!"), 0);
|
||||
NET_Push(net_pipe, server_key, Lit("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"), 0);
|
||||
}
|
||||
// NET_Push(net_pipe, server_key, Lit("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"), 0);
|
||||
LogDebugF("Sending test payload");
|
||||
NET_Push(net_pipe, server_key, STRING(P_TilesCount, predict_world->tiles), 0);
|
||||
}
|
||||
|
||||
|
||||
@ -3779,6 +3796,10 @@ void V_TickForever(WaveLaneCtx *lane)
|
||||
String packed = P_PackMessage(frame->arena, msg);
|
||||
NET_Push(net_pipe, server_key, packed, msg->burst);
|
||||
}
|
||||
ResetArena(P_tl.out_msgs_arena);
|
||||
ZeroStruct(&P_tl.out_msgs);
|
||||
|
||||
|
||||
|
||||
//////////////////////////////
|
||||
//- End frame
|
||||
|
||||
Loading…
Reference in New Issue
Block a user