From 634c4c6a02bc63c47317c048c4f7a7f02b14a9cd Mon Sep 17 00:00:00 2001 From: jacob Date: Wed, 30 Jul 2025 19:59:36 -0500 Subject: [PATCH] net layer refactor --- src/app/app_core.c | 21 +- src/gp/gp_core.h | 2 +- src/mixer/mixer_core.h | 4 +- src/mp3/mp3_core.h | 4 +- src/net/net_core.c | 823 +++++++++++++++++++++-------------------- src/net/net_core.h | 319 ++++++++++++---- src/sim/sim_core.c | 6 +- src/user/user_core.c | 24 +- src/user/user_core.h | 1 - 9 files changed, 692 insertions(+), 512 deletions(-) diff --git a/src/app/app_core.c b/src/app/app_core.c index 60fe1e31..98892718 100644 --- a/src/app/app_core.c +++ b/src/app/app_core.c @@ -238,19 +238,18 @@ void P_AppStartup(String args_str) gp_startup(); /* Subsystems */ - N_StartupReceipt host_sr = host_startup(); - AC_StartupReceipt asset_cache_sr = AC_Startup(); - TTF_StartupReceipt ttf_sr = ttf_startup(); - F_StartupReceipt font_sr = F_Startup(&asset_cache_sr, &ttf_sr); - S_StartupReceipt sprite_sr = sprite_startup(); - M_StartupReceipt mixer_sr = M_Startup(); - SND_StartupReceipt sound_sr = sound_startup(&asset_cache_sr); - D_StartupReceipt draw_sr = D_Startup(&font_sr); - SimStartupReceipt sim_sr = sim_startup(); + AC_StartupReceipt asset_cache_sr = AC_Startup(); + TTF_StartupReceipt ttf_sr = ttf_startup(); + F_StartupReceipt font_sr = F_Startup(&asset_cache_sr, &ttf_sr); + S_StartupReceipt sprite_sr = sprite_startup(); + M_StartupReceipt mixer_sr = M_Startup(); + SND_StartupReceipt sound_sr = sound_startup(&asset_cache_sr); + D_StartupReceipt draw_sr = D_Startup(&font_sr); + SimStartupReceipt sim_sr = sim_startup(); /* Interface systems */ - PB_StartupReceipt playback_sr = playback_startup(&mixer_sr); - struct user_startup_receipt user_sr = user_startup(&font_sr, &sprite_sr, &draw_sr, &asset_cache_sr, &sound_sr, &mixer_sr, &host_sr, &sim_sr, connect_address); + PB_StartupReceipt playback_sr = playback_startup(&mixer_sr); + struct user_startup_receipt user_sr = user_startup(&font_sr, &sprite_sr, &draw_sr, &asset_cache_sr, &sound_sr, &mixer_sr, &sim_sr, connect_address); (UNUSED)user_sr; (UNUSED)playback_sr; diff --git a/src/gp/gp_core.h b/src/gp/gp_core.h index a6a9b253..1b035a69 100644 --- a/src/gp/gp_core.h +++ b/src/gp/gp_core.h @@ -92,7 +92,7 @@ typedef i32 G_TextureFormat; enum typedef i32 G_TextureFlag; enum { - GP_TEXTURE_FLAG_NONE = (0), + GP_TEXTURE_FLAG_NONE = 0, GP_TEXTURE_FLAG_TARGETABLE = (1 << 0) }; diff --git a/src/mixer/mixer_core.h b/src/mixer/mixer_core.h index 3c4c9768..d14c987b 100644 --- a/src/mixer/mixer_core.h +++ b/src/mixer/mixer_core.h @@ -3,8 +3,8 @@ typedef u32 M_TrackFlag; enum { - M_TrackFlag_None = (0), - M_TrackFlag_Spatialize = (1 << 0) + M_TrackFlag_None = 0, + M_TrackFlag_Spatialize = (1 << 0) }; Struct(M_Handle) diff --git a/src/mp3/mp3_core.h b/src/mp3/mp3_core.h index 0e3e0a1a..1a094d35 100644 --- a/src/mp3/mp3_core.h +++ b/src/mp3/mp3_core.h @@ -1,9 +1,9 @@ //////////////////////////////// -//~ Mp3 structs +//~ Mp3 types typedef u32 MP3_DecodeFlag; enum { - MP3_DecodeFlag_None = (0), + MP3_DecodeFlag_None = 0, MP3_DecodeFlag_Stereo = (1 << 0), }; diff --git a/src/net/net_core.c b/src/net/net_core.c index 29ba85d0..227995ae 100644 --- a/src/net/net_core.c +++ b/src/net/net_core.c @@ -12,132 +12,10 @@ * Challenges to verify receiving address. */ -enum host_packet_kind { - HOST_PACKET_KIND_NONE, - HOST_PACKET_KIND_TRY_CONNECT, - HOST_PACKET_KIND_CONNECT_SUCCESS, - HOST_PACKET_KIND_DISCONNECT, - HOST_PACKET_KIND_HEARTBEAT, - HOST_PACKET_KIND_MSG_CHUNK -}; +//////////////////////////////// +//~ Host -enum host_packet_flag { - HOST_PACKET_FLAG_NONE = 0, - HOST_PACKET_FLAG_RELIABLE = (1 << 0) -}; - -struct host_channel { - N_ChannelId id; - b32 valid; - b32 connected; - N_Host *host; - - struct host_channel *next_free; - - P_Address address; - u64 address_hash; - struct host_channel *next_address_hash; - struct host_channel *prev_address_hash; - - /* NOTE: Packets are allocated in host's `arena` */ - N_SndPacket *first_reliable_packet; - N_SndPacket *last_reliable_packet; - N_SndPacket *first_unreliable_packet; - N_SndPacket *last_unreliable_packet; - u64 num_reliable_packets; - u64 num_unreliable_packets; - - /* NOTE: Msg assemblers are allocated in host's `arena` */ - struct host_msg_assembler *least_recent_msg_assembler; - struct host_msg_assembler *most_recent_msg_assembler; - - u16 last_heartbeat_received_id; - u16 last_heartbeat_acked_id; - i64 last_heartbeat_acked_ns; - i64 last_heartbeat_rtt_ns; - - u64 last_sent_msg_id; - u64 their_acked_seq; - u64 our_acked_seq; - u64 last_sent_seq; - - i64 last_packet_received_ns; -}; - -struct host_channel_node { - struct host_channel *channel; - struct host_channel_node *next; -}; - -struct host_channel_list { - struct host_channel_node *first; - struct host_channel_node *last; -}; - -struct host_rcv_packet { - struct sock *sock; - P_Address address; - String data; - struct host_rcv_packet *next; -}; - -struct host_msg_assembler { - struct host_channel *channel; - b32 is_reliable; - - /* Free list */ - struct host_msg_assembler *next_free; - - /* Bucket list */ - struct host_msg_assembler *next_hash; - struct host_msg_assembler *prev_hash; - - /* Channel list */ - struct host_msg_assembler *less_recent; - struct host_msg_assembler *more_recent; - - u64 msg_id; - u64 hash; - - u64 last_chunk_len; - u64 num_chunks_total; - u64 num_chunks_received; - - i64 touched_ns; - - BuddyBlock *buddy_block; - u8 *chunk_bitmap; - u8 *chunk_data; -}; - -struct host_msg_assembler_lookup_bin { - struct host_msg_assembler *first; - struct host_msg_assembler *last; -}; - -Readonly Global struct host_channel _g_host_channel_nil = { .valid = 0 }; - -Global struct { - i32 _; -} G = ZI, DebugAlias(G, G_host); - -internal void host_msg_assembler_release(struct host_msg_assembler *ma); - -/* ========================== * - * Startup - * ========================== */ - -N_StartupReceipt host_startup(void) -{ - __prof; - return (N_StartupReceipt) { 0 }; -} - -/* ========================== * - * Host - * ========================== */ - -N_Host *host_alloc(u16 listen_port) +N_Host *N_AllocHost(u16 listen_port) { Arena *arena = AllocArena(Gibi(64)); N_Host *host = PushStruct(arena, N_Host); @@ -151,20 +29,20 @@ N_Host *host_alloc(u16 listen_port) host->rcv_buffer_write->arena = AllocArena(Gibi(64)); host->buddy = AllocBuddyCtx(Gibi(64)); - host->channels = PushDry(host->channel_arena, struct host_channel); + host->channels = PushDry(host->channel_arena, N_Channel); host->num_channel_lookup_bins = N_NumChannelLookupBins; host->channel_lookup_bins = PushStructs(host->arena, N_ChannelLookupBin, host->num_channel_lookup_bins); host->num_msg_assembler_lookup_bins = N_NumMsgAssemblerLookupBins; - host->msg_assembler_lookup_bins = PushStructs(host->arena, struct host_msg_assembler_lookup_bin, host->num_msg_assembler_lookup_bins); + host->msg_assembler_lookup_bins = PushStructs(host->arena, N_MsgAssemblerLookupBin, host->num_msg_assembler_lookup_bins); host->sock = P_AllocSock(listen_port, Mebi(2), Mebi(2)); return host; } -void host_release(N_Host *host) +void N_ReleaseHost(N_Host *host) { P_ReleaseSock(host->sock); @@ -176,60 +54,72 @@ void host_release(N_Host *host) ReleaseArena(host->arena); } -/* ========================== * - * Channel - * ========================== */ +//////////////////////////////// +//~ Channel -internal u64 hash_from_address(P_Address address) +u64 N_HashFromAddress(P_Address address) { return HashFnv64(Fnv64Basis, StringFromStruct(&address)); } -internal struct host_channel *host_channel_from_address(N_Host *host, P_Address address) +N_Channel *N_ChannelFromAddress(N_Host *host, P_Address address) { - u64 hash = hash_from_address(address); + u64 hash = N_HashFromAddress(address); N_ChannelLookupBin *bin = &host->channel_lookup_bins[hash % host->num_channel_lookup_bins]; - for (struct host_channel *channel = bin->first; channel; channel = channel->next_address_hash) { - if (channel->address_hash == hash && P_AddressIsEqual(channel->address, address)) { + for (N_Channel *channel = bin->first; channel; channel = channel->next_address_hash) + { + if (channel->address_hash == hash && P_AddressIsEqual(channel->address, address)) + { return channel; } } - return &_g_host_channel_nil; + return &N_nil_channel; } -/* Returns nil channel if id = HOST_CHANNEL_ID_ALL */ -internal struct host_channel *host_single_channel_from_id(N_Host *host, N_ChannelId channel_id) +/* Returns nil channel if id = N_AllChannelsId */ +N_Channel *N_ChannelFromId(N_Host *host, N_ChannelId channel_id) { - if (channel_id.gen > 0 && channel_id.idx < host->num_channels_reserved) { - struct host_channel *channel = &host->channels[channel_id.idx]; - if (channel->id.gen == channel_id.gen) { + if (channel_id.gen > 0 && channel_id.idx < host->num_channels_reserved) + { + N_Channel *channel = &host->channels[channel_id.idx]; + if (channel->id.gen == channel_id.gen) + { return channel; } } - return &_g_host_channel_nil; + return &N_nil_channel; } -internal struct host_channel_list host_channels_from_id(Arena *arena, N_Host *host, N_ChannelId channel_id) +N_ChannelList N_ChannelsFromId(Arena *arena, N_Host *host, N_ChannelId channel_id) { - struct host_channel_list result = ZI; - if (host_channel_id_eq(channel_id, HOST_CHANNEL_ID_ALL)) { - for (u64 i = 0; i < host->num_channels_reserved; ++i) { - struct host_channel *channel = &host->channels[i]; - if (channel->valid) { - struct host_channel_node *n = PushStruct(arena, struct host_channel_node); + N_ChannelList result = ZI; + if (N_EqChannelId(channel_id, N_AllChannelsId)) + { + for (u64 i = 0; i < host->num_channels_reserved; ++i) + { + N_Channel *channel = &host->channels[i]; + if (channel->valid) + { + N_ChannelNode *n = PushStruct(arena, N_ChannelNode); n->channel = channel; - if (result.last) { + if (result.last) + { result.last->next = n; - } else { + } + else + { result.first = n; } result.last = n; } } - } else { - struct host_channel *channel = host_single_channel_from_id(host, channel_id); - if (channel->valid) { - struct host_channel_node *n = PushStruct(arena, struct host_channel_node); + } + else + { + N_Channel *channel = N_ChannelFromId(host, channel_id); + if (channel->valid) + { + N_ChannelNode *n = PushStruct(arena, N_ChannelNode); n->channel = channel; result.first = n; result.last = n; @@ -238,17 +128,20 @@ internal struct host_channel_list host_channels_from_id(Arena *arena, N_Host *ho return result; } -internal struct host_channel *host_channel_alloc(N_Host *host, P_Address address) +N_Channel *N_AllocChannel(N_Host *host, P_Address address) { N_ChannelId id = ZI; - struct host_channel *channel; - if (host->first_free_channel) { + N_Channel *channel; + if (host->first_free_channel) + { channel = host->first_free_channel; host->first_free_channel = channel->next_free; id = channel->id; ++id.gen; - } else { - channel = PushStructNoZero(host->channel_arena, struct host_channel); + } + else + { + channel = PushStructNoZero(host->channel_arena, N_Channel); id.gen = 1; id.idx = host->num_channels_reserved; ++host->num_channels_reserved; @@ -258,15 +151,18 @@ internal struct host_channel *host_channel_alloc(N_Host *host, P_Address address channel->id = id; channel->host = host; channel->address = address; - u64 address_hash = hash_from_address(address); + u64 address_hash = N_HashFromAddress(address); channel->address_hash = address_hash; u64 bin_index = address_hash % host->num_channel_lookup_bins; N_ChannelLookupBin *bin = &host->channel_lookup_bins[bin_index]; - if (bin->last) { + if (bin->last) + { channel->prev_address_hash = bin->last; bin->last->next_address_hash = channel; - } else { + } + else + { bin->first = channel; } bin->last = channel; @@ -274,42 +170,51 @@ internal struct host_channel *host_channel_alloc(N_Host *host, P_Address address return channel; } -internal void host_channel_release(struct host_channel *channel) +void N_ReleaseChannel(N_Channel *channel) { N_Host *host = channel->host; /* Release from lookup table */ { N_ChannelLookupBin *bin = &host->channel_lookup_bins[channel->address_hash % host->num_channel_lookup_bins]; - struct host_channel *prev = channel->prev_address_hash; - struct host_channel *next = channel->next_address_hash; - if (prev) { + N_Channel *prev = channel->prev_address_hash; + N_Channel *next = channel->next_address_hash; + if (prev) + { prev->next_address_hash = next; - } else { + } + else + { bin->first = next; } - if (next) { + if (next) + { next->prev_address_hash = prev; - } else { + } + else + { bin->last = prev; } } /* Release packets */ { - if (channel->first_unreliable_packet) { + if (channel->first_unreliable_packet) + { host->first_free_packet = channel->first_unreliable_packet; channel->last_unreliable_packet->next = host->first_free_packet; } - if (channel->first_reliable_packet) { + if (channel->first_reliable_packet) + { host->first_free_packet = channel->first_reliable_packet; channel->last_reliable_packet->next = host->first_free_packet; } } /* Release msg assemblers */ - for (struct host_msg_assembler *ma = channel->least_recent_msg_assembler; ma; ma = ma->more_recent) { - host_msg_assembler_release(ma); + for (N_MsgAssembler *ma = channel->least_recent_msg_assembler; ma; ma = ma->more_recent) + { + N_ReleaseMessageAssembler(ma); } ++channel->id.gen; @@ -318,11 +223,10 @@ internal void host_channel_release(struct host_channel *channel) host->first_free_channel = channel; } -/* ========================== * - * Msg assembler - * ========================== */ +//////////////////////////////// +//~ Message assembler -internal u64 hash_from_channel_msg(N_ChannelId channel_id, u64 msg_id) +u64 N_HashFromMsg(N_ChannelId channel_id, u64 msg_id) { u64 result = Fnv64Basis; result = HashFnv64(result, StringFromStruct(&channel_id)); @@ -330,27 +234,32 @@ internal u64 hash_from_channel_msg(N_ChannelId channel_id, u64 msg_id) return result; } -internal struct host_msg_assembler *host_get_msg_assembler(N_Host *host, N_ChannelId channel_id, u64 msg_id) +N_MsgAssembler *N_MsgAssemblerFromMsg(N_Host *host, N_ChannelId channel_id, u64 msg_id) { - u64 hash = hash_from_channel_msg(channel_id, msg_id); - struct host_msg_assembler_lookup_bin *bin = &host->msg_assembler_lookup_bins[hash % host->num_msg_assembler_lookup_bins]; - for (struct host_msg_assembler *ma = bin->first; ma; ma = ma->next_hash) { - if (ma->hash == hash && host_channel_id_eq(ma->channel->id, channel_id) && ma->msg_id == msg_id) { + u64 hash = N_HashFromMsg(channel_id, msg_id); + N_MsgAssemblerLookupBin *bin = &host->msg_assembler_lookup_bins[hash % host->num_msg_assembler_lookup_bins]; + for (N_MsgAssembler *ma = bin->first; ma; ma = ma->next_hash) + { + if (ma->hash == hash && N_EqChannelId(ma->channel->id, channel_id) && ma->msg_id == msg_id) + { return ma; } } return 0; } -internal struct host_msg_assembler *host_msg_assembler_alloc(struct host_channel *channel, u64 msg_id, u64 chunk_count, u64 now_ns, b32 is_reliable) +N_MsgAssembler *N_AllocMsgAssembler(N_Channel *channel, u64 msg_id, u64 chunk_count, u64 now_ns, b32 is_reliable) { N_Host *host = channel->host; - struct host_msg_assembler *ma; - if (host->first_free_msg_assembler) { + N_MsgAssembler *ma; + if (host->first_free_msg_assembler) + { ma = host->first_free_msg_assembler; host->first_free_msg_assembler = ma->next_free; - } else { - ma = PushStructNoZero(host->arena, struct host_msg_assembler); + } + else + { + ma = PushStructNoZero(host->arena, N_MsgAssembler); } ZeroStruct(ma); ma->channel = channel; @@ -359,7 +268,8 @@ internal struct host_msg_assembler *host_msg_assembler_alloc(struct host_channel ma->num_chunks_total = chunk_count; u64 chunk_bitmap_size = (chunk_count + 7) >> 3; - if ((chunk_bitmap_size % 16) != 0) { + if ((chunk_bitmap_size % 16) != 0) + { /* Align chunk bitmap to 16 so msg data is aligned */ chunk_bitmap_size += 16 - (chunk_bitmap_size % 16); } @@ -378,22 +288,28 @@ internal struct host_msg_assembler *host_msg_assembler_alloc(struct host_channel /* Insert into channel list */ ma->touched_ns = now_ns; - if (channel->most_recent_msg_assembler) { + if (channel->most_recent_msg_assembler) + { channel->most_recent_msg_assembler->more_recent = ma; ma->less_recent = channel->most_recent_msg_assembler; - } else { + } + else + { channel->least_recent_msg_assembler = ma; } channel->most_recent_msg_assembler = ma; /* Insert into lookup table */ - u64 hash = hash_from_channel_msg(channel->id, msg_id); + u64 hash = N_HashFromMsg(channel->id, msg_id); ma->hash = hash; - struct host_msg_assembler_lookup_bin *bin = &host->msg_assembler_lookup_bins[hash % host->num_msg_assembler_lookup_bins]; - if (bin->last) { + N_MsgAssemblerLookupBin *bin = &host->msg_assembler_lookup_bins[hash % host->num_msg_assembler_lookup_bins]; + if (bin->last) + { bin->last->next_hash = ma; ma->prev_hash = bin->last; - } else { + } + else + { bin->first = ma; } bin->last = ma; @@ -401,41 +317,53 @@ internal struct host_msg_assembler *host_msg_assembler_alloc(struct host_channel return ma; } -internal void host_msg_assembler_release(struct host_msg_assembler *ma) +void N_ReleaseMessageAssembler(N_MsgAssembler *ma) { - struct host_channel *channel = ma->channel; + N_Channel *channel = ma->channel; N_Host *host = channel->host; ReleaseBuddyBlock(ma->buddy_block); /* Release from channel list */ { - struct host_msg_assembler *prev = ma->less_recent; - struct host_msg_assembler *next = ma->more_recent; - if (prev) { + N_MsgAssembler *prev = ma->less_recent; + N_MsgAssembler *next = ma->more_recent; + if (prev) + { prev->more_recent = next; - } else { + } + else + { channel->least_recent_msg_assembler = next; } - if (next) { + if (next) + { next->less_recent = prev; - } else { + } + else + { channel->most_recent_msg_assembler = prev; } } /* Release from lookup table */ - struct host_msg_assembler_lookup_bin *bin = &host->msg_assembler_lookup_bins[ma->hash % host->num_msg_assembler_lookup_bins]; + N_MsgAssemblerLookupBin *bin = &host->msg_assembler_lookup_bins[ma->hash % host->num_msg_assembler_lookup_bins]; { - struct host_msg_assembler *prev = ma->prev_hash; - struct host_msg_assembler *next = ma->next_hash; - if (prev) { + N_MsgAssembler *prev = ma->prev_hash; + N_MsgAssembler *next = ma->next_hash; + if (prev) + { prev->next_hash = next; - } else { + } + else + { bin->first = next; } - if (next) { + if (next) + { next->prev_hash = prev; - } else { + } + else + { bin->last = prev; } } @@ -444,32 +372,42 @@ internal void host_msg_assembler_release(struct host_msg_assembler *ma) host->first_free_msg_assembler = ma; } -internal void host_msg_assembler_touch(struct host_msg_assembler *ma, i64 now_ns) +void N_TouchMessageAssembler(N_MsgAssembler *ma, i64 now_ns) { - struct host_channel *channel = ma->channel; - if (ma != channel->most_recent_msg_assembler) { + N_Channel *channel = ma->channel; + if (ma != channel->most_recent_msg_assembler) + { /* Remove from channel list */ { - struct host_msg_assembler *prev = ma->less_recent; - struct host_msg_assembler *next = ma->more_recent; - if (prev) { + N_MsgAssembler *prev = ma->less_recent; + N_MsgAssembler *next = ma->more_recent; + if (prev) + { prev->more_recent = next; - } else { + } + else + { channel->least_recent_msg_assembler = next; } - if (next) { + if (next) + { next->less_recent = prev; - } else { + } + else + { channel->most_recent_msg_assembler = prev; } } /* Insert at end of channel list */ { - if (channel->most_recent_msg_assembler) { + if (channel->most_recent_msg_assembler) + { channel->most_recent_msg_assembler->more_recent = ma; ma->less_recent = channel->most_recent_msg_assembler; - } else { + } + else + { channel->least_recent_msg_assembler = ma; } channel->most_recent_msg_assembler = ma; @@ -478,50 +416,63 @@ internal void host_msg_assembler_touch(struct host_msg_assembler *ma, i64 now_ns ma->touched_ns = now_ns; } -internal b32 host_msg_assembler_is_chunk_filled(struct host_msg_assembler *ma, u64 chunk_id) +b32 N_IsChunkFilled(N_MsgAssembler *ma, u64 chunk_id) { - if (chunk_id < ma->num_chunks_total) { + if (chunk_id < ma->num_chunks_total) + { return (ma->chunk_bitmap[chunk_id / 8] & (1 << (chunk_id % 8))) != 0; } return 0; } -internal void host_msg_assembler_set_chunk_received(struct host_msg_assembler *ma, u64 chunk_id) +void N_MarkChunkReceived(N_MsgAssembler *ma, u64 chunk_id) { - if (chunk_id < ma->num_chunks_total) { + if (chunk_id < ma->num_chunks_total) + { ma->chunk_bitmap[chunk_id / 8] |= (1 << (chunk_id % 8)); } } -/* ========================== * - * Packet - * ========================== */ +//////////////////////////////// +//~ Packet -internal N_SndPacket *host_channel_snd_packet_alloc(struct host_channel *channel, b32 is_reliable) +N_SndPacket *N_PushSndPacket(N_Channel *channel, b32 is_reliable) { N_Host *host = channel->host; N_SndPacket *packet = 0; - if (host->first_free_packet) { + if (host->first_free_packet) + { packet = host->first_free_packet; host->first_free_packet = packet->next; - } else { + } + else + { packet = PushStructNoZero(host->arena, N_SndPacket); } ZeroStruct(packet); - if (is_reliable) { - if (channel->last_reliable_packet) { + if (is_reliable) + { + if (channel->last_reliable_packet) + { channel->last_reliable_packet->next = packet; - } else { + } + else + { channel->first_reliable_packet = packet; } channel->last_reliable_packet = packet; ++channel->num_reliable_packets; packet->seq = ++channel->last_sent_seq; - } else { - if (channel->last_unreliable_packet) { + } + else + { + if (channel->last_unreliable_packet) + { channel->last_unreliable_packet->next = packet; - } else { + } + else + { channel->first_unreliable_packet = packet; } channel->last_unreliable_packet = packet; @@ -530,74 +481,63 @@ internal N_SndPacket *host_channel_snd_packet_alloc(struct host_channel *channel return packet; } -/* ========================== * - * Cmd interface - * ========================== */ +//////////////////////////////// +//~ Host commands -internal N_Cmd *host_cmd_alloc_and_append(N_Host *host) +N_Cmd *N_PushCmd(N_Host *host) { N_Cmd *cmd = PushStruct(host->cmd_arena, N_Cmd); - if (host->last_cmd) { + if (host->last_cmd) + { host->last_cmd->next = cmd; - } else { + } + else + { host->first_cmd = cmd; } host->last_cmd = cmd; return cmd; } -void host_queue_connect_to_address(N_Host *host, P_Address connect_address) +void N_Connect(N_Host *host, P_Address connect_address) { - struct host_channel *channel = host_channel_from_address(host, connect_address); - if (!channel->valid) { - channel = host_channel_alloc(host, connect_address); + N_Channel *channel = N_ChannelFromAddress(host, connect_address); + if (!channel->valid) + { + channel = N_AllocChannel(host, connect_address); } } -void host_queue_disconnect(N_Host *host, N_ChannelId channel_id) +void N_Disconnect(N_Host *host, N_ChannelId channel_id) { - N_Cmd *cmd = host_cmd_alloc_and_append(host); - cmd->kind = HOST_CMD_KIND_DISCONNECT; + N_Cmd *cmd = N_PushCmd(host); + cmd->kind = N_CmdKind_Disconnect; cmd->channel_id = channel_id; } -void host_queue_write(N_Host *host, N_ChannelId channel_id, String msg, u32 flags) +void N_Write(N_Host *host, N_ChannelId channel_id, String msg, N_WriteFlag flags) { - N_Cmd *cmd = host_cmd_alloc_and_append(host); - cmd->kind = HOST_CMD_KIND_WRITE; + N_Cmd *cmd = N_PushCmd(host); + cmd->kind = N_CmdKind_Write; cmd->channel_id = channel_id; cmd->write_msg = CopyString(host->cmd_arena, msg); - cmd->write_reliable = flags & HOST_WRITE_FLAG_RELIABLE; + cmd->write_reliable = flags & N_WriteFlag_Reliable; } -/* ========================== * - * Info - * ========================== */ +//////////////////////////////// +//~ Channel info -i64 host_get_channel_last_rtt_ns(N_Host *host, N_ChannelId channel_id) +i64 N_GetChannelLastRttNs(N_Host *host, N_ChannelId channel_id) { - struct host_channel *channel = host_single_channel_from_id(host, channel_id); + N_Channel *channel = N_ChannelFromId(host, channel_id); return channel->last_heartbeat_rtt_ns; } -/* ========================== * - * Update - * ========================== */ - -internal N_Event *push_event(Arena *arena, N_EventList *list) -{ - N_Event *event = PushStruct(arena, N_Event); - if (list->last) { - list->last->next = event; - } else { - list->first = event; - } - list->last = event; - return event; -} +//////////////////////////////// +//~ Update begin /* Read incoming packets, update channels, and return events */ -N_EventList host_update_begin(Arena *arena, N_Host *host) +N_EventList N_BeginUpdate(Arena *arena, N_Host *host) { TempArena scratch = BeginScratch(arena); @@ -606,23 +546,28 @@ N_EventList host_update_begin(Arena *arena, N_Host *host) { __profn("Read packets"); - /* Read socket */ - struct host_rcv_packet *first_packet = 0; - struct host_rcv_packet *last_packet = 0; + //- Read socket + N_RcvPacket *first_packet = 0; + N_RcvPacket *last_packet = 0; { __profn("Read socket"); P_Sock *sock = host->sock; P_SockReadResult result = ZI; - while ((result = P_ReadSock(scratch.arena, sock)).valid) { + while ((result = P_ReadSock(scratch.arena, sock)).valid) + { P_Address address = result.address; String data = result.data; - if (data.len > 0) { - struct host_rcv_packet *packet = PushStruct(scratch.arena, struct host_rcv_packet); + if (data.len > 0) + { + N_RcvPacket *packet = PushStruct(scratch.arena, N_RcvPacket); packet->address = address; packet->data = CopyString(scratch.arena, data); - if (last_packet) { + if (last_packet) + { last_packet->next = packet; - } else { + } + else + { first_packet = packet; } last_packet = packet; @@ -630,103 +575,123 @@ N_EventList host_update_begin(Arena *arena, N_Host *host) } } - /* Read incoming packets */ + //- Read incoming packets { __profn("Process host packets"); - for (struct host_rcv_packet *packet = first_packet; packet; packet = packet->next) { + for (N_RcvPacket *packet = first_packet; packet; packet = packet->next) + { //struct sock *sock = packet->sock; P_Address address = packet->address; BB_Buff bb = BitbuffFromString(packet->data); BB_Reader br = BB_ReaderFromBuff(&bb); u32 magic = BB_ReadUBits(&br, 32); /* TODO: implicitly encode magic into crc32 */ - if (magic == N_PacketMagic) { + if (magic == N_PacketMagic) + { /* TODO: Combine kind byte with flags byte */ - struct host_channel *channel = host_channel_from_address(host, address); - enum host_packet_kind host_packet_kind = BB_ReadIBits(&br, 8); + N_Channel *channel = N_ChannelFromAddress(host, address); + N_PacketKind packet_kind = BB_ReadIBits(&br, 8); u8 packet_flags = BB_ReadUBits(&br, 8); u64 their_acked_seq = BB_ReadUV(&br); - if (channel->valid) { + if (channel->valid) + { channel->last_packet_received_ns = now_ns; - if (their_acked_seq > channel->their_acked_seq) { + if (their_acked_seq > channel->their_acked_seq) + { channel->their_acked_seq = their_acked_seq; } } b32 skip_packet = 0; - b32 is_reliable = packet_flags & HOST_PACKET_FLAG_RELIABLE; - if (channel->valid) { - if (is_reliable) { + b32 is_reliable = packet_flags & N_PacketFlag_Reliable; + if (channel->valid) + { + if (is_reliable) + { u64 packet_seq = BB_ReadUV(&br); - if (packet_seq == channel->our_acked_seq + 1) { + if (packet_seq == channel->our_acked_seq + 1) + { channel->our_acked_seq = packet_seq; - } else { + } + else + { skip_packet = 1; } } } - if (!skip_packet) { - switch (host_packet_kind) { - case HOST_PACKET_KIND_TRY_CONNECT: + if (!skip_packet) + { + switch (packet_kind) + { + //- Read packet kind: TryConnect + case N_PacketKind_TryConnect: { /* A foreign host is trying to connect to us */ - if (!channel->valid) { + if (!channel->valid) + { P_LogInfoF("Host received conection attempt from %F", FmtString(P_StringFromAddress(scratch.arena, address))); /* TODO: Verify that some per-host uuid isn't present in a rolling window to prevent reconnects right after a disconnect? */ - channel = host_channel_alloc(host, address); + channel = N_AllocChannel(host, address); } - N_Cmd *cmd = host_cmd_alloc_and_append(host); - cmd->kind = HOST_CMD_KIND_CONNECT_SUCCESS; + N_Cmd *cmd = N_PushCmd(host); + cmd->kind = N_CmdKind_ConnectSuccess; cmd->channel_id = channel->id; } break; - - case HOST_PACKET_KIND_CONNECT_SUCCESS: + //- Read packet kind: ConnectSuccess + case N_PacketKind_ConnectSuccess: { /* We successfully connected to a foreign host and they are ready to receive messages */ - if (channel->valid && !channel->connected) { + if (channel->valid && !channel->connected) + { P_LogInfoF("Host received connection from %F", FmtString(P_StringFromAddress(scratch.arena, address))); - N_Event *event = push_event(arena, &events); - event->kind = HOST_EVENT_KIND_CHANNEL_OPENED; + N_Event *event = N_PushEvent(arena, &events); + event->kind = N_EventKind_ChannelOpened; event->channel_id = channel->id; channel->connected = 1; } } break; - - case HOST_PACKET_KIND_DISCONNECT: + //- Read packet kind: Disconnect + case N_PacketKind_Disconnect: { /* A foreign host disconnected from us */ - if (channel->valid) { + if (channel->valid) + { P_LogInfoF("Host received disconnection from %F", FmtString(P_StringFromAddress(scratch.arena, address))); - N_Event *event = push_event(arena, &events); - event->kind = HOST_EVENT_KIND_CHANNEL_CLOSED; + N_Event *event = N_PushEvent(arena, &events); + event->kind = N_EventKind_ChannelClosed; event->channel_id = channel->id; - host_channel_release(channel); + N_ReleaseChannel(channel); } } break; - - case HOST_PACKET_KIND_HEARTBEAT: + //- Read packet kind: Heartbeat + case N_PacketKind_Heartbeat: { - if (channel->valid) { + if (channel->valid) + { u16 heartbeat_id = BB_ReadUBits(&br, 16); u16 acked_heartbeat_id = BB_ReadUBits(&br, 16); - if (heartbeat_id > channel->last_heartbeat_received_id) { + if (heartbeat_id > channel->last_heartbeat_received_id) + { channel->last_heartbeat_received_id = heartbeat_id; } - if (acked_heartbeat_id == channel->last_heartbeat_acked_id + 1) { + if (acked_heartbeat_id == channel->last_heartbeat_acked_id + 1) + { channel->last_heartbeat_acked_id = acked_heartbeat_id; - if (channel->last_heartbeat_acked_ns > 0) { + if (channel->last_heartbeat_acked_ns > 0) + { channel->last_heartbeat_rtt_ns = now_ns - channel->last_heartbeat_acked_ns; } channel->last_heartbeat_acked_ns = now_ns; } } } break; - - case HOST_PACKET_KIND_MSG_CHUNK: + //- Read packet kind: MsgChunk + case N_PacketKind_MsgChunk: { - if (channel->valid && channel->connected) { + if (channel->valid && channel->connected) + { /* Packet is chunk out of belonging to message */ u64 msg_id = BB_ReadUV(&br); u64 chunk_id = BB_ReadUV(&br); @@ -734,45 +699,56 @@ N_EventList host_update_begin(Arena *arena, N_Host *host) b32 is_last_chunk = (chunk_id + 1) == chunk_count; u64 chunk_len = is_last_chunk ? BB_ReadUV(&br) : N_MaxPacketChunkLen; - struct host_msg_assembler *ma = host_get_msg_assembler(host, channel->id, msg_id); - if (!ma) { - ma = host_msg_assembler_alloc(channel, msg_id, chunk_count, now_ns, is_reliable); + N_MsgAssembler *ma = N_MsgAssemblerFromMsg(host, channel->id, msg_id); + if (!ma) + { + ma = N_AllocMsgAssembler(channel, msg_id, chunk_count, now_ns, is_reliable); } - if (chunk_count == ma->num_chunks_total && chunk_id < chunk_count) { - if (!host_msg_assembler_is_chunk_filled(ma, chunk_id)) { + if (chunk_count == ma->num_chunks_total && chunk_id < chunk_count) + { + if (!N_IsChunkFilled(ma, chunk_id)) + { u8 *src = BB_ReadBytesRaw(&br, chunk_len); - if (src) { + if (src) + { u8 *dst = &ma->chunk_data[chunk_id * N_MaxPacketChunkLen]; CopyBytes(dst, src, chunk_len); - if (is_last_chunk) { + if (is_last_chunk) + { ma->last_chunk_len = chunk_len; } - host_msg_assembler_set_chunk_received(ma, chunk_id); + N_MarkChunkReceived(ma, chunk_id); ++ma->num_chunks_received; - host_msg_assembler_touch(ma, now_ns); - if (ma->num_chunks_received == chunk_count) { + N_TouchMessageAssembler(ma, now_ns); + if (ma->num_chunks_received == chunk_count) + { /* All chunks filled, message has finished assembling */ /* TODO: Message ordering */ - N_Event *event = push_event(arena, &events); + N_Event *event = N_PushEvent(arena, &events); String data = ZI; data.len = ((chunk_count - 1) * N_MaxPacketChunkLen) + ma->last_chunk_len; data.text = PushStructsNoZero(arena, u8, data.len); CopyBytes(data.text, ma->chunk_data, data.len); - event->kind = HOST_EVENT_KIND_MSG; + event->kind = N_EventKind_Msg; event->msg = data; event->channel_id = channel->id; - if (is_reliable) { + if (is_reliable) + { /* Release assembler if reliable */ - host_msg_assembler_release(ma); + N_ReleaseMessageAssembler(ma); } } - } else { + } + else + { /* Overflow reading chunk */ Assert(0); } } - } else { + } + else + { /* Chunk id/count mismatch */ Assert(0); } @@ -788,23 +764,26 @@ N_EventList host_update_begin(Arena *arena, N_Host *host) } } - /* Update channels */ + //- Update channels { __profn("Update host channels"); - for (u64 i = 0; i < host->num_channels_reserved; ++i) { - struct host_channel *channel = &host->channels[i]; - if (channel->valid) { + for (u64 i = 0; i < host->num_channels_reserved; ++i) + { + N_Channel *channel = &host->channels[i]; + if (channel->valid) + { /* Send / resend handshake if not connected */ - if (!channel->connected) { - N_Cmd *cmd = host_cmd_alloc_and_append(host); - cmd->kind = HOST_CMD_KIND_TRY_CONNECT; + if (!channel->connected) + { + N_Cmd *cmd = N_PushCmd(host); + cmd->kind = N_CmdKind_TryConnect; cmd->channel_id = channel->id; } /* Send heartbeat */ /* TODO: Send this less frequently (once per second or half of timeout or something) */ { - N_Cmd *cmd = host_cmd_alloc_and_append(host); - cmd->kind = HOST_CMD_KIND_HEARTBEAT; + N_Cmd *cmd = N_PushCmd(host); + cmd->kind = N_CmdKind_Heartbeat; cmd->heartbeat_id = channel->last_heartbeat_acked_id + 1; cmd->heartbeat_ack_id = channel->last_heartbeat_received_id; cmd->channel_id = channel->id; @@ -813,20 +792,25 @@ N_EventList host_update_begin(Arena *arena, N_Host *host) { u64 acked_seq = channel->their_acked_seq; N_SndPacket *packet = channel->first_reliable_packet; - while (packet) { + while (packet) + { N_SndPacket *next = packet->next; u64 seq = packet->seq; - if (seq < acked_seq) { + if (seq < acked_seq) + { packet->next = host->first_free_packet; host->first_free_packet = packet; channel->first_reliable_packet = next; --channel->num_reliable_packets; - } else { + } + else + { break; } packet = next; } - if (channel->first_reliable_packet == 0) { + if (channel->first_reliable_packet == 0) + { channel->last_reliable_packet = 0; } } @@ -834,14 +818,19 @@ N_EventList host_update_begin(Arena *arena, N_Host *host) { /* TODO: Configurable timeout */ i64 unreliable_msg_timeout_ns = NsFromSeconds(0.1); - struct host_msg_assembler *ma = channel->least_recent_msg_assembler; - while (ma) { - struct host_msg_assembler *next = ma->more_recent; - if ((now_ns - ma->touched_ns) > unreliable_msg_timeout_ns) { - if (!ma->is_reliable) { - host_msg_assembler_release(ma); + N_MsgAssembler *ma = channel->least_recent_msg_assembler; + while (ma) + { + N_MsgAssembler *next = ma->more_recent; + if ((now_ns - ma->touched_ns) > unreliable_msg_timeout_ns) + { + if (!ma->is_reliable) + { + N_ReleaseMessageAssembler(ma); } - } else { + } + else + { break; } ma = next; @@ -855,8 +844,11 @@ N_EventList host_update_begin(Arena *arena, N_Host *host) return events; } +//////////////////////////////// +//~ Update end + /* Process host cmds & send outgoing packets */ -void host_update_end(N_Host *host) +void N_EndUpdate(N_Host *host) { __prof; TempArena scratch = BeginScratchNoConflict(); @@ -865,103 +857,113 @@ void host_update_end(N_Host *host) /* TODO: Unreliable packets don't need to be allocated into unreliable packet queue, should just send them and forget */ { __profn("Process host cmds"); - for (N_Cmd *cmd = host->first_cmd; cmd; cmd = cmd->next) { + for (N_Cmd *cmd = host->first_cmd; cmd; cmd = cmd->next) + { N_CmdKind kind = cmd->kind; N_ChannelId channel_id = cmd->channel_id; - struct host_channel_list channels = host_channels_from_id(scratch.arena, host, channel_id); - for (struct host_channel_node *node = channels.first; node; node = node->next) { - struct host_channel *channel = node->channel; - switch (kind) { - case HOST_CMD_KIND_TRY_CONNECT: + N_ChannelList channels = N_ChannelsFromId(scratch.arena, host, channel_id); + for (N_ChannelNode *node = channels.first; node; node = node->next) + { + N_Channel *channel = node->channel; + switch (kind) + { + //- Process command: TryConnect + case N_CmdKind_TryConnect: { u8 packet_flags = 0; - N_SndPacket *packet = host_channel_snd_packet_alloc(channel, 0); + N_SndPacket *packet = N_PushSndPacket(channel, 0); BB_Buff bb = BitbuffFromString(StringFromArray(packet->data)); BB_Writer bw = BB_WriterFromBuff(&bb); BB_WriteUBits(&bw, N_PacketMagic, 32); /* TODO: implicitly encode magic into crc32 */ - BB_WriteIBits(&bw, HOST_PACKET_KIND_TRY_CONNECT, 8); + BB_WriteIBits(&bw, N_PacketKind_TryConnect, 8); BB_WriteUBits(&bw, packet_flags, 8); BB_WriteUV(&bw, channel->our_acked_seq); packet->data_len = BB_GetNumBytesWritten(&bw); } break; - - case HOST_CMD_KIND_CONNECT_SUCCESS: + //- Process command: ConnectSuccess + case N_CmdKind_ConnectSuccess: { u8 packet_flags = 0; - N_SndPacket *packet = host_channel_snd_packet_alloc(channel, 0); + N_SndPacket *packet = N_PushSndPacket(channel, 0); BB_Buff bb = BitbuffFromString(StringFromArray(packet->data)); BB_Writer bw = BB_WriterFromBuff(&bb); BB_WriteUBits(&bw, N_PacketMagic, 32); /* TODO: implicitly encode magic into crc32 */ - BB_WriteIBits(&bw, HOST_PACKET_KIND_CONNECT_SUCCESS, 8); + BB_WriteIBits(&bw, N_PacketKind_ConnectSuccess, 8); BB_WriteUBits(&bw, packet_flags, 8); BB_WriteUV(&bw, channel->our_acked_seq); packet->data_len = BB_GetNumBytesWritten(&bw); } break; - - case HOST_CMD_KIND_DISCONNECT: + //- Process command: Disconnect + case N_CmdKind_Disconnect: { u8 packet_flags = 0; - N_SndPacket *packet = host_channel_snd_packet_alloc(channel, 0); + N_SndPacket *packet = N_PushSndPacket(channel, 0); BB_Buff bb = BitbuffFromString(StringFromArray(packet->data)); BB_Writer bw = BB_WriterFromBuff(&bb); BB_WriteUBits(&bw, N_PacketMagic, 32); /* TODO: implicitly encode magic into crc32 */ - BB_WriteIBits(&bw, HOST_PACKET_KIND_DISCONNECT, 8); + BB_WriteIBits(&bw, N_PacketKind_Disconnect, 8); BB_WriteUBits(&bw, packet_flags, 8); BB_WriteUV(&bw, channel->our_acked_seq); packet->data_len = BB_GetNumBytesWritten(&bw); } break; - - case HOST_CMD_KIND_HEARTBEAT: + //- Process command: Heartbeat + case N_CmdKind_Heartbeat: { u8 packet_flags = 0; - N_SndPacket *packet = host_channel_snd_packet_alloc(channel, 0); + N_SndPacket *packet = N_PushSndPacket(channel, 0); BB_Buff bb = BitbuffFromString(StringFromArray(packet->data)); BB_Writer bw = BB_WriterFromBuff(&bb); BB_WriteUBits(&bw, N_PacketMagic, 32); /* TODO: implicitly encode magic into crc32 */ - BB_WriteIBits(&bw, HOST_PACKET_KIND_HEARTBEAT, 8); + BB_WriteIBits(&bw, N_PacketKind_Heartbeat, 8); BB_WriteUBits(&bw, packet_flags, 8); BB_WriteUV(&bw, channel->our_acked_seq); BB_WriteUBits(&bw, cmd->heartbeat_id, 16); BB_WriteUBits(&bw, cmd->heartbeat_ack_id, 16); packet->data_len = BB_GetNumBytesWritten(&bw); } break; - - case HOST_CMD_KIND_WRITE: + //- Process command: Write + case N_CmdKind_Write: { b32 is_reliable = cmd->write_reliable; - u8 packet_flags = (is_reliable * HOST_PACKET_FLAG_RELIABLE); + u8 packet_flags = (is_reliable * N_PacketFlag_Reliable); String msg = cmd->write_msg; u64 chunk_count = 0; - if (msg.len > 0) { + if (msg.len > 0) + { chunk_count = (msg.len - 1) / N_MaxPacketChunkLen; } chunk_count += 1; u64 msg_id = ++channel->last_sent_msg_id; - for (u64 i = 0; i < chunk_count; ++i) { + for (u64 i = 0; i < chunk_count; ++i) + { u64 chunk_len = N_MaxPacketChunkLen; b32 is_last_chunk = i + 1 == chunk_count; - if (is_last_chunk) { + if (is_last_chunk) + { chunk_len = msg.len % N_MaxPacketChunkLen; - if (chunk_len == 0) { + if (chunk_len == 0) + { chunk_len = N_MaxPacketChunkLen; } } - N_SndPacket *packet = host_channel_snd_packet_alloc(channel, is_reliable); + N_SndPacket *packet = N_PushSndPacket(channel, is_reliable); BB_Buff bb = BitbuffFromString(StringFromArray(packet->data)); BB_Writer bw = BB_WriterFromBuff(&bb); BB_WriteUBits(&bw, N_PacketMagic, 32); /* TODO: implicitly encode magic into crc32 */ - BB_WriteIBits(&bw, HOST_PACKET_KIND_MSG_CHUNK, 8); + BB_WriteIBits(&bw, N_PacketKind_MsgChunk, 8); BB_WriteUBits(&bw, packet_flags, 8); BB_WriteUV(&bw, channel->our_acked_seq); - if (is_reliable) { + if (is_reliable) + { BB_WriteUV(&bw, packet->seq); } BB_WriteUV(&bw, msg_id); BB_WriteUV(&bw, i); BB_WriteUV(&bw, chunk_count); - if (is_last_chunk) { + if (is_last_chunk) + { BB_WriteUV(&bw, chunk_len); } u8 *chunk_data = msg.text + (i * N_MaxPacketChunkLen); @@ -976,28 +978,33 @@ void host_update_end(N_Host *host) } } - /* Send packets */ + //- Send packets /* TODO: Aggregate small packets */ { __profn("Send host packets"); - for (u64 i = 0; i < host->num_channels_reserved; ++i) { + for (u64 i = 0; i < host->num_channels_reserved; ++i) + { P_Sock *sock = host->sock; - struct host_channel *channel = &host->channels[i]; + N_Channel *channel = &host->channels[i]; u64 total_sent = 0; - if (channel->valid) { + if (channel->valid) + { P_Address address = channel->address; /* Send reliable packets to channel */ - for (N_SndPacket *packet = channel->first_reliable_packet; packet; packet = packet->next) { + for (N_SndPacket *packet = channel->first_reliable_packet; packet; packet = packet->next) + { P_WriteSock(sock, address, STRING(packet->data_len, packet->data)); total_sent += packet->data_len; } /* Send unreliable packets to channel */ - for (N_SndPacket *packet = channel->first_unreliable_packet; packet; packet = packet->next) { + for (N_SndPacket *packet = channel->first_unreliable_packet; packet; packet = packet->next) + { P_WriteSock(sock, address, STRING(packet->data_len, packet->data)); total_sent += packet->data_len; } /* Release unreliable packets */ - if (channel->first_unreliable_packet) { + if (channel->first_unreliable_packet) + { channel->last_unreliable_packet->next = host->first_free_packet; host->first_free_packet = channel->first_unreliable_packet; channel->first_unreliable_packet = 0; @@ -1010,10 +1017,28 @@ void host_update_end(N_Host *host) } - /* Reset cmds */ + //- Reset commands list host->first_cmd = 0; host->last_cmd = 0; ResetArena(host->cmd_arena); EndScratch(scratch); } + +//////////////////////////////// +//~ PushEvent + +N_Event *N_PushEvent(Arena *arena, N_EventList *list) +{ + N_Event *event = PushStruct(arena, N_Event); + if (list->last) + { + list->last->next = event; + } + else + { + list->first = event; + } + list->last = event; + return event; +} diff --git a/src/net/net_core.h b/src/net/net_core.h index df9dfd32..6e0273eb 100644 --- a/src/net/net_core.h +++ b/src/net/net_core.h @@ -1,43 +1,33 @@ -#define HOST_CHANNEL_ID_NIL (N_ChannelId) { .gen = 0, .idx = 0 } -#define HOST_CHANNEL_ID_ALL (N_ChannelId) { .gen = U32Max, .idx = U32Max } +//////////////////////////////// +//~ Channel ID -#define N_PacketMagic 0xd9e3b8b6 -#define N_MaxPacketChunkLen 1024 -#define N_MaxPacketLen 1280 /* Give enough space for msg chunk + header */ - -#define N_NumChannelLookupBins 512 -#define N_NumMsgAssemblerLookupBins 16384 - -typedef i32 N_CmdKind; enum { - HOST_CMD_KIND_NONE, - - HOST_CMD_KIND_TRY_CONNECT, - HOST_CMD_KIND_CONNECT_SUCCESS, - HOST_CMD_KIND_DISCONNECT, - HOST_CMD_KIND_HEARTBEAT, - HOST_CMD_KIND_WRITE -}; - -typedef i32 N_EventKind; enum { - HOST_EVENT_KIND_NONE, - - HOST_EVENT_KIND_CHANNEL_OPENED, - HOST_EVENT_KIND_CHANNEL_CLOSED, - HOST_EVENT_KIND_MSG -}; - -typedef i32 N_WriteFlag; enum { - HOST_WRITE_FLAG_NONE = 0, - - HOST_WRITE_FLAG_RELIABLE = (1 << 0) -}; - -Struct(N_ChannelId) { +Struct(N_ChannelId) +{ u32 gen; u32 idx; }; -Struct(N_Cmd) { +//////////////////////////////// +//~ Host command types + +typedef i32 N_CmdKind; enum +{ + N_CmdKind_None, + N_CmdKind_TryConnect, + N_CmdKind_ConnectSuccess, + N_CmdKind_Disconnect, + N_CmdKind_Heartbeat, + N_CmdKind_Write +}; + +typedef i32 N_WriteFlag; enum +{ + N_WriteFlag_None = 0, + N_WriteFlag_Reliable = (1 << 0) +}; + +Struct(N_Cmd) +{ N_CmdKind kind; N_ChannelId channel_id; @@ -47,11 +37,22 @@ Struct(N_Cmd) { b32 write_reliable; String write_msg; - N_Cmd *next; }; -Struct(N_Event) { +//////////////////////////////// +//~ Event types + +typedef i32 N_EventKind; enum +{ + N_EventKind_None, + N_EventKind_ChannelOpened, + N_EventKind_ChannelClosed, + N_EventKind_Msg +}; + +Struct(N_Event) +{ N_EventKind kind; N_ChannelId channel_id; String msg; @@ -59,23 +60,43 @@ Struct(N_Event) { N_Event *next; }; -Struct(N_EventList) { +Struct(N_EventList) +{ N_Event *first; N_Event *last; }; -Struct(N_ChannelLookupBin) { - struct host_channel *first; - struct host_channel *last; +Struct(N_ChannelLookupBin) +{ + struct N_Channel *first; + struct N_Channel *last; }; -Struct(N_RcvBuffer) { - Arena *arena; - struct host_rcv_packet *first_packet; - struct host_rcv_packet *last_packet; +//////////////////////////////// +//~ Packet types + +#define N_PacketMagic 0xd9e3b8b6 +#define N_MaxPacketChunkLen 1024 +#define N_MaxPacketLen 1280 /* Give enough space for msg chunk + header */ + +typedef i32 N_PacketKind; enum +{ + N_PacketKind_None, + N_PacketKind_TryConnect, + N_PacketKind_ConnectSuccess, + N_PacketKind_Disconnect, + N_PacketKind_Heartbeat, + N_PacketKind_MsgChunk }; -Struct(N_SndPacket) { +typedef i32 N_PacketFlag; enum +{ + N_PacketFlag_None = 0, + N_PacketFlag_Reliable = (1 << 0) +}; + +Struct(N_SndPacket) +{ N_SndPacket *next; u64 seq; @@ -83,7 +104,122 @@ Struct(N_SndPacket) { u8 data[N_MaxPacketLen]; }; -Struct(N_Host) { +Struct(N_RcvPacket) +{ + P_Sock *sock; + P_Address address; + String data; + N_RcvPacket *next; +}; + +Struct(N_RcvBuffer) +{ + Arena *arena; + N_RcvPacket *first_packet; + N_RcvPacket *last_packet; +}; + +//////////////////////////////// +//~ Channel types + +Struct(N_Channel) +{ + N_ChannelId id; + b32 valid; + b32 connected; + struct N_Host *host; + + N_Channel *next_free; + + P_Address address; + u64 address_hash; + N_Channel *next_address_hash; + N_Channel *prev_address_hash; + + /* NOTE: Packets are allocated in host's `arena` */ + N_SndPacket *first_reliable_packet; + N_SndPacket *last_reliable_packet; + N_SndPacket *first_unreliable_packet; + N_SndPacket *last_unreliable_packet; + u64 num_reliable_packets; + u64 num_unreliable_packets; + + /* NOTE: Msg assemblers are allocated in host's `arena` */ + struct N_MsgAssembler *least_recent_msg_assembler; + struct N_MsgAssembler *most_recent_msg_assembler; + + u16 last_heartbeat_received_id; + u16 last_heartbeat_acked_id; + i64 last_heartbeat_acked_ns; + i64 last_heartbeat_rtt_ns; + + u64 last_sent_msg_id; + u64 their_acked_seq; + u64 our_acked_seq; + u64 last_sent_seq; + + i64 last_packet_received_ns; +}; + +Struct(N_ChannelNode) +{ + N_Channel *channel; + N_ChannelNode *next; +}; + +Struct(N_ChannelList) +{ + N_ChannelNode *first; + N_ChannelNode *last; +}; + +//////////////////////////////// +//~ Message asssembler types + +Struct(N_MsgAssembler) +{ + N_Channel *channel; + b32 is_reliable; + + /* Free list */ + N_MsgAssembler *next_free; + + /* Bucket list */ + N_MsgAssembler *next_hash; + N_MsgAssembler *prev_hash; + + /* Channel list */ + N_MsgAssembler *less_recent; + N_MsgAssembler *more_recent; + + u64 msg_id; + u64 hash; + + u64 last_chunk_len; + u64 num_chunks_total; + u64 num_chunks_received; + + i64 touched_ns; + + BuddyBlock *buddy_block; + u8 *chunk_bitmap; + u8 *chunk_data; +}; + +Struct(N_MsgAssemblerLookupBin) +{ + N_MsgAssembler *first; + N_MsgAssembler *last; +}; + +//////////////////////////////// +//~ Host types + +#define N_NumChannelLookupBins 512 +#define N_NumMsgAssemblerLookupBins 16384 + +Struct(N_Host) +{ Arena *arena; P_Sock *sock; @@ -96,17 +232,17 @@ Struct(N_Host) { N_Cmd *first_free_cmd; Arena *channel_arena; - struct host_channel *channels; - struct host_channel *first_free_channel; + N_Channel *channels; + N_Channel *first_free_channel; u64 num_channels_reserved; - N_SndPacket *first_free_packet; /* Allocated in `arena` */ - struct host_msg_assembler *first_free_msg_assembler; /* Allocated in `arena` */ + N_SndPacket *first_free_packet; /* Allocated in `arena` */ + N_MsgAssembler *first_free_msg_assembler; /* Allocated in `arena` */ - N_ChannelLookupBin *channel_lookup_bins; /* Allocated in `arena` */ + N_ChannelLookupBin *channel_lookup_bins; /* Allocated in `arena` */ u64 num_channel_lookup_bins; - struct host_msg_assembler_lookup_bin *msg_assembler_lookup_bins; /* Allocated in `arena` */ + N_MsgAssemblerLookupBin *msg_assembler_lookup_bins; /* Allocated in `arena` */ u64 num_msg_assembler_lookup_bins; /* Double buffer for incoming data */ @@ -118,42 +254,65 @@ Struct(N_Host) { u64 bytes_sent; }; -/* ========================== * - * Startup - * ========================== */ +//////////////////////////////// +//~ Nil constants -Struct(N_StartupReceipt) { i32 _; }; -N_StartupReceipt host_startup(void); +Readonly Global N_Channel N_nil_channel = { .valid = 0 }; -/* ========================== * - * Host - * ========================== */ +//////////////////////////////// +//~ Host initialization -N_Host *host_alloc(u16 listen_port); +N_Host *N_AllocHost(u16 listen_port); +void N_ReleaseHost(N_Host *host); -void host_release(N_Host *host); +//////////////////////////////// +//~ Channel operations -/* ========================== * - * Queue - * ========================== */ +#define N_NilChannelId (N_ChannelId) { .gen = 0, .idx = 0 } +#define N_AllChannelsId (N_ChannelId) { .gen = U32Max, .idx = U32Max } -void host_queue_connect_to_address(N_Host *host, P_Address connect_address); +#define N_EqChannelId(a, b) ((a).gen == (b).gen && (a).idx == (b).idx) +#define N_IsChannelIdNil(v) ((v).gen == 0 && (v).idx == 0) -void host_queue_disconnect(N_Host *host, N_ChannelId channel_id); +u64 N_HashFromAddress(P_Address address); +N_Channel *N_ChannelFromAddress(N_Host *host, P_Address address); +N_Channel *N_ChannelFromId(N_Host *host, N_ChannelId channel_id); +N_ChannelList N_ChannelsFromId(Arena *arena, N_Host *host, N_ChannelId channel_id); +N_Channel *N_AllocChannel(N_Host *host, P_Address address); +void N_ReleaseChannel(N_Channel *channel); -void host_queue_write(N_Host *host, N_ChannelId channel_id, String msg, u32 flags); +//////////////////////////////// +//~ Message assembler operations -/* ========================== * - * Info - * ========================== */ +u64 N_HashFromMsg(N_ChannelId channel_id, u64 msg_id); +N_MsgAssembler *N_MsgAssemblerFromMsg(N_Host *host, N_ChannelId channel_id, u64 msg_id); +N_MsgAssembler *N_AllocMsgAssembler(N_Channel *channel, u64 msg_id, u64 chunk_count, u64 now_ns, b32 is_reliable); +void N_ReleaseMessageAssembler(N_MsgAssembler *ma); +void N_TouchMessageAssembler(N_MsgAssembler *ma, i64 now_ns); +b32 N_IsChunkFilled(N_MsgAssembler *ma, u64 chunk_id); +void N_MarkChunkReceived(N_MsgAssembler *ma, u64 chunk_id); -i64 host_get_channel_last_rtt_ns(N_Host *host, N_ChannelId channel_id); -Inline b32 host_channel_id_eq(N_ChannelId a, N_ChannelId b) { return a.idx == b.idx && a.gen == b.gen; } -Inline b32 host_channel_id_is_nil(N_ChannelId id) { return id.gen == 0 && id.idx == 0; } +//////////////////////////////// +//~ Packet operations -/* ========================== * - * Update - * ========================== */ +N_SndPacket *N_PushSndPacket(N_Channel *channel, b32 is_reliable); -N_EventList host_update_begin(Arena *arena, N_Host *host); -void host_update_end(N_Host *host); +//////////////////////////////// +//~ Host command operations + +N_Cmd *N_PushCmd(N_Host *host); +void N_Connect(N_Host *host, P_Address connect_address); +void N_Disconnect(N_Host *host, N_ChannelId channel_id); +void N_Write(N_Host *host, N_ChannelId channel_id, String msg, N_WriteFlag flags); + +//////////////////////////////// +//~ Channel info operations + +i64 N_GetChannelLastRttNs(N_Host *host, N_ChannelId channel_id); + +//////////////////////////////// +//~ Update + +N_Event *N_PushEvent(Arena *arena, N_EventList *list); +N_EventList N_BeginUpdate(Arena *arena, N_Host *host); +void N_EndUpdate(N_Host *host); diff --git a/src/sim/sim_core.c b/src/sim/sim_core.c index 6b4691b0..51ed1bf2 100644 --- a/src/sim/sim_core.c +++ b/src/sim/sim_core.c @@ -171,7 +171,7 @@ void sim_client_release(Client *client) } /* Remove from channel lookup */ - sim_client_set_channel_id(client, HOST_CHANNEL_ID_NIL); + sim_client_set_channel_id(client, N_NilChannelId); /* Release client */ ClientStore *store = client->store; @@ -198,7 +198,7 @@ void sim_client_set_channel_id(Client *client, N_ChannelId channel_id) N_ChannelId old_channel_id = client->channel_id; /* Remove old channel id from channel lookup */ - if (!host_channel_id_is_nil(old_channel_id)) { + if (!N_IsChannelIdNil(old_channel_id)) { u64 bin_index = client->channel_hash % store->num_client_lookup_bins; ClientLookupBin *bin = &store->client_lookup_bins[bin_index]; Client *prev = sim_client_from_handle(store, client->prev_in_bin); @@ -220,7 +220,7 @@ void sim_client_set_channel_id(Client *client, N_ChannelId channel_id) u64 channel_hash = hash_from_channel_id(channel_id); client->channel_id = channel_id; client->channel_hash = channel_hash; - if (!host_channel_id_is_nil(channel_id)) { + if (!N_IsChannelIdNil(channel_id)) { u64 bin_index = channel_hash % store->num_client_lookup_bins; ClientLookupBin *bin = &store->client_lookup_bins[bin_index]; { diff --git a/src/user/user_core.c b/src/user/user_core.c index e37639fd..fddae6bb 100644 --- a/src/user/user_core.c +++ b/src/user/user_core.c @@ -184,7 +184,6 @@ struct user_startup_receipt user_startup(F_StartupReceipt *font_sr, AC_StartupReceipt *asset_cache_sr, SND_StartupReceipt *sound_sr, M_StartupReceipt *mixer_sr, - N_StartupReceipt *host_sr, SimStartupReceipt *sim_sr, String connect_address_str) { @@ -195,7 +194,6 @@ struct user_startup_receipt user_startup(F_StartupReceipt *font_sr, (UNUSED)asset_cache_sr; (UNUSED)sound_sr; (UNUSED)mixer_sr; - (UNUSED)host_sr; (UNUSED)sim_sr; SetGstat(GSTAT_DEBUG_STEPS, U64Max); @@ -2186,7 +2184,7 @@ internal P_JobDef(local_sim_job, _) #if 0 struct host_listen_address local_listen_addr = host_listen_address_from_local_name(Lit("LOCAL_SIM")); struct host_listen_address net_listen_addr = host_listen_address_from_net_port(12345); - //N_Host *host = host_alloc(); + //N_Host *host = N_AllocHost(); /* TODO: Host system should allocate & copy string stored in local_listen_addr */ //host_listen(host, local_listen_addr); //host_listen(host, net_listen_addr); @@ -2195,11 +2193,11 @@ internal P_JobDef(local_sim_job, _) b32 is_master = 0; N_Host *host; if (G.connect_address_str.len > 0) { - host = host_alloc(0); + host = N_AllocHost(0); P_Address addr = P_AddressFromString(G.connect_address_str); - host_queue_connect_to_address(host, addr); + N_Connect(host, addr); } else { - host = host_alloc(12345); + host = N_AllocHost(12345); is_master = 1; } @@ -2268,7 +2266,7 @@ internal P_JobDef(local_sim_job, _) real_dt_ns = P_TimeNs() - real_time_ns; real_time_ns += real_dt_ns; - N_EventList host_events = host_update_begin(scratch.arena, host); + N_EventList host_events = N_BeginUpdate(scratch.arena, host); /* Read net messages */ struct sim_decode_queue queue = ZI; @@ -2277,7 +2275,7 @@ internal P_JobDef(local_sim_job, _) N_ChannelId channel_id = event->channel_id; Client *client = sim_client_from_channel_id(store, channel_id); switch (event->kind) { - case HOST_EVENT_KIND_CHANNEL_OPENED: + case N_EventKind_ChannelOpened: { if (!client->valid) { if (is_master) { @@ -2299,7 +2297,7 @@ internal P_JobDef(local_sim_job, _) } } break; - case HOST_EVENT_KIND_MSG: + case N_EventKind_Msg: { if (client->valid) { BB_Buff msg_bb = BitbuffFromString(event->msg); @@ -2442,7 +2440,7 @@ internal P_JobDef(local_sim_job, _) for (u64 i = 0; i < store->num_clients_reserved; ++i) { Client *client = &store->clients[i]; if (client->valid && client != local_client && client != publish_client && client != user_input_client && client != master_client) { - client->last_rtt_ns = host_get_channel_last_rtt_ns(host, client->channel_id); + client->last_rtt_ns = N_GetChannelLastRttNs(host, client->channel_id); /* Release unneeded received snapshots */ /* TDOO: Cap how many client snapshots we're willing to retain */ if (client->double_ack > 0) { @@ -2741,7 +2739,7 @@ internal P_JobDef(local_sim_job, _) String encoded = ZI; encoded.len = BB_GetNumBytesWritten(&msg_bw); encoded.text = BB_GetWrittenRaw(&msg_bw); - host_queue_write(host, client->channel_id, encoded, 0); + N_Write(host, client->channel_id, encoded, 0); } } @@ -2767,7 +2765,7 @@ internal P_JobDef(local_sim_job, _) skip_step: /* Send host messages */ - host_update_end(host); + N_EndUpdate(host); __profframe("Local sim"); EndScratch(scratch); @@ -2778,5 +2776,5 @@ skip_step: sim_accel_release(&accel); ReleaseBitbuff(&snapshot_writer_bb); ReleaseBitbuff(&msg_writer_bb); - host_release(host); + N_ReleaseHost(host); } diff --git a/src/user/user_core.h b/src/user/user_core.h index c8d8f74a..6d3871a6 100644 --- a/src/user/user_core.h +++ b/src/user/user_core.h @@ -54,6 +54,5 @@ struct user_startup_receipt user_startup(F_StartupReceipt *font_sr, AC_StartupReceipt *asset_cache_sr, SND_StartupReceipt *sound_sr, M_StartupReceipt *mixer_sr, - N_StartupReceipt *host_sr, SimStartupReceipt *sim_sr, String connect_address_str);