power_play/src/net/net.c

1076 lines
31 KiB
C

// TODO:
//
// Rate limiting.
//
// Resequence buffer to order incoming sequenced packets.
//
// Rolling window for message reassembly.
// This would remove the need for random access message buffers.
//
// Connection timeouts.
//
// Challenges to verify receiving address.
////////////////////////////////////////////////////////////
//~ Host
// Creates a host bound to `listen_port`.
//
// The host struct is allocated out of an arena that the host itself owns, so
// the result must be torn down with N_ReleaseHost rather than freed directly.
N_Host *N_AcquireHost(u16 listen_port)
{
  // Host storage: the struct lives inside its own arena
  Arena *host_arena = AcquireArena(Gibi(64));
  N_Host *result = PushStruct(host_arena, N_Host);
  result->arena = host_arena;

  // Arenas for queued commands and for the channel array
  result->cmd_arena = AcquireArena(Gibi(64));
  result->channel_arena = AcquireArena(Gibi(64));

  // Receive buffers (each one owns a separate arena)
  result->rcv_buffer_read = PushStruct(result->arena, N_RcvBuffer);
  result->rcv_buffer_write = PushStruct(result->arena, N_RcvBuffer);
  result->rcv_buffer_read->arena = AcquireArena(Gibi(64));
  result->rcv_buffer_write->arena = AcquireArena(Gibi(64));

  // Buddy allocator backing message assembler data
  result->buddy = AcquireBuddyCtx(Gibi(64));

  // Channels are pushed contiguously onto channel_arena, so the array base is
  // wherever that arena will place its next allocation
  result->channels = ArenaNext(result->channel_arena, N_Channel);

  // Lookup tables: address -> channel, and (channel, msg id) -> assembler
  result->num_channel_lookup_bins = N_NumChannelLookupBins;
  result->channel_lookup_bins = PushStructs(result->arena, N_ChannelLookupBin, result->num_channel_lookup_bins);
  result->num_msg_assembler_lookup_bins = N_NumMsgAssemblerLookupBins;
  result->msg_assembler_lookup_bins = PushStructs(result->arena, N_MsgAssemblerLookupBin, result->num_msg_assembler_lookup_bins);

  // Socket
  result->sock = P_AcquireSock(listen_port, Mebi(2), Mebi(2));

  return result;
}
// Releases every resource acquired by N_AcquireHost.
//
// `host` itself is allocated inside host->arena, so that arena is released
// last and `host` must not be touched after this call returns.
void N_ReleaseHost(N_Host *host)
{
  // Socket and buddy allocator
  P_ReleaseSock(host->sock);
  ReleaseBuddyCtx(host->buddy);

  // Receive buffer arenas
  N_RcvBuffer *write_buffer = host->rcv_buffer_write;
  N_RcvBuffer *read_buffer = host->rcv_buffer_read;
  ReleaseArena(write_buffer->arena);
  ReleaseArena(read_buffer->arena);

  // Channel & command arenas
  ReleaseArena(host->channel_arena);
  ReleaseArena(host->cmd_arena);

  // Finally the arena that holds the host struct itself
  ReleaseArena(host->arena);
}
////////////////////////////////////////////////////////////
//~ Channel
// Hashes the raw bytes of `address` with FNV-64, seeded with the FNV basis.
u64 N_HashFromAddress(P_Address address)
{
  u64 result = HashFnv64(Fnv64Basis, StringFromStruct(&address));
  return result;
}
// Looks up the channel registered for `address` in the host's address table.
// Returns the nil channel (never a null pointer) when no channel matches.
N_Channel *N_ChannelFromAddress(N_Host *host, P_Address address)
{
  u64 wanted_hash = N_HashFromAddress(address);
  u64 bin_idx = wanted_hash % host->num_channel_lookup_bins;
  N_Channel *result = &N_nil_channel;

  // Walk the bin's chain; compare hashes first so the full address compare
  // only runs on likely matches
  N_Channel *candidate = host->channel_lookup_bins[bin_idx].first;
  while (candidate)
  {
    if (candidate->address_hash == wanted_hash && P_MatchAddress(candidate->address, address))
    {
      result = candidate;
      break;
    }
    candidate = candidate->next_address_hash;
  }

  return result;
}
// Resolves a channel id to its slot in the host's channel array.
//
// Returns the nil channel if id = N_AllChannelsId, if the index is outside
// the reserved range, or if the slot's generation differs from the id's.
N_Channel *N_ChannelFromId(N_Host *host, N_ChannelId channel_id)
{
  N_Channel *result = &N_nil_channel;
  b32 is_plausible = (channel_id.gen > 0) && (channel_id.idx < host->num_channels_reserved);
  if (is_plausible)
  {
    N_Channel *slot = &host->channels[channel_id.idx];
    if (slot->id.gen == channel_id.gen)
    {
      result = slot;
    }
  }
  return result;
}
// Expands a channel id into a list of valid channels, with nodes allocated
// on `arena`.
//
// N_AllChannelsId yields every valid channel on the host in index order. Any
// other id yields either a single-node list or an empty list.
N_ChannelList N_ChannelsFromId(Arena *arena, N_Host *host, N_ChannelId channel_id)
{
  N_ChannelList list = ZI;

  // Specific channel
  if (!N_MatchChannelId(channel_id, N_AllChannelsId))
  {
    N_Channel *match = N_ChannelFromId(host, channel_id);
    if (match->valid)
    {
      N_ChannelNode *node = PushStruct(arena, N_ChannelNode);
      node->channel = match;
      list.first = node;
      list.last = node;
    }
    return list;
  }

  // Broadcast: gather every valid channel
  for (u64 slot_idx = 0; slot_idx < host->num_channels_reserved; ++slot_idx)
  {
    N_Channel *candidate = &host->channels[slot_idx];
    if (!candidate->valid)
    {
      continue;
    }
    N_ChannelNode *node = PushStruct(arena, N_ChannelNode);
    node->channel = candidate;
    if (list.last)
    {
      list.last->next = node;
    }
    else
    {
      list.first = node;
    }
    list.last = node;
  }

  return list;
}
// Acquires a channel for `address` and registers it in the address table.
//
// A slot from the host's free list is reused when available (keeping its
// index and bumping its generation); otherwise a new slot is pushed onto the
// channel arena with generation 1. The channel is zeroed before being set up.
N_Channel *N_AcquireChannel(N_Host *host, P_Address address)
{
  N_Channel *result = host->first_free_channel;
  N_ChannelId new_id = ZI;
  if (result)
  {
    // Recycle: the id must be read before the slot is zeroed below
    host->first_free_channel = result->next_free;
    new_id = result->id;
    ++new_id.gen;
  }
  else
  {
    // Fresh slot at the end of the channel array
    result = PushStructNoZero(host->channel_arena, N_Channel);
    new_id.idx = host->num_channels_reserved;
    new_id.gen = 1;
    ++host->num_channels_reserved;
  }

  ZeroStruct(result);
  result->valid = 1;
  result->id = new_id;
  result->host = host;
  result->address = address;

  // Append to the tail of the matching address lookup bin
  u64 hash = N_HashFromAddress(address);
  result->address_hash = hash;
  N_ChannelLookupBin *bin = &host->channel_lookup_bins[hash % host->num_channel_lookup_bins];
  N_Channel *tail = bin->last;
  if (tail)
  {
    tail->next_address_hash = result;
    result->prev_address_hash = tail;
  }
  else
  {
    bin->first = result;
  }
  bin->last = result;

  return result;
}
// Releases a channel back to its host.
//
// Unlinks the channel from the address lookup table, returns all of its
// queued send packets to the host's packet free list, releases its message
// assemblers, bumps the id generation so outstanding ids stop resolving to
// this slot, and pushes the slot onto the host's channel free list.
void N_ReleaseChannel(N_Channel *channel)
{
  N_Host *host = channel->host;

  // Release from lookup table
  {
    N_ChannelLookupBin *bin = &host->channel_lookup_bins[channel->address_hash % host->num_channel_lookup_bins];
    N_Channel *prev = channel->prev_address_hash;
    N_Channel *next = channel->next_address_hash;
    if (prev)
    {
      prev->next_address_hash = next;
    }
    else
    {
      bin->first = next;
    }
    if (next)
    {
      next->prev_address_hash = prev;
    }
    else
    {
      bin->last = prev;
    }
  }

  // Release packets
  //
  // Each queue is spliced onto the front of the free list. The tail must be
  // linked to the current free list head BEFORE the head is replaced;
  // doing it the other way around makes the queue circular and drops the
  // packets that were already on the free list.
  {
    if (channel->first_unreliable_packet)
    {
      channel->last_unreliable_packet->next = host->first_free_packet;
      host->first_free_packet = channel->first_unreliable_packet;
      channel->first_unreliable_packet = 0;
      channel->last_unreliable_packet = 0;
      channel->num_unreliable_packets = 0;
    }
    if (channel->first_reliable_packet)
    {
      channel->last_reliable_packet->next = host->first_free_packet;
      host->first_free_packet = channel->first_reliable_packet;
      channel->first_reliable_packet = 0;
      channel->last_reliable_packet = 0;
      channel->num_reliable_packets = 0;
    }
  }

  // Release msg assemblers
  //
  // The successor is cached before each release so the walk does not depend
  // on the links of a node that has already been handed to the free list.
  {
    N_MsgAssembler *ma = channel->least_recent_msg_assembler;
    while (ma)
    {
      N_MsgAssembler *next = ma->more_recent;
      N_ReleaseMessageAssembler(ma);
      ma = next;
    }
  }

  // Invalidate outstanding ids and recycle the slot
  ++channel->id.gen;
  channel->valid = 0;
  channel->next_free = host->first_free_channel;
  host->first_free_channel = channel;
}
////////////////////////////////////////////////////////////
//~ Message assembler
// Hashes a (channel id, message id) pair by chaining FNV-64 over the raw
// bytes of the channel id followed by the raw bytes of the message id.
u64 N_HashFromMsg(N_ChannelId channel_id, u64 msg_id)
{
  u64 channel_hash = HashFnv64(Fnv64Basis, StringFromStruct(&channel_id));
  u64 msg_hash = HashFnv64(channel_hash, StringFromStruct(&msg_id));
  return msg_hash;
}
// Finds the in-flight assembler for message `msg_id` on channel `channel_id`.
// Returns 0 when no assembler exists for that pair.
N_MsgAssembler *N_MsgAssemblerFromMsg(N_Host *host, N_ChannelId channel_id, u64 msg_id)
{
  u64 wanted_hash = N_HashFromMsg(channel_id, msg_id);
  u64 bin_idx = wanted_hash % host->num_msg_assembler_lookup_bins;

  // Walk the bin's chain until a full match is found or the chain ends
  N_MsgAssembler *candidate = host->msg_assembler_lookup_bins[bin_idx].first;
  while (candidate)
  {
    b32 is_match = candidate->hash == wanted_hash &&
                   N_MatchChannelId(candidate->channel->id, channel_id) &&
                   candidate->msg_id == msg_id;
    if (is_match)
    {
      break;
    }
    candidate = candidate->next_hash;
  }

  return candidate;
}
// Acquires an assembler for a message made of `chunk_count` chunks.
//
// The assembler is appended as the channel's most recent assembler and is
// registered in the host's (channel id, msg id) lookup table. Its storage is
// a single buddy block holding a received-chunk bitmap followed by room for
// `chunk_count` chunks of N_MaxPacketChunkLen bytes each.
N_MsgAssembler *N_AcquireMsgAssembler(N_Channel *channel, u64 msg_id, u64 chunk_count, u64 now_ns, b32 is_reliable)
{
  N_Host *host = channel->host;

  // Reuse a free assembler if there is one, otherwise push a new one
  N_MsgAssembler *result = host->first_free_msg_assembler;
  if (result)
  {
    host->first_free_msg_assembler = result->next_free;
  }
  else
  {
    result = PushStructNoZero(host->arena, N_MsgAssembler);
  }
  ZeroStruct(result);
  result->channel = channel;
  result->msg_id = msg_id;
  result->num_chunks_total = chunk_count;
  result->is_reliable = is_reliable;
  result->touched_ns = now_ns;

  // One bit per chunk, rounded up to a multiple of 16 bytes so the chunk
  // data that follows the bitmap stays aligned
  u64 bitmap_size = (chunk_count + 7) >> 3;
  bitmap_size = (bitmap_size + 15) & ~(u64)15;
  u64 data_size = chunk_count * N_MaxPacketChunkLen;

  // Acquire msg data using buddy allocator since the assembler has
  // arbitrary lifetime and data needs to stay contiguous for random
  // access as packets are received
  // FIXME: Ensure chunk_count > 0
  result->buddy_block = AcquireBuddyBlock(host->buddy, bitmap_size + data_size);
  result->chunk_bitmap = result->buddy_block->memory;
  ZeroBytes(result->chunk_bitmap, bitmap_size);
  result->chunk_data = result->chunk_bitmap + bitmap_size;

  // Append to the channel's recency list as the most recent entry
  {
    N_MsgAssembler *newest = channel->most_recent_msg_assembler;
    if (newest)
    {
      newest->more_recent = result;
      result->less_recent = newest;
    }
    else
    {
      channel->least_recent_msg_assembler = result;
    }
    channel->most_recent_msg_assembler = result;
  }

  // Append to the tail of the matching lookup bin
  {
    u64 hash = N_HashFromMsg(channel->id, msg_id);
    result->hash = hash;
    N_MsgAssemblerLookupBin *bin = &host->msg_assembler_lookup_bins[hash % host->num_msg_assembler_lookup_bins];
    N_MsgAssembler *tail = bin->last;
    if (tail)
    {
      tail->next_hash = result;
      result->prev_hash = tail;
    }
    else
    {
      bin->first = result;
    }
    bin->last = result;
  }

  return result;
}
// Releases an assembler: frees its buddy block, unlinks it from its channel's
// recency list and from the host lookup table, then pushes it onto the
// host's assembler free list.
void N_ReleaseMessageAssembler(N_MsgAssembler *ma)
{
  N_Channel *channel = ma->channel;
  N_Host *host = channel->host;

  // Free the bitmap + chunk storage
  ReleaseBuddyBlock(ma->buddy_block);

  // Unlink from the channel's recency list
  {
    N_MsgAssembler *older = ma->less_recent;
    N_MsgAssembler *newer = ma->more_recent;
    if (older)
    {
      older->more_recent = newer;
    }
    else
    {
      channel->least_recent_msg_assembler = newer;
    }
    if (newer)
    {
      newer->less_recent = older;
    }
    else
    {
      channel->most_recent_msg_assembler = older;
    }
  }

  // Unlink from the lookup table bin
  {
    u64 bin_idx = ma->hash % host->num_msg_assembler_lookup_bins;
    N_MsgAssemblerLookupBin *bin = &host->msg_assembler_lookup_bins[bin_idx];
    N_MsgAssembler *before = ma->prev_hash;
    N_MsgAssembler *after = ma->next_hash;
    if (before)
    {
      before->next_hash = after;
    }
    else
    {
      bin->first = after;
    }
    if (after)
    {
      after->prev_hash = before;
    }
    else
    {
      bin->last = before;
    }
  }

  // Push onto the host's free list
  ma->next_free = host->first_free_msg_assembler;
  host->first_free_msg_assembler = ma;
}
// Marks an assembler as touched at `now_ns` and moves it to the most-recent
// end of its channel's recency list.
//
// The list is ordered least recent -> most recent so that timeout sweeps can
// start at the least recent end and stop at the first entry that is still
// fresh.
void N_TouchMessageAssembler(N_MsgAssembler *ma, i64 now_ns)
{
  N_Channel *channel = ma->channel;
  if (ma != channel->most_recent_msg_assembler)
  {
    // Remove from channel list
    {
      N_MsgAssembler *prev = ma->less_recent;
      N_MsgAssembler *next = ma->more_recent;
      if (prev)
      {
        prev->more_recent = next;
      }
      else
      {
        channel->least_recent_msg_assembler = next;
      }
      if (next)
      {
        next->less_recent = prev;
      }
      else
      {
        channel->most_recent_msg_assembler = prev;
      }
    }

    // Insert at end of channel list
    //
    // Both links are rewritten here. Leaving the old `more_recent` link in
    // place would make the new tail point back into the list, creating a
    // cycle and breaking any later unlink of this node.
    {
      N_MsgAssembler *old_tail = channel->most_recent_msg_assembler;
      ma->more_recent = 0;
      ma->less_recent = old_tail;
      if (old_tail)
      {
        old_tail->more_recent = ma;
      }
      else
      {
        channel->least_recent_msg_assembler = ma;
      }
      channel->most_recent_msg_assembler = ma;
    }
  }
  ma->touched_ns = now_ns;
}
// Returns non-zero if chunk `chunk_id` is marked as received in the
// assembler's bitmap. Ids at or beyond num_chunks_total report 0.
b32 N_IsChunkFilled(N_MsgAssembler *ma, u64 chunk_id)
{
  if (chunk_id >= ma->num_chunks_total)
  {
    return 0;
  }
  // One bit per chunk: byte = id / 8, bit = id % 8
  u64 byte_idx = chunk_id >> 3;
  u64 bit_idx = chunk_id & 7;
  return (ma->chunk_bitmap[byte_idx] & (1 << bit_idx)) != 0;
}
// Sets the received bit for chunk `chunk_id` in the assembler's bitmap.
// Ids at or beyond num_chunks_total are ignored.
void N_MarkChunkReceived(N_MsgAssembler *ma, u64 chunk_id)
{
  if (chunk_id >= ma->num_chunks_total)
  {
    return;
  }
  // One bit per chunk: byte = id / 8, bit = id % 8
  u64 byte_idx = chunk_id >> 3;
  u64 bit_idx = chunk_id & 7;
  ma->chunk_bitmap[byte_idx] |= (1 << bit_idx);
}
////////////////////////////////////////////////////////////
//~ Packet
// Pushes a zeroed send packet onto the end of the channel's reliable or
// unreliable queue, chosen by `is_reliable`.
//
// Packets are recycled from the host's packet free list when possible.
// Reliable packets are stamped with the channel's next sequence number.
N_SndPacket *N_PushSndPacket(N_Channel *channel, b32 is_reliable)
{
  N_Host *host = channel->host;

  // Recycle a packet or push a new one
  N_SndPacket *result = host->first_free_packet;
  if (result)
  {
    host->first_free_packet = result->next;
  }
  else
  {
    result = PushStructNoZero(host->arena, N_SndPacket);
  }
  ZeroStruct(result);

  if (!is_reliable)
  {
    // Unreliable queue
    N_SndPacket *tail = channel->last_unreliable_packet;
    if (tail)
    {
      tail->next = result;
    }
    else
    {
      channel->first_unreliable_packet = result;
    }
    channel->last_unreliable_packet = result;
    ++channel->num_unreliable_packets;
  }
  else
  {
    // Reliable queue; sequence numbers are pre-incremented on the channel
    N_SndPacket *tail = channel->last_reliable_packet;
    if (tail)
    {
      tail->next = result;
    }
    else
    {
      channel->first_reliable_packet = result;
    }
    channel->last_reliable_packet = result;
    ++channel->num_reliable_packets;
    ++channel->last_sent_seq;
    result->seq = channel->last_sent_seq;
  }

  return result;
}
////////////////////////////////////////////////////////////
//~ Host commands
// Allocates a command on the host's command arena and appends it to the
// host's pending command list. The caller fills in the returned command.
N_Cmd *N_PushCmd(N_Host *host)
{
  N_Cmd *result = PushStruct(host->cmd_arena, N_Cmd);
  N_Cmd *tail = host->last_cmd;
  if (!tail)
  {
    host->first_cmd = result;
  }
  else
  {
    tail->next = result;
  }
  host->last_cmd = result;
  return result;
}
// Ensures a channel exists for `connect_address`, acquiring one if the
// address has no valid channel yet. No command is queued here.
void N_Connect(N_Host *host, P_Address connect_address)
{
  N_Channel *existing = N_ChannelFromAddress(host, connect_address);
  if (existing->valid)
  {
    return;
  }
  N_AcquireChannel(host, connect_address);
}
// Queues a disconnect command addressed to `channel_id`.
void N_Disconnect(N_Host *host, N_ChannelId channel_id)
{
  N_Cmd *disconnect_cmd = N_PushCmd(host);
  disconnect_cmd->channel_id = channel_id;
  disconnect_cmd->kind = N_CmdKind_Disconnect;
}
// Queues a write of `msg` to `channel_id`.
//
// The message bytes are copied into the host's command arena, so the
// caller's buffer does not need to outlive this call.
void N_Write(N_Host *host, N_ChannelId channel_id, String msg, N_WriteFlag flags)
{
  N_Cmd *write_cmd = N_PushCmd(host);
  write_cmd->kind = N_CmdKind_Write;
  write_cmd->channel_id = channel_id;
  write_cmd->write_reliable = flags & N_WriteFlag_Reliable;
  write_cmd->write_msg = PushString(host->cmd_arena, msg);
}
////////////////////////////////////////////////////////////
//~ Channel info
// Returns the last heartbeat round-trip time stored on the channel that
// `channel_id` resolves to. Unresolvable ids read the nil channel's value.
i64 N_GetChannelLastRttNs(N_Host *host, N_ChannelId channel_id)
{
  i64 rtt_ns = N_ChannelFromId(host, channel_id)->last_heartbeat_rtt_ns;
  return rtt_ns;
}
////////////////////////////////////////////////////////////
//~ Update begin
// Read incoming packets, update channels, and return events.
//
// Drains the host socket, parses every packet that carries the expected
// magic, and applies it to the channel registered for the sender's address.
// Events (channel opened / closed, message received) are allocated on
// `arena` and returned. Follow-up work such as handshakes and heartbeats is
// queued as host commands.
//
// Packet header layout, as read below:
//   magic (32 bits), kind (8 bits), flags (8 bits), sender's acked seq (UV),
//   then the packet's own seq (UV), read only for reliable packets on a
//   valid channel.
N_EventList N_BeginUpdate(Arena *arena, N_Host *host)
{
  TempArena scratch = BeginScratch(arena);
  N_EventList events = ZI;
  i64 now_ns = TimeNs();

  //////////////////////////////
  //- Read socket

  {
    N_RcvPacket *first_packet = 0;
    N_RcvPacket *last_packet = 0;
    {
      P_Sock *sock = host->sock;
      P_SockReadResult result = ZI;
      while ((result = P_ReadSock(scratch.arena, sock)).valid)
      {
        P_Address address = result.address;
        String data = result.data;
        if (data.len > 0)
        {
          N_RcvPacket *packet = PushStruct(scratch.arena, N_RcvPacket);
          packet->address = address;
          packet->data = PushString(scratch.arena, data);
          if (last_packet)
          {
            last_packet->next = packet;
          }
          else
          {
            first_packet = packet;
          }
          last_packet = packet;
        }
      }
    }

    //////////////////////////////
    //- Process incoming packets

    {
      for (N_RcvPacket *packet = first_packet; packet; packet = packet->next)
      {
        P_Address address = packet->address;
        BB_Buff bb = BB_BuffFromString(packet->data);
        BB_Reader br = BB_ReaderFromBuff(&bb);
        u32 magic = BB_ReadUBits(&br, 32); // TODO: implicitly encode magic into crc32
        if (magic == N_PacketMagic)
        {
          // TODO: Combine kind byte with flags byte
          N_Channel *channel = N_ChannelFromAddress(host, address);
          N_PacketKind packet_kind = BB_ReadIBits(&br, 8);
          u8 packet_flags = BB_ReadUBits(&br, 8);
          u64 their_acked_seq = BB_ReadUV(&br);
          if (channel->valid)
          {
            channel->last_packet_received_ns = now_ns;
            // Acks only ever move forward
            if (their_acked_seq > channel->their_acked_seq)
            {
              channel->their_acked_seq = their_acked_seq;
            }
          }

          // Reliable packets are only accepted strictly in order; anything
          // else is dropped and left for the sender to resend
          b32 skip_packet = 0;
          b32 is_reliable = packet_flags & N_PacketFlag_Reliable;
          if (channel->valid)
          {
            if (is_reliable)
            {
              u64 packet_seq = BB_ReadUV(&br);
              if (packet_seq == channel->our_acked_seq + 1)
              {
                channel->our_acked_seq = packet_seq;
              }
              else
              {
                skip_packet = 1;
              }
            }
          }

          if (!skip_packet)
          {
            switch (packet_kind)
            {
              default: break;

              //////////////////////////////
              //- Read packet kind: TryConnect

              case N_PacketKind_TryConnect:
              {
                // A foreign host is trying to connect to us
                if (!channel->valid)
                {
                  LogInfoF("Host received conection attempt from %F", FmtString(P_StringFromAddress(scratch.arena, address)));
                  // TODO: Verify that some per-host uuid isn't present in a rolling window to prevent reconnects right after a disconnect?
                  channel = N_AcquireChannel(host, address);
                }
                N_Cmd *cmd = N_PushCmd(host);
                cmd->kind = N_CmdKind_ConnectSuccess;
                cmd->channel_id = channel->id;
              } break;

              //////////////////////////////
              //- Read packet kind: ConnectSuccess

              case N_PacketKind_ConnectSuccess:
              {
                // We successfully connected to a foreign host and they are ready to receive messages
                if (channel->valid && !channel->connected)
                {
                  LogInfoF("Host received connection from %F", FmtString(P_StringFromAddress(scratch.arena, address)));
                  N_Event *event = N_PushEvent(arena, &events);
                  event->kind = N_EventKind_ChannelOpened;
                  event->channel_id = channel->id;
                  channel->connected = 1;
                }
              } break;

              //////////////////////////////
              //- Read packet kind: Disconnect

              case N_PacketKind_Disconnect:
              {
                // A foreign host disconnected from us
                if (channel->valid)
                {
                  LogInfoF("Host received disconnection from %F", FmtString(P_StringFromAddress(scratch.arena, address)));
                  N_Event *event = N_PushEvent(arena, &events);
                  event->kind = N_EventKind_ChannelClosed;
                  event->channel_id = channel->id;
                  N_ReleaseChannel(channel);
                }
              } break;

              //////////////////////////////
              //- Read packet kind: Heartbeat

              case N_PacketKind_Heartbeat:
              {
                if (channel->valid)
                {
                  u16 heartbeat_id = BB_ReadUBits(&br, 16);
                  u16 acked_heartbeat_id = BB_ReadUBits(&br, 16);
                  if (heartbeat_id > channel->last_heartbeat_received_id)
                  {
                    channel->last_heartbeat_received_id = heartbeat_id;
                  }
                  if (acked_heartbeat_id == channel->last_heartbeat_acked_id + 1)
                  {
                    channel->last_heartbeat_acked_id = acked_heartbeat_id;
                    // RTT is measured between consecutive heartbeat acks, so
                    // the first ack only records a timestamp
                    if (channel->last_heartbeat_acked_ns > 0)
                    {
                      channel->last_heartbeat_rtt_ns = now_ns - channel->last_heartbeat_acked_ns;
                    }
                    channel->last_heartbeat_acked_ns = now_ns;
                  }
                }
              } break;

              //////////////////////////////
              //- Read packet kind: MsgChunk

              case N_PacketKind_MsgChunk:
              {
                if (channel->valid && channel->connected)
                {
                  // Packet is chunk `chunk_id` out of `chunk_count` belonging to message `msg_id`
                  u64 msg_id = BB_ReadUV(&br);
                  u64 chunk_id = BB_ReadUV(&br);
                  u64 chunk_count = BB_ReadUV(&br);
                  b32 is_last_chunk = (chunk_id + 1) == chunk_count;
                  u64 chunk_len = is_last_chunk ? BB_ReadUV(&br) : N_MaxPacketChunkLen;

                  // These fields come straight off the wire. Validate them
                  // before they size an allocation or a copy: a zero chunk
                  // count, an out-of-range chunk id, or a chunk longer than
                  // one chunk slot must never reach the assembler.
                  // NOTE(review): chunk_count still has no upper bound here,
                  // so a peer can request a very large assembler - consider
                  // capping it once a maximum message size is defined.
                  b32 header_ok = chunk_count > 0 &&
                                  chunk_id < chunk_count &&
                                  chunk_len <= N_MaxPacketChunkLen;

                  N_MsgAssembler *ma = 0;
                  if (header_ok)
                  {
                    ma = N_MsgAssemblerFromMsg(host, channel->id, msg_id);
                    if (!ma)
                    {
                      ma = N_AcquireMsgAssembler(channel, msg_id, chunk_count, now_ns, is_reliable);
                    }
                  }

                  if (ma && chunk_count == ma->num_chunks_total)
                  {
                    if (!N_IsChunkFilled(ma, chunk_id))
                    {
                      // A zero-length final chunk carries no payload bytes, so
                      // there is nothing to read or copy for it
                      u8 *src = 0;
                      if (chunk_len > 0)
                      {
                        src = BB_ReadBytesRaw(&br, chunk_len);
                      }
                      if (src || chunk_len == 0)
                      {
                        if (chunk_len > 0)
                        {
                          u8 *dst = &ma->chunk_data[chunk_id * N_MaxPacketChunkLen];
                          CopyBytes(dst, src, chunk_len);
                        }
                        if (is_last_chunk)
                        {
                          ma->last_chunk_len = chunk_len;
                        }
                        N_MarkChunkReceived(ma, chunk_id);
                        ++ma->num_chunks_received;
                        N_TouchMessageAssembler(ma, now_ns);
                        if (ma->num_chunks_received == chunk_count)
                        {
                          // All chunks filled, message has finished assembling
                          // TODO: Message ordering
                          N_Event *event = N_PushEvent(arena, &events);
                          String data = ZI;
                          data.len = ((chunk_count - 1) * N_MaxPacketChunkLen) + ma->last_chunk_len;
                          data.text = PushStructsNoZero(arena, u8, data.len);
                          CopyBytes(data.text, ma->chunk_data, data.len);
                          event->kind = N_EventKind_Msg;
                          event->msg = data;
                          event->channel_id = channel->id;
                          if (is_reliable)
                          {
                            // Release assembler if reliable. Unreliable
                            // assemblers are left for the timeout sweep below.
                            N_ReleaseMessageAssembler(ma);
                          }
                        }
                      }
                      else
                      {
                        // Overflow reading chunk
                        Assert(0);
                      }
                    }
                  }
                  else
                  {
                    // Malformed chunk header, or chunk count disagrees with
                    // the assembler that already exists for this message
                    Assert(0);
                  }
                }
              } break;
            }
          }
          host->bytes_received += packet->data.len;
        }
      }
    }
  }

  //////////////////////////////
  //- Update channels

  {
    for (u64 i = 0; i < host->num_channels_reserved; ++i)
    {
      N_Channel *channel = &host->channels[i];
      if (channel->valid)
      {
        // Send / resend handshake if not connected
        if (!channel->connected)
        {
          N_Cmd *cmd = N_PushCmd(host);
          cmd->kind = N_CmdKind_TryConnect;
          cmd->channel_id = channel->id;
        }

        // Send heartbeat
        // TODO: Send this less frequently (once per second or half of timeout or something)
        {
          N_Cmd *cmd = N_PushCmd(host);
          cmd->kind = N_CmdKind_Heartbeat;
          cmd->heartbeat_id = channel->last_heartbeat_acked_id + 1;
          cmd->heartbeat_ack_id = channel->last_heartbeat_received_id;
          cmd->channel_id = channel->id;
        }

        // Release acked reliable packets
        //
        // `their_acked_seq` is the highest sequence number the peer reports
        // having accepted, so every packet with seq <= that value is done.
        // The comparison is inclusive; otherwise the most recently acked
        // packet would stay queued and be resent on every update.
        {
          u64 acked_seq = channel->their_acked_seq;
          N_SndPacket *packet = channel->first_reliable_packet;
          while (packet)
          {
            N_SndPacket *next = packet->next;
            u64 seq = packet->seq;
            if (seq <= acked_seq)
            {
              packet->next = host->first_free_packet;
              host->first_free_packet = packet;
              channel->first_reliable_packet = next;
              --channel->num_reliable_packets;
            }
            else
            {
              break;
            }
            packet = next;
          }
          if (channel->first_reliable_packet == 0)
          {
            channel->last_reliable_packet = 0;
          }
        }

        // Release timed out unreliable msg buffers
        //
        // The list is ordered least recent first, so the sweep stops at the
        // first assembler that has not timed out yet.
        {
          // TODO: Configurable timeout
          i64 unreliable_msg_timeout_ns = NsFromSeconds(0.1);
          N_MsgAssembler *ma = channel->least_recent_msg_assembler;
          while (ma)
          {
            N_MsgAssembler *next = ma->more_recent;
            if ((now_ns - ma->touched_ns) > unreliable_msg_timeout_ns)
            {
              if (!ma->is_reliable)
              {
                N_ReleaseMessageAssembler(ma);
              }
            }
            else
            {
              break;
            }
            ma = next;
          }
        }
      }
    }
  }

  EndScratch(scratch);
  return events;
}
////////////////////////////////////////////////////////////
//~ Update end
// Process host cmds & send outgoing packets.
//
// Every queued command is turned into one or more send packets on each
// channel it targets (a command addressed to N_AllChannelsId fans out to
// every valid channel). All queued packets are then written to the socket.
// Unreliable packets are recycled right after sending; reliable packets stay
// queued and are written again on later updates until they are released
// elsewhere. Finally the command list and its arena are reset.
//
// Every packet starts with: magic (32 bits), kind (8 bits), flags (8 bits),
// our acked seq (UV).
void N_EndUpdate(N_Host *host)
{
  TempArena scratch = BeginScratchNoConflict();

  //////////////////////////////
  //- Process cmds

  // TODO: Unreliable packets don't need to be allocated into unreliable packet queue, should just send them and forget
  {
    for (N_Cmd *cmd = host->first_cmd; cmd; cmd = cmd->next)
    {
      N_CmdKind kind = cmd->kind;
      N_ChannelId channel_id = cmd->channel_id;
      N_ChannelList channels = N_ChannelsFromId(scratch.arena, host, channel_id);
      for (N_ChannelNode *node = channels.first; node; node = node->next)
      {
        N_Channel *channel = node->channel;
        switch (kind)
        {
          default: break;

          //////////////////////////////
          //- Process command: TryConnect

          case N_CmdKind_TryConnect:
          {
            u8 packet_flags = 0;
            N_SndPacket *packet = N_PushSndPacket(channel, 0);
            BB_Buff bb = BB_BuffFromString(StringFromFixedArray(packet->data));
            BB_Writer bw = BB_WriterFromBuff(&bb);
            BB_WriteUBits(&bw, N_PacketMagic, 32); // TODO: implicitly encode magic into crc32
            BB_WriteIBits(&bw, N_PacketKind_TryConnect, 8);
            BB_WriteUBits(&bw, packet_flags, 8);
            BB_WriteUV(&bw, channel->our_acked_seq);
            packet->data_len = BB_GetNumBytesWritten(&bw);
          } break;

          //////////////////////////////
          //- Process command: ConnectSuccess

          case N_CmdKind_ConnectSuccess:
          {
            u8 packet_flags = 0;
            N_SndPacket *packet = N_PushSndPacket(channel, 0);
            BB_Buff bb = BB_BuffFromString(StringFromFixedArray(packet->data));
            BB_Writer bw = BB_WriterFromBuff(&bb);
            BB_WriteUBits(&bw, N_PacketMagic, 32); // TODO: implicitly encode magic into crc32
            BB_WriteIBits(&bw, N_PacketKind_ConnectSuccess, 8);
            BB_WriteUBits(&bw, packet_flags, 8);
            BB_WriteUV(&bw, channel->our_acked_seq);
            packet->data_len = BB_GetNumBytesWritten(&bw);
          } break;

          //////////////////////////////
          //- Process command: Disconnect

          // NOTE(review): only the notification packet is queued here; the
          // local channel is not released by this function - confirm that
          // this is intended.
          case N_CmdKind_Disconnect:
          {
            u8 packet_flags = 0;
            N_SndPacket *packet = N_PushSndPacket(channel, 0);
            BB_Buff bb = BB_BuffFromString(StringFromFixedArray(packet->data));
            BB_Writer bw = BB_WriterFromBuff(&bb);
            BB_WriteUBits(&bw, N_PacketMagic, 32); // TODO: implicitly encode magic into crc32
            BB_WriteIBits(&bw, N_PacketKind_Disconnect, 8);
            BB_WriteUBits(&bw, packet_flags, 8);
            BB_WriteUV(&bw, channel->our_acked_seq);
            packet->data_len = BB_GetNumBytesWritten(&bw);
          } break;

          //////////////////////////////
          //- Process command: Heartbeat

          case N_CmdKind_Heartbeat:
          {
            u8 packet_flags = 0;
            N_SndPacket *packet = N_PushSndPacket(channel, 0);
            BB_Buff bb = BB_BuffFromString(StringFromFixedArray(packet->data));
            BB_Writer bw = BB_WriterFromBuff(&bb);
            BB_WriteUBits(&bw, N_PacketMagic, 32); // TODO: implicitly encode magic into crc32
            BB_WriteIBits(&bw, N_PacketKind_Heartbeat, 8);
            BB_WriteUBits(&bw, packet_flags, 8);
            BB_WriteUV(&bw, channel->our_acked_seq);
            BB_WriteUBits(&bw, cmd->heartbeat_id, 16);
            BB_WriteUBits(&bw, cmd->heartbeat_ack_id, 16);
            packet->data_len = BB_GetNumBytesWritten(&bw);
          } break;

          //////////////////////////////
          //- Process command: Write

          case N_CmdKind_Write:
          {
            // Normalize to 0/1: the value is multiplied into the flag bits
            // below, so any other truthy value would corrupt the flags byte
            b32 is_reliable = cmd->write_reliable != 0;
            u8 packet_flags = (is_reliable * N_PacketFlag_Reliable);
            String msg = cmd->write_msg;

            // A message always occupies at least one chunk, even when empty
            u64 chunk_count = 0;
            if (msg.len > 0)
            {
              chunk_count = (msg.len - 1) / N_MaxPacketChunkLen;
            }
            chunk_count += 1;

            u64 msg_id = ++channel->last_sent_msg_id;
            for (u64 i = 0; i < chunk_count; ++i)
            {
              u64 chunk_len = N_MaxPacketChunkLen;
              b32 is_last_chunk = i + 1 == chunk_count;
              if (is_last_chunk)
              {
                // Whatever remains after the full-size chunks. This is 0 for
                // an empty message, so no bytes are read past the end of
                // `msg`.
                chunk_len = msg.len - (i * N_MaxPacketChunkLen);
              }
              N_SndPacket *packet = N_PushSndPacket(channel, is_reliable);
              BB_Buff bb = BB_BuffFromString(StringFromFixedArray(packet->data));
              BB_Writer bw = BB_WriterFromBuff(&bb);
              BB_WriteUBits(&bw, N_PacketMagic, 32); // TODO: implicitly encode magic into crc32
              BB_WriteIBits(&bw, N_PacketKind_MsgChunk, 8);
              BB_WriteUBits(&bw, packet_flags, 8);
              BB_WriteUV(&bw, channel->our_acked_seq);
              if (is_reliable)
              {
                BB_WriteUV(&bw, packet->seq);
              }
              BB_WriteUV(&bw, msg_id);
              BB_WriteUV(&bw, i);
              BB_WriteUV(&bw, chunk_count);
              // Only the last chunk carries an explicit length; all other
              // chunks are exactly N_MaxPacketChunkLen bytes
              if (is_last_chunk)
              {
                BB_WriteUV(&bw, chunk_len);
              }
              u8 *chunk_data = msg.text + (i * N_MaxPacketChunkLen);
              BB_WriteBytes(&bw, STRING(chunk_len, chunk_data));
              packet->data_len = BB_GetNumBytesWritten(&bw);
            }
          } break;
        }
      }
    }
  }

  //////////////////////////////
  //- Send packets

  // TODO: Aggregate small packets
  {
    for (u64 i = 0; i < host->num_channels_reserved; ++i)
    {
      P_Sock *sock = host->sock;
      N_Channel *channel = &host->channels[i];
      u64 total_sent = 0;
      if (channel->valid)
      {
        P_Address address = channel->address;

        // Send reliable packets to channel
        for (N_SndPacket *packet = channel->first_reliable_packet; packet; packet = packet->next)
        {
          P_WriteSock(sock, address, STRING(packet->data_len, packet->data));
          total_sent += packet->data_len;
        }

        // Send unreliable packets to channel
        for (N_SndPacket *packet = channel->first_unreliable_packet; packet; packet = packet->next)
        {
          P_WriteSock(sock, address, STRING(packet->data_len, packet->data));
          total_sent += packet->data_len;
        }

        // Release unreliable packets by splicing the whole queue onto the
        // front of the host's packet free list
        if (channel->first_unreliable_packet)
        {
          channel->last_unreliable_packet->next = host->first_free_packet;
          host->first_free_packet = channel->first_unreliable_packet;
          channel->first_unreliable_packet = 0;
          channel->last_unreliable_packet = 0;
          channel->num_unreliable_packets = 0;
        }
        host->bytes_sent += total_sent;
      }
    }
  }

  //////////////////////////////
  //- Reset command list

  host->first_cmd = 0;
  host->last_cmd = 0;
  ResetArena(host->cmd_arena);

  EndScratch(scratch);
}
////////////////////////////////////////////////////////////
//~ PushEvent
// Allocates an event on `arena` and appends it to the end of `list`.
// The caller fills in the returned event.
N_Event *N_PushEvent(Arena *arena, N_EventList *list)
{
  N_Event *result = PushStruct(arena, N_Event);
  N_Event *tail = list->last;
  if (!tail)
  {
    list->first = result;
  }
  else
  {
    tail->next = result;
  }
  list->last = result;
  return result;
}