From 84e64b5cce2cb0e4c3c6d8ca3c5b807847dd4af7 Mon Sep 17 00:00:00 2001 From: jacob Date: Thu, 6 Feb 2025 16:13:28 -0600 Subject: [PATCH] host progress --- src/game.c | 10 +- src/host.c | 513 ++++++++++++++++++++++++++++++++++++++--------------- src/host.h | 28 ++- src/rng.c | 2 +- src/sock.h | 19 +- src/user.c | 10 +- 6 files changed, 418 insertions(+), 164 deletions(-) diff --git a/src/game.c b/src/game.c index e0b096f4..e3b7eb79 100644 --- a/src/game.c +++ b/src/game.c @@ -26,7 +26,7 @@ GLOBAL struct { struct sprite_scope *sprite_frame_scope; - struct host host; + struct host *host; /* For debugging */ struct v2 user_cursor; @@ -531,7 +531,7 @@ INTERNAL void game_update(void) struct game_cmd_list game_cmds = ZI; { - struct host_event_array host_events = host_pop_events(scratch.arena, &G.host); + struct host_event_array host_events = host_pop_events(scratch.arena, G.host); game_cmds_from_host_events(scratch.arena, host_events, &game_cmds); } @@ -573,7 +573,7 @@ INTERNAL void game_update(void) if (client_ent->valid) { entity_disable_prop(client_ent, ENTITY_PROP_PLAYER_CONTROLLED); entity_enable_prop(client_ent, ENTITY_PROP_RELEASE_NEXT_TICK); - host_queue_disconnect(&G.host, channel_id); + host_queue_disconnect(G.host, channel_id); } client_release(client); } @@ -1327,12 +1327,12 @@ INTERNAL void game_update(void) l.last = &snapshot_event; struct string msg = game_string_from_events(temp.arena, l); - host_queue_write(&G.host, HOST_CHANNEL_ID_ALL, msg); + host_queue_write(G.host, HOST_CHANNEL_ID_ALL, msg); arena_temp_end(temp); } - host_update(&G.host); + host_update(G.host); __profframe("Game"); /* ========================== * diff --git a/src/host.c b/src/host.c index 3d42e7f5..8be85e92 100644 --- a/src/host.c +++ b/src/host.c @@ -1,8 +1,9 @@ -#include "host.h" #include "arena.h" +#include "host.h" #include "scratch.h" #include "byteio.h" #include "sys.h" +#include "util.h" //#define HOST_NETWORK_ADDRESS_STRING(str) //#define HOST_NETWORK_ADDRESS_ALL_LOCAL_INTERFACES(port) @@ -11,43 +12,56 @@ #define PACKET_MAGIC 0xd9e3b8b6 #define PACKET_CHUNK_MAX_LEN 256 +#define NUM_CHANNEL_LOOKUP_BUCKETS 512 +#define NUM_MSG_ASSEMBLER_LOOKUP_BUCKETS 16384 + /* Give enough space for msg chunk + header */ #define PACKET_DATA_MAX_LEN (PACKET_CHUNK_MAX_LEN * 2) -enum packet_kind { - PACKET_KIND_NONE, - PACKET_KIND_TRY_CONNECT, - PACKET_KIND_CONNECT_SUCCESS, - PACKET_KIND_DISCONNECT, - PACKET_KIND_MSG_CHUNK +enum host_packet_kind { + HOST_PACKET_KIND_NONE, + HOST_PACKET_KIND_TRY_CONNECT, + HOST_PACKET_KIND_CONNECT_SUCCESS, + HOST_PACKET_KIND_DISCONNECT, + HOST_PACKET_KIND_MSG_CHUNK }; -enum packet_flag { - PACKET_FLAG_NONE = 0, - PACKET_FLAG_RELIABLE = (1 << 0) +enum host_packet_flag { + HOST_PACKET_FLAG_NONE = 0, + HOST_PACKET_FLAG_RELIABLE = (1 << 0) }; -struct packet { +struct host_packet { u64 seq; u8 flags; u64 data_len; u8 data[PACKET_DATA_MAX_LEN]; - struct packet *next; + struct host_packet *next; }; struct host_channel { + struct host_channel_id id; b32 valid; b32 connected; struct host *host; - struct host_channel_id id; - struct sock_address address; + struct host_channel *next_free; - struct packet *first_reliable_packet; - struct packet *last_reliable_packet; - struct packet *first_unreliable_packet; - struct packet *last_unreliable_packet; + struct sock_address address; + u64 address_hash; + struct host_channel *next_address_hash; + struct host_channel *prev_address_hash; + + /* NOTE: Packets are allocated in host's `arena` */ + struct host_packet *first_reliable_packet; + struct host_packet *last_reliable_packet; + struct host_packet *first_unreliable_packet; + struct host_packet *last_unreliable_packet; + + /* NOTE: Msg assemblers are allocated in host's `arena` */ + struct host_msg_assembler *least_recent_msg_assembler; + struct host_msg_assembler *most_recent_msg_assembler; u64 their_acked_seq; u64 our_acked_seq; @@ -63,13 +77,41 @@ struct host_channel_list { struct host_channel_node *last; }; +struct host_channel_lookup_bucket { + struct host_channel *first; + struct host_channel *last; +}; + struct host_queued_event { struct host_event event; struct host_queued_event *next; }; -struct recv_buffer { +struct host_msg_assembler { + struct host *host; + struct host_channel *channel; + + + + /* TODO: Remove this (testing) */ + struct arena testarena; + + + + + /* Free list */ + struct host_msg_assembler *next_free; + + /* Bucket list */ + struct host_msg_assembler *next_hash; + struct host_msg_assembler *prev_hash; + + /* Channel list */ + struct host_msg_assembler *less_recent; + struct host_msg_assembler *more_recent; + u64 msg_id; + u64 hash; u64 last_chunk_len; u64 num_chunks_total; @@ -81,12 +123,19 @@ struct recv_buffer { u8 *data; }; +struct host_msg_assembler_lookup_bucket { + struct host_msg_assembler *first; + struct host_msg_assembler *last; +}; + READONLY GLOBAL struct host_channel _g_host_channel_nil = { .valid = false }; GLOBAL struct { i32 _; } G = ZI, DEBUG_ALIAS(G, G_host); +INTERNAL void host_msg_assembler_release(struct host_msg_assembler *ma); + /* ========================== * * Startup * ========================== */ @@ -102,136 +151,315 @@ struct host_startup_receipt host_startup(struct sock_startup_receipt *sock_sr) * Host * ========================== */ -struct host host_alloc(struct sock_address bind_address) +struct host *host_alloc(struct sock_address bind_address) { - struct host host = ZI; - host.cmd_arena = arena_alloc(GIGABYTE(64)); - host.queued_event_arena = arena_alloc(GIGABYTE(64)); - host.packet_arena = arena_alloc(GIGABYTE(64)); - host.channels_arena = arena_alloc(GIGABYTE(64)); - host.sock = sock_alloc(bind_address, SOCK_FLAG_NON_BLOCKING); + struct arena arena = arena_alloc(GIGABYTE(64)); + struct host *host = arena_push_zero(&arena, struct host); + + host->arena = arena; + host->cmd_arena = arena_alloc(GIGABYTE(64)); + host->queued_event_arena = arena_alloc(GIGABYTE(64)); + host->channel_arena = arena_alloc(GIGABYTE(64)); + + host->channels = arena_dry_push(&host->channel_arena, struct host_channel); + + host->channel_lookup_buckets = arena_push_array_zero(&host->arena, struct host_channel_lookup_bucket, NUM_CHANNEL_LOOKUP_BUCKETS); + host->msg_assembler_lookup_buckets = arena_push_array_zero(&host->arena, struct host_msg_assembler_lookup_bucket, NUM_MSG_ASSEMBLER_LOOKUP_BUCKETS); + + host->sock = sock_alloc(bind_address, SOCK_FLAG_NON_BLOCKING); + return host; } void host_release(struct host *host) { sock_release(&host->sock); - arena_release(&host->channels_arena); - arena_release(&host->packet_arena); + arena_release(&host->channel_arena); arena_release(&host->queued_event_arena); arena_release(&host->cmd_arena); + arena_release(&host->arena); } /* ========================== * * Channel * ========================== */ +INTERNAL u64 hash_from_address(struct sock_address address) +{ + return hash_fnv64(HASH_FNV64_BASIS, STRING_FROM_STRUCT(&address)); +} + INTERNAL struct host_channel *host_channel_from_address(struct host *host, struct sock_address address) { - (UNUSED)host; - (UNUSED)address; + u64 hash = hash_from_address(address); + struct host_channel_lookup_bucket *bucket = &host->channel_lookup_buckets[hash % host->num_channel_lookup_buckets]; + for (struct host_channel *channel = bucket->first; channel; channel = channel->next_address_hash) { + if (channel->address_hash == hash && sock_address_eq(channel->address, address)) { + return channel; + } + } + return &_g_host_channel_nil; +} - struct host_channel *res = &_g_host_channel_nil; - return res; +/* Returns nil channel if id = HOST_CHANNEL_ID_ALL */ +INTERNAL struct host_channel *host_single_channel_from_id(struct host *host, struct host_channel_id channel_id) +{ + if (channel_id.gen > 0 && channel_id.idx < host->num_channels_reserved) { + struct host_channel *channel = &host->channels[channel_id.idx]; + if (channel->id.gen == channel_id.gen) { + return channel; + } + } + return &_g_host_channel_nil; } INTERNAL struct host_channel_list host_channels_from_id(struct arena *arena, struct host *host, struct host_channel_id channel_id) { - (UNUSED)arena; - (UNUSED)host; - (UNUSED)channel_id; - struct host_channel_list res = ZI; + if (host_channel_id_eq(channel_id, HOST_CHANNEL_ID_ALL)) { + for (u64 i = 0; i < host->num_channels_reserved; ++i) { + struct host_channel *channel = &host->channels[i]; + if (channel->valid && channel->connected) { + struct host_channel_node *n = arena_push_zero(arena, struct host_channel_node); + n->channel = channel; + if (res.last) { + res.last->next = n; + } else { + res.first = n; + } + res.last = n; + } + } + } else { + struct host_channel *channel = host_single_channel_from_id(host, channel_id); + if (channel->valid) { + struct host_channel_node *n = arena_push_zero(arena, struct host_channel_node); + n->channel = channel; + res.first = n; + res.last = n; + } + } return res; } INTERNAL struct host_channel *host_channel_alloc(struct host *host, struct sock_address address) { - (UNUSED)host; - (UNUSED)address; + struct host_channel_id id = ZI; + struct host_channel *channel; + if (host->first_free_channel) { + channel = host->first_free_channel; + host->first_free_channel = channel->next_free; + id = channel->id; + ++id.gen; + } else { + channel = arena_push(&host->channel_arena, struct host_channel); + id.gen = 1; + id.idx = host->num_channels_reserved; + ++host->num_channels_reserved; + } + MEMZERO_STRUCT(channel); + channel->id = id; + channel->host = host; + channel->address = address; + u64 address_hash = hash_from_address(address); + channel->address_hash = address_hash; - struct host_channel *res = NULL; - return res; + u64 bucket_index = address_hash % host->num_channel_lookup_buckets; + struct host_channel_lookup_bucket *bucket = &host->channel_lookup_buckets[bucket_index]; + if (bucket->last) { + channel->prev_address_hash = bucket->last; + bucket->last->next_address_hash = channel; + } else { + bucket->first = channel; + } + bucket->last = channel; + + return channel; } INTERNAL void host_channel_release(struct host_channel *channel) { - (UNUSED)channel; + struct host *host = channel->host; + + /* Release from lookup table */ + { + struct host_channel_lookup_bucket *bucket = &host->channel_lookup_buckets[channel->address_hash % host->num_channel_lookup_buckets]; + struct host_channel *prev = channel->prev_address_hash; + struct host_channel *next = channel->next_address_hash; + if (prev) { + prev->next_address_hash = next; + } else { + bucket->first = next; + } + if (next) { + next->prev_address_hash = prev; + } else { + bucket->last = prev; + } + } + + /* Release packets */ + { + if (channel->first_unreliable_packet) { + host->first_free_packet = channel->first_unreliable_packet; + channel->last_unreliable_packet->next = host->first_free_packet; + } + if (channel->first_reliable_packet) { + host->first_free_packet = channel->first_reliable_packet; + channel->last_reliable_packet->next = host->first_free_packet; + } + } + + /* Release msg assemblers */ + for (struct host_msg_assembler *ma = channel->least_recent_msg_assembler; ma; ma = ma->more_recent) { + host_msg_assembler_release(ma); + } + + ++channel->id.gen; + channel->valid = false; + channel->next_free = host->first_free_channel; + host->first_free_channel = channel; } -INTERNAL struct recv_buffer *host_channel_get_recv_buffer(struct host_channel *channel, u64 msg_id) -{ - (UNUSED)channel; - (UNUSED)msg_id; +/* ========================== * + * Msg assembler + * ========================== */ - struct recv_buffer *res = NULL; +INTERNAL u64 hash_from_channel_msg(struct host_channel_id channel_id, u64 msg_id) +{ + u64 res = HASH_FNV64_BASIS; + res = hash_fnv64(res, STRING_FROM_STRUCT(&channel_id)); + res = hash_fnv64(res, STRING_FROM_STRUCT(&msg_id)); return res; } +INTERNAL struct host_msg_assembler *host_get_msg_assembler(struct host *host, struct host_channel_id channel_id, u64 msg_id) +{ + u64 hash = hash_from_channel_msg(channel_id, msg_id); + struct host_msg_assembler_lookup_bucket *bucket = &host->msg_assembler_lookup_buckets[hash % host->num_msg_assembler_lookup_buckets]; + for (struct host_msg_assembler *ma = bucket->first; ma; ma = ma->next_hash) { + if (ma->hash == hash && host_channel_id_eq(ma->channel->id, channel_id) && ma->msg_id == msg_id) { + return ma; + } + } + return NULL; +} + +INTERNAL struct host_msg_assembler *host_msg_assembler_alloc(struct host_channel *channel, u64 msg_id, u64 chunk_count, u64 now_ns) +{ + struct host *host = channel->host; + struct host_msg_assembler *ma; + if (host->first_free_msg_assembler) { + ma = host->first_free_msg_assembler; + host->first_free_msg_assembler = ma->next_free; + } else { + ma = arena_push(&host->arena, struct host_msg_assembler); + } + MEMZERO_STRUCT(ma); + ma->channel = channel; + ma->msg_id = msg_id; + + ma->num_chunks_total = chunk_count; + + /* FIXME: Use buddy allocator or something */ + u64 buff_size = chunk_count * PACKET_CHUNK_MAX_LEN; + ma->testarena = arena_alloc(buff_size); + /* FIXME: Ensure chunk_count > 0 */ + ma->chunks_received_bitmap = arena_push_array_zero(&ma->testarena, u8, (((chunk_count - 1) / 8) + 1)); + ma->data = arena_push_array(&ma->testarena, u8, buff_size); + + /* Insert into channel list */ + ma->touched_ns = now_ns; + if (channel->most_recent_msg_assembler) { + channel->most_recent_msg_assembler->more_recent = ma; + ma->less_recent = channel->most_recent_msg_assembler; + } else { + channel->least_recent_msg_assembler = ma; + } + channel->most_recent_msg_assembler = ma; + + return ma; +} + +INTERNAL void host_msg_assembler_release(struct host_msg_assembler *ma) +{ + struct host *host = ma->host; + struct host_channel *channel = ma->channel; + + /* FIXME: Data should be in a buddy allocator or something */ + arena_release(&ma->testarena); + + /* Release from channel list */ + { + struct host_msg_assembler *prev = ma->less_recent; + struct host_msg_assembler *next = ma->more_recent; + if (prev) { + prev->more_recent = next; + } else { + channel->least_recent_msg_assembler = next; + } + if (next) { + next->less_recent = prev; + } else { + channel->most_recent_msg_assembler = prev; + } + } + + ma->next_free = host->first_free_msg_assembler; + host->first_free_msg_assembler = ma; +} + +INTERNAL b32 host_msg_assembler_is_chunk_filled(struct host_msg_assembler *ma, u64 chunk_id) +{ + if (chunk_id < ma->num_chunks_total) { + return (ma->chunks_received_bitmap[chunk_id / 8] & (1 << (chunk_id % 8))) != 0; + } + return false; +} + +INTERNAL void host_msg_assembler_set_chunk_received(struct host_msg_assembler *ma, u64 chunk_id) +{ + if (chunk_id < ma->num_chunks_total) { + ma->chunks_received_bitmap[chunk_id / 8] |= (1 << (chunk_id % 8)); + } +} + /* ========================== * * Packet * ========================== */ -INTERNAL struct packet *host_channel_packet_alloc(struct host_channel *channel, b32 is_reliable) +INTERNAL struct host_packet *host_channel_packet_alloc(struct host_channel *channel, b32 is_reliable) { struct host *host = channel->host; - struct packet *packet = NULL; + struct host_packet *host_packet = NULL; if (host->first_free_packet) { - packet = host->first_free_packet; - host->first_free_packet = packet->next; + host_packet = host->first_free_packet; + host->first_free_packet = host_packet->next; } else { - packet = arena_push(&host->packet_arena, struct packet); + host_packet = arena_push(&host->channel_arena, struct host_packet); } - MEMZERO_STRUCT(packet); + MEMZERO_STRUCT(host_packet); if (is_reliable) { if (channel->last_reliable_packet) { - channel->last_reliable_packet->next = packet; + channel->last_reliable_packet->next = host_packet; } else { - channel->first_reliable_packet = packet; + channel->first_reliable_packet = host_packet; } - channel->last_reliable_packet = packet; + channel->last_reliable_packet = host_packet; } else { if (channel->last_unreliable_packet) { - channel->last_unreliable_packet->next = packet; + channel->last_unreliable_packet->next = host_packet; } else { - channel->first_unreliable_packet = packet; + channel->first_unreliable_packet = host_packet; } - channel->last_unreliable_packet = packet; + channel->last_unreliable_packet = host_packet; } - return packet; + return host_packet; } /* ========================== * - * Recv buffer - * ========================== */ - -INTERNAL struct recv_buffer *recv_buffer_alloc(struct host_channel *channel, u64 msg_id, u64 chunk_count) -{ - (UNUSED)channel; - (UNUSED)msg_id; - (UNUSED)chunk_count; - - struct recv_buffer *res = NULL; - return res; -} - -INTERNAL b32 recv_buffer_is_chunk_filled(struct recv_buffer *rb, u64 chunk_id) -{ - (UNUSED)rb; - (UNUSED)chunk_id; - - return false; -} - -INTERNAL void recv_buffer_set_chunk_received(struct recv_buffer *rb, u64 chunk_id) -{ - (UNUSED)rb; - (UNUSED)chunk_id; -} - -/* ========================== * - * Queue + * Cmd interface * ========================== */ INTERNAL struct host_cmd *host_cmd_alloc_and_append(struct host *host) @@ -312,7 +540,7 @@ void host_update(struct host *host) if (magic == PACKET_MAGIC) { /* TODO: Combine kind byte with flags byte */ struct host_channel *channel = host_channel_from_address(host, address); - enum packet_kind packet_kind = br_read_i8(&br); + enum host_packet_kind host_packet_kind = br_read_i8(&br); u8 packet_flags = br_read_u8(&br); u64 their_acked_seq = br_read_var_uint(&br); @@ -321,7 +549,7 @@ void host_update(struct host *host) } b32 should_process_packet = false; - if (packet_flags & PACKET_FLAG_RELIABLE) { + if (packet_flags & HOST_PACKET_FLAG_RELIABLE) { u64 packet_seq = br_read_var_uint(&br); if (packet_seq == channel->our_acked_seq + 1) { channel->our_acked_seq = packet_seq; @@ -332,8 +560,8 @@ void host_update(struct host *host) } if (should_process_packet) { - switch (packet_kind) { - case PACKET_KIND_TRY_CONNECT: + switch (host_packet_kind) { + case HOST_PACKET_KIND_TRY_CONNECT: { /* A foreign host is trying to connect to us */ if (!channel->valid) { @@ -343,7 +571,7 @@ void host_update(struct host *host) } } break; - case PACKET_KIND_CONNECT_SUCCESS: + case HOST_PACKET_KIND_CONNECT_SUCCESS: { /* We successfully connected to a foreign host and they are ready to receive messages */ if (channel->valid && !channel->connected) { @@ -354,7 +582,7 @@ void host_update(struct host *host) } } break; - case PACKET_KIND_DISCONNECT: + case HOST_PACKET_KIND_DISCONNECT: { /* A foreign host disconnected from us */ if (channel->valid) { @@ -366,7 +594,7 @@ void host_update(struct host *host) } break; - case PACKET_KIND_MSG_CHUNK: + case HOST_PACKET_KIND_MSG_CHUNK: { /* Packet is chunk out of belonging to message */ u64 msg_id = br_read_var_uint(&br); @@ -376,31 +604,31 @@ void host_update(struct host *host) b32 is_last_chunk = (chunk_id + 1) == chunk_count; u64 data_len = is_last_chunk ? (br_read_u8(&br) + 1) : PACKET_CHUNK_MAX_LEN; - struct recv_buffer *recv_buff = host_channel_get_recv_buffer(channel, msg_id); - if (!recv_buff) { - recv_buff = recv_buffer_alloc(channel, msg_id, chunk_count); + struct host_msg_assembler *ma = host_get_msg_assembler(host, channel->id, msg_id); + if (!ma) { + ma = host_msg_assembler_alloc(channel, msg_id, chunk_count, now_ns); } - if (chunk_count == recv_buff->num_chunks_total && chunk_id < chunk_count) { - if (!recv_buffer_is_chunk_filled(recv_buff, chunk_id)) { + if (chunk_count == ma->num_chunks_total && chunk_id < chunk_count) { + if (!host_msg_assembler_is_chunk_filled(ma, chunk_id)) { u8 *src = br_seek(&br, data_len); if (src) { - u8 *dst = &recv_buff->data[chunk_id * PACKET_CHUNK_MAX_LEN]; + u8 *dst = &ma->data[chunk_id * PACKET_CHUNK_MAX_LEN]; MEMCPY(dst, src, data_len); if (is_last_chunk) { - recv_buff->last_chunk_len = data_len; + ma->last_chunk_len = data_len; } - recv_buffer_set_chunk_received(recv_buff, chunk_id); - ++recv_buff->num_chunks_received; - recv_buff->touched_ns = now_ns; - if (recv_buff->num_chunks_received == chunk_count) { + host_msg_assembler_set_chunk_received(ma, chunk_id); + ++ma->num_chunks_received; + ma->touched_ns = now_ns; + if (ma->num_chunks_received == chunk_count) { /* All chunks filled, message has finished assembling */ /* TODO: Message ordering */ struct host_queued_event *queued_event = host_queued_event_alloc_and_append(host); struct string data = ZI; - data.len = ((chunk_count - 1) * PACKET_CHUNK_MAX_LEN) + recv_buff->last_chunk_len; + data.len = ((chunk_count - 1) * PACKET_CHUNK_MAX_LEN) + ma->last_chunk_len; data.text = arena_push_array(&host->queued_event_arena, u8, data.len); - MEMCPY(data.text, recv_buff->data, data.len); + MEMCPY(data.text, ma->data, data.len); queued_event->event.kind = HOST_EVENT_KIND_MSG; queued_event->event.msg = data; queued_event->event.channel_id = channel->id; @@ -420,10 +648,8 @@ void host_update(struct host *host) } } - /* Release expired msg buffers */ - /* Update channels */ - for (u64 i = 0; i < host->channels_reserved; ++i) { + for (u64 i = 0; i < host->num_channels_reserved; ++i) { struct host_channel *channel = &host->channels[i]; if (channel->valid) { /* Send / resend handshake if not connected */ @@ -435,73 +661,73 @@ void host_update(struct host *host) /* Release acked reliable packets */ { u64 acked_seq = channel->their_acked_seq; - struct packet *packet = channel->first_reliable_packet; - while (packet) { - struct packet *next = packet->next; - u64 seq = packet->seq; + struct host_packet *host_packet = channel->first_reliable_packet; + while (host_packet) { + struct host_packet *next = host_packet->next; + u64 seq = host_packet->seq; if (seq < acked_seq) { - packet->next = host->first_free_packet; - host->first_free_packet = packet; + host_packet->next = host->first_free_packet; + host->first_free_packet = host_packet; channel->first_reliable_packet = next; } else { break; } - packet = next; + host_packet = next; } if (channel->first_reliable_packet == NULL) { channel->last_reliable_packet = NULL; } } + /* TODO: Release timed out unreliable msg buffers */ } } /* Process cmds */ /* TODO: Unreliable packets don't need to be allocated into unreliable packet queue, should just send them and forget */ for (struct host_cmd *cmd = host->first_cmd; cmd; cmd = cmd->next) { - struct temp_arena temp = arena_temp_begin(scratch.arena); enum host_cmd_kind kind = cmd->kind; struct host_channel_id channel_id = cmd->channel_id; - struct host_channel_list channels = host_channels_from_id(temp.arena, host, channel_id); + struct host_channel_list channels = host_channels_from_id(scratch.arena, host, channel_id); for (struct host_channel_node *node = channels.first; node; node = node->next) { struct host_channel *channel = node->channel; switch (kind) { case HOST_CMD_KIND_TRY_CONNECT: { u8 packet_flags = 0; - struct packet *packet = host_channel_packet_alloc(channel, false); - struct byte_writer bw = bw_from_buffer(STRING_FROM_ARRAY(packet->data)); - bw_write_i8(&bw, PACKET_KIND_TRY_CONNECT); + struct host_packet *host_packet = host_channel_packet_alloc(channel, false); + struct byte_writer bw = bw_from_buffer(STRING_FROM_ARRAY(host_packet->data)); + bw_write_i8(&bw, HOST_PACKET_KIND_TRY_CONNECT); bw_write_u8(&bw, packet_flags); bw_write_var_uint(&bw, channel->our_acked_seq); - packet->data_len = bw_pos(&bw); + host_packet->data_len = bw_pos(&bw); } break; case HOST_CMD_KIND_CONNECT_SUCCESS: { u8 packet_flags = 0; - struct packet *packet = host_channel_packet_alloc(channel, false); - struct byte_writer bw = bw_from_buffer(STRING_FROM_ARRAY(packet->data)); - bw_write_i8(&bw, PACKET_KIND_CONNECT_SUCCESS); + struct host_packet *host_packet = host_channel_packet_alloc(channel, false); + struct byte_writer bw = bw_from_buffer(STRING_FROM_ARRAY(host_packet->data)); + bw_write_i8(&bw, HOST_PACKET_KIND_CONNECT_SUCCESS); bw_write_u8(&bw, packet_flags); bw_write_var_uint(&bw, channel->our_acked_seq); - packet->data_len = bw_pos(&bw); + host_packet->data_len = bw_pos(&bw); } break; case HOST_CMD_KIND_DISCONNECT: { u8 packet_flags = 0; - struct packet *packet = host_channel_packet_alloc(channel, false); - struct byte_writer bw = bw_from_buffer(STRING_FROM_ARRAY(packet->data)); - bw_write_i8(&bw, PACKET_KIND_DISCONNECT); + struct host_packet *host_packet = host_channel_packet_alloc(channel, false); + struct byte_writer bw = bw_from_buffer(STRING_FROM_ARRAY(host_packet->data)); + bw_write_i8(&bw, HOST_PACKET_KIND_DISCONNECT); bw_write_u8(&bw, packet_flags); bw_write_var_uint(&bw, channel->our_acked_seq); - packet->data_len = bw_pos(&bw); + host_packet->data_len = bw_pos(&bw); } break; case HOST_CMD_KIND_WRITE: { b32 is_reliable = cmd->write_reliable; - u8 packet_flags = (is_reliable * PACKET_FLAG_RELIABLE); + u8 packet_flags = (is_reliable * HOST_PACKET_FLAG_RELIABLE); struct string msg = cmd->write_msg; u64 chunk_count = 0; @@ -517,41 +743,40 @@ void host_update(struct host *host) data_len = msg.len % PACKET_CHUNK_MAX_LEN; } u8 *data = msg.text + (i * PACKET_CHUNK_MAX_LEN); - struct packet *packet = host_channel_packet_alloc(channel, is_reliable); - struct byte_writer bw = bw_from_buffer(STRING_FROM_ARRAY(packet->data)); - bw_write_i8(&bw, PACKET_KIND_MSG_CHUNK); + struct host_packet *host_packet = host_channel_packet_alloc(channel, is_reliable); + struct byte_writer bw = bw_from_buffer(STRING_FROM_ARRAY(host_packet->data)); + bw_write_i8(&bw, HOST_PACKET_KIND_MSG_CHUNK); bw_write_u8(&bw, packet_flags); bw_write_var_uint(&bw, channel->our_acked_seq); if (is_reliable) { - bw_write_var_uint(&bw, packet->seq); + bw_write_var_uint(&bw, host_packet->seq); } if (is_last_chunk) { /* FIXME: Ensure data_len can never be 0 */ bw_write_u8(&bw, data_len - 1); } bw_write_buffer(&bw, STRING(data_len, data)); - packet->data_len = bw_pos(&bw); + host_packet->data_len = bw_pos(&bw); } } break; default: break; } } - arena_temp_end(temp); } /* Process packets */ /* TODO: Aggregate small packets */ - for (u64 i = 0; i < host->channels_reserved; ++i) { + for (u64 i = 0; i < host->num_channels_reserved; ++i) { struct host_channel *channel = &host->channels[i]; struct sock_address address = channel->address; /* Send unreliable packets to channel */ - for (struct packet *packet = channel->first_unreliable_packet; packet; packet = packet->next) { - sock_write(sock, address, STRING(packet->data_len, packet->data)); + for (struct host_packet *host_packet = channel->first_unreliable_packet; host_packet; host_packet = host_packet->next) { + sock_write(sock, address, STRING(host_packet->data_len, host_packet->data)); } /* Send un-acked reliable packets to channel */ - for (struct packet *packet = channel->first_reliable_packet; packet; packet = packet->next) { - sock_write(sock, address, STRING(packet->data_len, packet->data)); + for (struct host_packet *host_packet = channel->first_reliable_packet; host_packet; host_packet = host_packet->next) { + sock_write(sock, address, STRING(host_packet->data_len, host_packet->data)); } /* Release unreliable packets */ if (channel->first_unreliable_packet) { diff --git a/src/host.h b/src/host.h index e0231e67..6770b770 100644 --- a/src/host.h +++ b/src/host.h @@ -6,7 +6,8 @@ #define HOST_CHANNEL_ID_NIL (struct host_channel_id) { .gen = 0, .idx = 0 } #define HOST_CHANNEL_ID_ALL (struct host_channel_id) { .gen = U32_MAX, .idx = U32_MAX } -struct packet; +struct host_packet; +struct host_channel_lookup_bucket; enum host_cmd_kind { HOST_CMD_KIND_NONE, @@ -47,6 +48,7 @@ struct host_event_array { }; struct host { + struct arena arena; struct sock sock; struct arena cmd_arena; @@ -59,12 +61,19 @@ struct host { struct host_queued_event *last_queued_event; u64 num_queued_events; - struct arena packet_arena; - struct packet *first_free_packet; - - struct arena channels_arena; + struct arena channel_arena; struct host_channel *channels; - u64 channels_reserved; + struct host_channel *first_free_channel; + u64 num_channels_reserved; + + struct host_packet *first_free_packet; /* Allocated in `arena` */ + struct host_msg_assembler *first_free_msg_assembler; /* Allocated in `arena` */ + + struct host_channel_lookup_bucket *channel_lookup_buckets; /* Allocated in `arena` */ + u64 num_channel_lookup_buckets; + + struct host_msg_assembler_lookup_bucket *msg_assembler_lookup_buckets; /* Allocated in `arena` */ + u64 num_msg_assembler_lookup_buckets; }; /* ========================== * @@ -78,7 +87,7 @@ struct host_startup_receipt host_startup(struct sock_startup_receipt *sock_sr); * Host * ========================== */ -struct host host_alloc(struct sock_address bind_address); +struct host *host_alloc(struct sock_address bind_address); void host_release(struct host *host); @@ -114,6 +123,11 @@ struct host_event_array host_pop_events(struct arena *arena, struct host *host); +INLINE b32 host_channel_id_eq(struct host_channel_id a, struct host_channel_id b) +{ + return a.idx == b.idx && a.gen == b.gen; +} + INLINE b32 host_channel_id_is_nil(struct host_channel_id id) { return id.gen == 0 && id.idx == 0; diff --git a/src/rng.c b/src/rng.c index 64681cd4..df9853d6 100644 --- a/src/rng.c +++ b/src/rng.c @@ -39,11 +39,11 @@ INTERNAL void gen_random_file(struct string path, u32 count) struct rng_startup_receipt rng_startup(struct resource_startup_receipt *resource_sr) { (UNUSED)resource_sr; - G.arena = arena_alloc(GIGABYTE(64)); struct string noise_path = LIT("res/noise.dat"); if (resource_exists(noise_path)) { struct resource r = resource_open(noise_path); G.noise_count = r.data.len / sizeof(*G.noise); + G.arena = arena_alloc(sizeof(u64) * G.noise_count); G.noise = arena_push_array(&G.arena, u64, G.noise_count); MEMCPY(G.noise, r.data.text, r.data.len); resource_close(r); diff --git a/src/sock.h b/src/sock.h index 219e5876..2498a6b4 100644 --- a/src/sock.h +++ b/src/sock.h @@ -1,8 +1,7 @@ #ifndef SOCK_H #define SOCK_H - - +#include "memory.h" #if 1 @@ -53,6 +52,22 @@ void sock_write(struct sock *sock, struct sock_address address, struct string da + + + +INLINE b32 sock_address_eq(struct sock_address a, struct sock_address b) +{ + return MEMEQ_STRUCT(&a, &b); +} + + + + + + + + + #else #define SOCK_IP_ANY_INTERFACE CPPCOMPAT_INITLIST_TYPE(struct sock_ip) { 0 } diff --git a/src/user.c b/src/user.c index a63cff2e..37d6289d 100644 --- a/src/user.c +++ b/src/user.c @@ -43,7 +43,7 @@ GLOBAL struct { struct sys_window *window; - struct host host; + struct host *host; /* Render targets */ struct renderer_texture final_texture; @@ -603,7 +603,7 @@ INTERNAL void user_update(void) struct game_event_list game_events = ZI; { - struct host_event_array host_events = host_pop_events(scratch.arena, &G.host); + struct host_event_array host_events = host_pop_events(scratch.arena, G.host); game_events_from_host_events(scratch.arena, host_events, &game_events); } @@ -619,7 +619,7 @@ INTERNAL void user_update(void) f64 now = SECONDS_FROM_NS(sys_time_ns()); if (last_try_connect == 0 || (now - last_try_connect) > 5) { struct sock_address connect_addr = sock_address_from_string(LIT("127.0.0.1:12345")); - host_queue_connect_to_address(&G.host, connect_addr); + host_queue_connect_to_address(G.host, connect_addr); last_try_connect = now; } @@ -1671,12 +1671,12 @@ INTERNAL void user_update(void) struct temp_arena temp = arena_temp_begin(scratch.arena); struct string cmds_str = game_string_from_cmds(temp.arena, cmd_list); - host_queue_write(&G.host, HOST_CHANNEL_ID_ALL, cmds_str); + host_queue_write(G.host, HOST_CHANNEL_ID_ALL, cmds_str); arena_temp_end(temp); } - host_update(&G.host); + host_update(G.host); /* ========================== * * Render