/* TODO: * * Rate limiting. * * Resequence buffer to order incoming sequenced packets. * * Rolling window for message reassembly. * This would remove the need for random access message buffers. * * Connection timeouts. * * Challenges to verify receiving address. */ //////////////////////////////////////////////////////////// //~ Host N_Host *N_AcquireHost(u16 listen_port) { Arena *arena = AcquireArena(Gibi(64)); N_Host *host = PushStruct(arena, N_Host); host->arena = arena; host->cmd_arena = AcquireArena(Gibi(64)); host->channel_arena = AcquireArena(Gibi(64)); host->rcv_buffer_read = PushStruct(host->arena, N_RcvBuffer); host->rcv_buffer_write = PushStruct(host->arena, N_RcvBuffer); host->rcv_buffer_read->arena = AcquireArena(Gibi(64)); host->rcv_buffer_write->arena = AcquireArena(Gibi(64)); host->buddy = AcquireBuddyCtx(Gibi(64)); host->channels = ArenaNext(host->channel_arena, N_Channel); host->num_channel_lookup_bins = N_NumChannelLookupBins; host->channel_lookup_bins = PushStructs(host->arena, N_ChannelLookupBin, host->num_channel_lookup_bins); host->num_msg_assembler_lookup_bins = N_NumMsgAssemblerLookupBins; host->msg_assembler_lookup_bins = PushStructs(host->arena, N_MsgAssemblerLookupBin, host->num_msg_assembler_lookup_bins); host->sock = P_AcquireSock(listen_port, Mebi(2), Mebi(2)); return host; } void N_ReleaseHost(N_Host *host) { P_ReleaseSock(host->sock); ReleaseBuddyCtx(host->buddy); ReleaseArena(host->rcv_buffer_write->arena); ReleaseArena(host->rcv_buffer_read->arena); ReleaseArena(host->channel_arena); ReleaseArena(host->cmd_arena); ReleaseArena(host->arena); } //////////////////////////////////////////////////////////// //~ Channel u64 N_HashFromAddress(P_Address address) { return HashFnv64(Fnv64Basis, StringFromStruct(&address)); } N_Channel *N_ChannelFromAddress(N_Host *host, P_Address address) { u64 hash = N_HashFromAddress(address); N_ChannelLookupBin *bin = &host->channel_lookup_bins[hash % host->num_channel_lookup_bins]; for (N_Channel *channel = bin->first; channel; channel = channel->next_address_hash) { if (channel->address_hash == hash && P_MatchAddress(channel->address, address)) { return channel; } } return &N_nil_channel; } /* Returns nil channel if id = N_AllChannelsId */ N_Channel *N_ChannelFromId(N_Host *host, N_ChannelId channel_id) { if (channel_id.gen > 0 && channel_id.idx < host->num_channels_reserved) { N_Channel *channel = &host->channels[channel_id.idx]; if (channel->id.gen == channel_id.gen) { return channel; } } return &N_nil_channel; } N_ChannelList N_ChannelsFromId(Arena *arena, N_Host *host, N_ChannelId channel_id) { N_ChannelList result = ZI; if (N_MatchChannelId(channel_id, N_AllChannelsId)) { for (u64 i = 0; i < host->num_channels_reserved; ++i) { N_Channel *channel = &host->channels[i]; if (channel->valid) { N_ChannelNode *n = PushStruct(arena, N_ChannelNode); n->channel = channel; if (result.last) { result.last->next = n; } else { result.first = n; } result.last = n; } } } else { N_Channel *channel = N_ChannelFromId(host, channel_id); if (channel->valid) { N_ChannelNode *n = PushStruct(arena, N_ChannelNode); n->channel = channel; result.first = n; result.last = n; } } return result; } N_Channel *N_AcquireChannel(N_Host *host, P_Address address) { N_ChannelId id = ZI; N_Channel *channel; if (host->first_free_channel) { channel = host->first_free_channel; host->first_free_channel = channel->next_free; id = channel->id; ++id.gen; } else { channel = PushStructNoZero(host->channel_arena, N_Channel); id.gen = 1; id.idx = host->num_channels_reserved; ++host->num_channels_reserved; } ZeroStruct(channel); channel->valid = 1; channel->id = id; channel->host = host; channel->address = address; u64 address_hash = N_HashFromAddress(address); channel->address_hash = address_hash; u64 bin_index = address_hash % host->num_channel_lookup_bins; N_ChannelLookupBin *bin = &host->channel_lookup_bins[bin_index]; if (bin->last) { channel->prev_address_hash = bin->last; bin->last->next_address_hash = channel; } else { bin->first = channel; } bin->last = channel; return channel; } void N_ReleaseChannel(N_Channel *channel) { N_Host *host = channel->host; /* Release from lookup table */ { N_ChannelLookupBin *bin = &host->channel_lookup_bins[channel->address_hash % host->num_channel_lookup_bins]; N_Channel *prev = channel->prev_address_hash; N_Channel *next = channel->next_address_hash; if (prev) { prev->next_address_hash = next; } else { bin->first = next; } if (next) { next->prev_address_hash = prev; } else { bin->last = prev; } } /* Release packets */ { if (channel->first_unreliable_packet) { host->first_free_packet = channel->first_unreliable_packet; channel->last_unreliable_packet->next = host->first_free_packet; } if (channel->first_reliable_packet) { host->first_free_packet = channel->first_reliable_packet; channel->last_reliable_packet->next = host->first_free_packet; } } /* Release msg assemblers */ for (N_MsgAssembler *ma = channel->least_recent_msg_assembler; ma; ma = ma->more_recent) { N_ReleaseMessageAssembler(ma); } ++channel->id.gen; channel->valid = 0; channel->next_free = host->first_free_channel; host->first_free_channel = channel; } //////////////////////////////////////////////////////////// //~ Message assembler u64 N_HashFromMsg(N_ChannelId channel_id, u64 msg_id) { u64 result = Fnv64Basis; result = HashFnv64(result, StringFromStruct(&channel_id)); result = HashFnv64(result, StringFromStruct(&msg_id)); return result; } N_MsgAssembler *N_MsgAssemblerFromMsg(N_Host *host, N_ChannelId channel_id, u64 msg_id) { u64 hash = N_HashFromMsg(channel_id, msg_id); N_MsgAssemblerLookupBin *bin = &host->msg_assembler_lookup_bins[hash % host->num_msg_assembler_lookup_bins]; for (N_MsgAssembler *ma = bin->first; ma; ma = ma->next_hash) { if (ma->hash == hash && N_MatchChannelId(ma->channel->id, channel_id) && ma->msg_id == msg_id) { return ma; } } return 0; } N_MsgAssembler *N_AcquireMsgAssembler(N_Channel *channel, u64 msg_id, u64 chunk_count, u64 now_ns, b32 is_reliable) { N_Host *host = channel->host; N_MsgAssembler *ma; if (host->first_free_msg_assembler) { ma = host->first_free_msg_assembler; host->first_free_msg_assembler = ma->next_free; } else { ma = PushStructNoZero(host->arena, N_MsgAssembler); } ZeroStruct(ma); ma->channel = channel; ma->msg_id = msg_id; ma->num_chunks_total = chunk_count; u64 chunk_bitmap_size = (chunk_count + 7) >> 3; if ((chunk_bitmap_size % 16) != 0) { /* Align chunk bitmap to 16 so msg data is aligned */ chunk_bitmap_size += 16 - (chunk_bitmap_size % 16); } u64 chunk_data_size = chunk_count * N_MaxPacketChunkLen; /* Acquire msg data using buddy allocator since the assembler has * arbitrary lifetime and data needs to stay contiguous for random * access as packets are received */ ma->buddy_block = AcquireBuddyBlock(host->buddy, chunk_bitmap_size + chunk_data_size); ma->chunk_bitmap = ma->buddy_block->memory; ZeroBytes(ma->chunk_bitmap, chunk_bitmap_size); ma->chunk_data = ma->chunk_bitmap + chunk_bitmap_size; /* FIXME: Ensure chunk_count > 0 */ ma->is_reliable = is_reliable; /* Insert into channel list */ ma->touched_ns = now_ns; if (channel->most_recent_msg_assembler) { channel->most_recent_msg_assembler->more_recent = ma; ma->less_recent = channel->most_recent_msg_assembler; } else { channel->least_recent_msg_assembler = ma; } channel->most_recent_msg_assembler = ma; /* Insert into lookup table */ u64 hash = N_HashFromMsg(channel->id, msg_id); ma->hash = hash; N_MsgAssemblerLookupBin *bin = &host->msg_assembler_lookup_bins[hash % host->num_msg_assembler_lookup_bins]; if (bin->last) { bin->last->next_hash = ma; ma->prev_hash = bin->last; } else { bin->first = ma; } bin->last = ma; return ma; } void N_ReleaseMessageAssembler(N_MsgAssembler *ma) { N_Channel *channel = ma->channel; N_Host *host = channel->host; ReleaseBuddyBlock(ma->buddy_block); /* Release from channel list */ { N_MsgAssembler *prev = ma->less_recent; N_MsgAssembler *next = ma->more_recent; if (prev) { prev->more_recent = next; } else { channel->least_recent_msg_assembler = next; } if (next) { next->less_recent = prev; } else { channel->most_recent_msg_assembler = prev; } } /* Release from lookup table */ N_MsgAssemblerLookupBin *bin = &host->msg_assembler_lookup_bins[ma->hash % host->num_msg_assembler_lookup_bins]; { N_MsgAssembler *prev = ma->prev_hash; N_MsgAssembler *next = ma->next_hash; if (prev) { prev->next_hash = next; } else { bin->first = next; } if (next) { next->prev_hash = prev; } else { bin->last = prev; } } ma->next_free = host->first_free_msg_assembler; host->first_free_msg_assembler = ma; } void N_TouchMessageAssembler(N_MsgAssembler *ma, i64 now_ns) { N_Channel *channel = ma->channel; if (ma != channel->most_recent_msg_assembler) { /* Remove from channel list */ { N_MsgAssembler *prev = ma->less_recent; N_MsgAssembler *next = ma->more_recent; if (prev) { prev->more_recent = next; } else { channel->least_recent_msg_assembler = next; } if (next) { next->less_recent = prev; } else { channel->most_recent_msg_assembler = prev; } } /* Insert at end of channel list */ { if (channel->most_recent_msg_assembler) { channel->most_recent_msg_assembler->more_recent = ma; ma->less_recent = channel->most_recent_msg_assembler; } else { channel->least_recent_msg_assembler = ma; } channel->most_recent_msg_assembler = ma; } } ma->touched_ns = now_ns; } b32 N_IsChunkFilled(N_MsgAssembler *ma, u64 chunk_id) { if (chunk_id < ma->num_chunks_total) { return (ma->chunk_bitmap[chunk_id / 8] & (1 << (chunk_id % 8))) != 0; } return 0; } void N_MarkChunkReceived(N_MsgAssembler *ma, u64 chunk_id) { if (chunk_id < ma->num_chunks_total) { ma->chunk_bitmap[chunk_id / 8] |= (1 << (chunk_id % 8)); } } //////////////////////////////////////////////////////////// //~ Packet N_SndPacket *N_PushSndPacket(N_Channel *channel, b32 is_reliable) { N_Host *host = channel->host; N_SndPacket *packet = 0; if (host->first_free_packet) { packet = host->first_free_packet; host->first_free_packet = packet->next; } else { packet = PushStructNoZero(host->arena, N_SndPacket); } ZeroStruct(packet); if (is_reliable) { if (channel->last_reliable_packet) { channel->last_reliable_packet->next = packet; } else { channel->first_reliable_packet = packet; } channel->last_reliable_packet = packet; ++channel->num_reliable_packets; packet->seq = ++channel->last_sent_seq; } else { if (channel->last_unreliable_packet) { channel->last_unreliable_packet->next = packet; } else { channel->first_unreliable_packet = packet; } channel->last_unreliable_packet = packet; ++channel->num_unreliable_packets; } return packet; } //////////////////////////////////////////////////////////// //~ Host commands N_Cmd *N_PushCmd(N_Host *host) { N_Cmd *cmd = PushStruct(host->cmd_arena, N_Cmd); if (host->last_cmd) { host->last_cmd->next = cmd; } else { host->first_cmd = cmd; } host->last_cmd = cmd; return cmd; } void N_Connect(N_Host *host, P_Address connect_address) { N_Channel *channel = N_ChannelFromAddress(host, connect_address); if (!channel->valid) { channel = N_AcquireChannel(host, connect_address); } } void N_Disconnect(N_Host *host, N_ChannelId channel_id) { N_Cmd *cmd = N_PushCmd(host); cmd->kind = N_CmdKind_Disconnect; cmd->channel_id = channel_id; } void N_Write(N_Host *host, N_ChannelId channel_id, String msg, N_WriteFlag flags) { N_Cmd *cmd = N_PushCmd(host); cmd->kind = N_CmdKind_Write; cmd->channel_id = channel_id; cmd->write_msg = PushString(host->cmd_arena, msg); cmd->write_reliable = flags & N_WriteFlag_Reliable; } //////////////////////////////////////////////////////////// //~ Channel info i64 N_GetChannelLastRttNs(N_Host *host, N_ChannelId channel_id) { N_Channel *channel = N_ChannelFromId(host, channel_id); return channel->last_heartbeat_rtt_ns; } //////////////////////////////////////////////////////////// //~ Update begin /* Read incoming packets, update channels, and return events */ N_EventList N_BeginUpdate(Arena *arena, N_Host *host) { TempArena scratch = BeginScratch(arena); N_EventList events = ZI; i64 now_ns = TimeNs(); { __profn("Read packets"); //- Read socket N_RcvPacket *first_packet = 0; N_RcvPacket *last_packet = 0; { __profn("Read socket"); P_Sock *sock = host->sock; P_SockReadResult result = ZI; while ((result = P_ReadSock(scratch.arena, sock)).valid) { P_Address address = result.address; String data = result.data; if (data.len > 0) { N_RcvPacket *packet = PushStruct(scratch.arena, N_RcvPacket); packet->address = address; packet->data = PushString(scratch.arena, data); if (last_packet) { last_packet->next = packet; } else { first_packet = packet; } last_packet = packet; } } } //- Read incoming packets { __profn("Process host packets"); for (N_RcvPacket *packet = first_packet; packet; packet = packet->next) { //struct sock *sock = packet->sock; P_Address address = packet->address; BB_Buff bb = BB_BuffFromString(packet->data); BB_Reader br = BB_ReaderFromBuff(&bb); u32 magic = BB_ReadUBits(&br, 32); /* TODO: implicitly encode magic into crc32 */ if (magic == N_PacketMagic) { /* TODO: Combine kind byte with flags byte */ N_Channel *channel = N_ChannelFromAddress(host, address); N_PacketKind packet_kind = BB_ReadIBits(&br, 8); u8 packet_flags = BB_ReadUBits(&br, 8); u64 their_acked_seq = BB_ReadUV(&br); if (channel->valid) { channel->last_packet_received_ns = now_ns; if (their_acked_seq > channel->their_acked_seq) { channel->their_acked_seq = their_acked_seq; } } b32 skip_packet = 0; b32 is_reliable = packet_flags & N_PacketFlag_Reliable; if (channel->valid) { if (is_reliable) { u64 packet_seq = BB_ReadUV(&br); if (packet_seq == channel->our_acked_seq + 1) { channel->our_acked_seq = packet_seq; } else { skip_packet = 1; } } } if (!skip_packet) { switch (packet_kind) { default: break; //- Read packet kind: TryConnect case N_PacketKind_TryConnect: { /* A foreign host is trying to connect to us */ if (!channel->valid) { LogInfoF("Host received conection attempt from %F", FmtString(P_StringFromAddress(scratch.arena, address))); /* TODO: Verify that some per-host uuid isn't present in a rolling window to prevent reconnects right after a disconnect? */ channel = N_AcquireChannel(host, address); } N_Cmd *cmd = N_PushCmd(host); cmd->kind = N_CmdKind_ConnectSuccess; cmd->channel_id = channel->id; } break; //- Read packet kind: ConnectSuccess case N_PacketKind_ConnectSuccess: { /* We successfully connected to a foreign host and they are ready to receive messages */ if (channel->valid && !channel->connected) { LogInfoF("Host received connection from %F", FmtString(P_StringFromAddress(scratch.arena, address))); N_Event *event = N_PushEvent(arena, &events); event->kind = N_EventKind_ChannelOpened; event->channel_id = channel->id; channel->connected = 1; } } break; //- Read packet kind: Disconnect case N_PacketKind_Disconnect: { /* A foreign host disconnected from us */ if (channel->valid) { LogInfoF("Host received disconnection from %F", FmtString(P_StringFromAddress(scratch.arena, address))); N_Event *event = N_PushEvent(arena, &events); event->kind = N_EventKind_ChannelClosed; event->channel_id = channel->id; N_ReleaseChannel(channel); } } break; //- Read packet kind: Heartbeat case N_PacketKind_Heartbeat: { if (channel->valid) { u16 heartbeat_id = BB_ReadUBits(&br, 16); u16 acked_heartbeat_id = BB_ReadUBits(&br, 16); if (heartbeat_id > channel->last_heartbeat_received_id) { channel->last_heartbeat_received_id = heartbeat_id; } if (acked_heartbeat_id == channel->last_heartbeat_acked_id + 1) { channel->last_heartbeat_acked_id = acked_heartbeat_id; if (channel->last_heartbeat_acked_ns > 0) { channel->last_heartbeat_rtt_ns = now_ns - channel->last_heartbeat_acked_ns; } channel->last_heartbeat_acked_ns = now_ns; } } } break; //- Read packet kind: MsgChunk case N_PacketKind_MsgChunk: { if (channel->valid && channel->connected) { /* Packet is chunk out of belonging to message */ u64 msg_id = BB_ReadUV(&br); u64 chunk_id = BB_ReadUV(&br); u64 chunk_count = BB_ReadUV(&br); b32 is_last_chunk = (chunk_id + 1) == chunk_count; u64 chunk_len = is_last_chunk ? BB_ReadUV(&br) : N_MaxPacketChunkLen; N_MsgAssembler *ma = N_MsgAssemblerFromMsg(host, channel->id, msg_id); if (!ma) { ma = N_AcquireMsgAssembler(channel, msg_id, chunk_count, now_ns, is_reliable); } if (chunk_count == ma->num_chunks_total && chunk_id < chunk_count) { if (!N_IsChunkFilled(ma, chunk_id)) { u8 *src = BB_ReadBytesRaw(&br, chunk_len); if (src) { u8 *dst = &ma->chunk_data[chunk_id * N_MaxPacketChunkLen]; CopyBytes(dst, src, chunk_len); if (is_last_chunk) { ma->last_chunk_len = chunk_len; } N_MarkChunkReceived(ma, chunk_id); ++ma->num_chunks_received; N_TouchMessageAssembler(ma, now_ns); if (ma->num_chunks_received == chunk_count) { /* All chunks filled, message has finished assembling */ /* TODO: Message ordering */ N_Event *event = N_PushEvent(arena, &events); String data = ZI; data.len = ((chunk_count - 1) * N_MaxPacketChunkLen) + ma->last_chunk_len; data.text = PushStructsNoZero(arena, u8, data.len); CopyBytes(data.text, ma->chunk_data, data.len); event->kind = N_EventKind_Msg; event->msg = data; event->channel_id = channel->id; if (is_reliable) { /* Release assembler if reliable */ N_ReleaseMessageAssembler(ma); } } } else { /* Overflow reading chunk */ Assert(0); } } } else { /* Chunk id/count mismatch */ Assert(0); } } } break; } } host->bytes_received += packet->data.len; } } } } //- Update channels { __profn("Update host channels"); for (u64 i = 0; i < host->num_channels_reserved; ++i) { N_Channel *channel = &host->channels[i]; if (channel->valid) { /* Send / resend handshake if not connected */ if (!channel->connected) { N_Cmd *cmd = N_PushCmd(host); cmd->kind = N_CmdKind_TryConnect; cmd->channel_id = channel->id; } /* Send heartbeat */ /* TODO: Send this less frequently (once per second or half of timeout or something) */ { N_Cmd *cmd = N_PushCmd(host); cmd->kind = N_CmdKind_Heartbeat; cmd->heartbeat_id = channel->last_heartbeat_acked_id + 1; cmd->heartbeat_ack_id = channel->last_heartbeat_received_id; cmd->channel_id = channel->id; } /* Release acked reliable packets */ { u64 acked_seq = channel->their_acked_seq; N_SndPacket *packet = channel->first_reliable_packet; while (packet) { N_SndPacket *next = packet->next; u64 seq = packet->seq; if (seq < acked_seq) { packet->next = host->first_free_packet; host->first_free_packet = packet; channel->first_reliable_packet = next; --channel->num_reliable_packets; } else { break; } packet = next; } if (channel->first_reliable_packet == 0) { channel->last_reliable_packet = 0; } } /* Release timed out unreliable msg buffers */ { /* TODO: Configurable timeout */ i64 unreliable_msg_timeout_ns = NsFromSeconds(0.1); N_MsgAssembler *ma = channel->least_recent_msg_assembler; while (ma) { N_MsgAssembler *next = ma->more_recent; if ((now_ns - ma->touched_ns) > unreliable_msg_timeout_ns) { if (!ma->is_reliable) { N_ReleaseMessageAssembler(ma); } } else { break; } ma = next; } } } } } EndScratch(scratch); return events; } //////////////////////////////////////////////////////////// //~ Update end /* Process host cmds & send outgoing packets */ void N_EndUpdate(N_Host *host) { __prof; TempArena scratch = BeginScratchNoConflict(); /* Process cmds into sendable packets */ /* TODO: Unreliable packets don't need to be allocated into unreliable packet queue, should just send them and forget */ { __profn("Process host cmds"); for (N_Cmd *cmd = host->first_cmd; cmd; cmd = cmd->next) { N_CmdKind kind = cmd->kind; N_ChannelId channel_id = cmd->channel_id; N_ChannelList channels = N_ChannelsFromId(scratch.arena, host, channel_id); for (N_ChannelNode *node = channels.first; node; node = node->next) { N_Channel *channel = node->channel; switch (kind) { default: break; //- Process command: TryConnect case N_CmdKind_TryConnect: { u8 packet_flags = 0; N_SndPacket *packet = N_PushSndPacket(channel, 0); BB_Buff bb = BB_BuffFromString(StringFromArray(packet->data)); BB_Writer bw = BB_WriterFromBuff(&bb); BB_WriteUBits(&bw, N_PacketMagic, 32); /* TODO: implicitly encode magic into crc32 */ BB_WriteIBits(&bw, N_PacketKind_TryConnect, 8); BB_WriteUBits(&bw, packet_flags, 8); BB_WriteUV(&bw, channel->our_acked_seq); packet->data_len = BB_GetNumBytesWritten(&bw); } break; //- Process command: ConnectSuccess case N_CmdKind_ConnectSuccess: { u8 packet_flags = 0; N_SndPacket *packet = N_PushSndPacket(channel, 0); BB_Buff bb = BB_BuffFromString(StringFromArray(packet->data)); BB_Writer bw = BB_WriterFromBuff(&bb); BB_WriteUBits(&bw, N_PacketMagic, 32); /* TODO: implicitly encode magic into crc32 */ BB_WriteIBits(&bw, N_PacketKind_ConnectSuccess, 8); BB_WriteUBits(&bw, packet_flags, 8); BB_WriteUV(&bw, channel->our_acked_seq); packet->data_len = BB_GetNumBytesWritten(&bw); } break; //- Process command: Disconnect case N_CmdKind_Disconnect: { u8 packet_flags = 0; N_SndPacket *packet = N_PushSndPacket(channel, 0); BB_Buff bb = BB_BuffFromString(StringFromArray(packet->data)); BB_Writer bw = BB_WriterFromBuff(&bb); BB_WriteUBits(&bw, N_PacketMagic, 32); /* TODO: implicitly encode magic into crc32 */ BB_WriteIBits(&bw, N_PacketKind_Disconnect, 8); BB_WriteUBits(&bw, packet_flags, 8); BB_WriteUV(&bw, channel->our_acked_seq); packet->data_len = BB_GetNumBytesWritten(&bw); } break; //- Process command: Heartbeat case N_CmdKind_Heartbeat: { u8 packet_flags = 0; N_SndPacket *packet = N_PushSndPacket(channel, 0); BB_Buff bb = BB_BuffFromString(StringFromArray(packet->data)); BB_Writer bw = BB_WriterFromBuff(&bb); BB_WriteUBits(&bw, N_PacketMagic, 32); /* TODO: implicitly encode magic into crc32 */ BB_WriteIBits(&bw, N_PacketKind_Heartbeat, 8); BB_WriteUBits(&bw, packet_flags, 8); BB_WriteUV(&bw, channel->our_acked_seq); BB_WriteUBits(&bw, cmd->heartbeat_id, 16); BB_WriteUBits(&bw, cmd->heartbeat_ack_id, 16); packet->data_len = BB_GetNumBytesWritten(&bw); } break; //- Process command: Write case N_CmdKind_Write: { b32 is_reliable = cmd->write_reliable; u8 packet_flags = (is_reliable * N_PacketFlag_Reliable); String msg = cmd->write_msg; u64 chunk_count = 0; if (msg.len > 0) { chunk_count = (msg.len - 1) / N_MaxPacketChunkLen; } chunk_count += 1; u64 msg_id = ++channel->last_sent_msg_id; for (u64 i = 0; i < chunk_count; ++i) { u64 chunk_len = N_MaxPacketChunkLen; b32 is_last_chunk = i + 1 == chunk_count; if (is_last_chunk) { chunk_len = msg.len % N_MaxPacketChunkLen; if (chunk_len == 0) { chunk_len = N_MaxPacketChunkLen; } } N_SndPacket *packet = N_PushSndPacket(channel, is_reliable); BB_Buff bb = BB_BuffFromString(StringFromArray(packet->data)); BB_Writer bw = BB_WriterFromBuff(&bb); BB_WriteUBits(&bw, N_PacketMagic, 32); /* TODO: implicitly encode magic into crc32 */ BB_WriteIBits(&bw, N_PacketKind_MsgChunk, 8); BB_WriteUBits(&bw, packet_flags, 8); BB_WriteUV(&bw, channel->our_acked_seq); if (is_reliable) { BB_WriteUV(&bw, packet->seq); } BB_WriteUV(&bw, msg_id); BB_WriteUV(&bw, i); BB_WriteUV(&bw, chunk_count); if (is_last_chunk) { BB_WriteUV(&bw, chunk_len); } u8 *chunk_data = msg.text + (i * N_MaxPacketChunkLen); BB_WriteBytes(&bw, STRING(chunk_len, chunk_data)); packet->data_len = BB_GetNumBytesWritten(&bw); } } break; } } } } //- Send packets /* TODO: Aggregate small packets */ { __profn("Send host packets"); for (u64 i = 0; i < host->num_channels_reserved; ++i) { P_Sock *sock = host->sock; N_Channel *channel = &host->channels[i]; u64 total_sent = 0; if (channel->valid) { P_Address address = channel->address; /* Send reliable packets to channel */ for (N_SndPacket *packet = channel->first_reliable_packet; packet; packet = packet->next) { P_WriteSock(sock, address, STRING(packet->data_len, packet->data)); total_sent += packet->data_len; } /* Send unreliable packets to channel */ for (N_SndPacket *packet = channel->first_unreliable_packet; packet; packet = packet->next) { P_WriteSock(sock, address, STRING(packet->data_len, packet->data)); total_sent += packet->data_len; } /* Release unreliable packets */ if (channel->first_unreliable_packet) { channel->last_unreliable_packet->next = host->first_free_packet; host->first_free_packet = channel->first_unreliable_packet; channel->first_unreliable_packet = 0; channel->last_unreliable_packet = 0; channel->num_unreliable_packets = 0; } host->bytes_sent += total_sent; } } } //- Reset commands list host->first_cmd = 0; host->last_cmd = 0; ResetArena(host->cmd_arena); EndScratch(scratch); } //////////////////////////////////////////////////////////// //~ PushEvent N_Event *N_PushEvent(Arena *arena, N_EventList *list) { N_Event *event = PushStruct(arena, N_Event); if (list->last) { list->last->next = event; } else { list->first = event; } list->last = event; return event; }