#include "arena.h"
#include "host.h"
#include "scratch.h"
#include "byteio.h"
#include "sys.h"
#include "util.h"
#include "log.h"
#include "buddy.h"
#include "atomic.h"

//#define HOST_NETWORK_ADDRESS_STRING(str)
//#define HOST_NETWORK_ADDRESS_ALL_LOCAL_INTERFACES(port)
//#define HOST_NETWORK_ADDRESS_NONE

#define PACKET_MAGIC 0xd9e3b8b6
#define PACKET_MSG_CHUNK_MAX_LEN 1024
#define PACKET_DATA_MAX_LEN (1280 * 2) /* Give enough space for msg chunk + header */

/* Largest chunk count for which (bitmap size + chunk data size) in
 * host_msg_assembler_alloc cannot wrap a u64. This is only an arithmetic
 * safety bound, not a policy limit.
 * TODO: Enforce a (much smaller) configurable max message size. */
#define PACKET_MSG_CHUNK_COUNT_LIMIT ((~(u64)0) / (2 * (u64)PACKET_MSG_CHUNK_MAX_LEN))

#define NUM_CHANNEL_LOOKUP_BUCKETS 512
#define NUM_MSG_ASSEMBLER_LOOKUP_BUCKETS 16384

enum host_packet_kind {
    HOST_PACKET_KIND_NONE,
    HOST_PACKET_KIND_TRY_CONNECT,
    HOST_PACKET_KIND_CONNECT_SUCCESS,
    HOST_PACKET_KIND_DISCONNECT,
    HOST_PACKET_KIND_MSG_CHUNK
};

enum host_packet_flag {
    HOST_PACKET_FLAG_NONE = 0,
    HOST_PACKET_FLAG_RELIABLE = (1 << 0)
};

/* Outgoing packet, linked into a channel's reliable or unreliable queue
 * (or into the host's free list once released) */
struct host_snd_packet {
    struct host_snd_packet *next;
    u64 seq;
    u64 data_len;
    u8 data[PACKET_DATA_MAX_LEN];
};

struct host_channel {
    struct host_channel_id id;
    b32 valid;
    b32 connected;
    struct host *host;

    struct host_channel *next_free;

    struct sock_address address;
    u64 address_hash;
    struct host_channel *next_address_hash;
    struct host_channel *prev_address_hash;

    /* NOTE: Packets are allocated in host's `arena` */
    struct host_snd_packet *first_reliable_packet;
    struct host_snd_packet *last_reliable_packet;
    struct host_snd_packet *first_unreliable_packet;
    struct host_snd_packet *last_unreliable_packet;
    u64 num_reliable_packets;
    u64 num_unreliable_packets;

    /* NOTE: Msg assemblers are allocated in host's `arena` */
    struct host_msg_assembler *least_recent_msg_assembler;
    struct host_msg_assembler *most_recent_msg_assembler;

    u64 last_sent_msg_id;
    u64 their_acked_seq; /* Highest reliable seq of ours that the peer reported receiving */
    u64 our_acked_seq;   /* Highest reliable seq of theirs that we accepted */
    u64 last_sent_seq;
};

struct host_channel_node {
    struct host_channel *channel;
    struct host_channel_node *next;
};

struct host_channel_list {
    struct host_channel_node *first;
    struct host_channel_node *last;
};

struct host_channel_lookup_bucket {
    struct host_channel *first;
    struct host_channel *last;
};

struct host_queued_event {
    struct host_event event;
    struct host_queued_event *next;
};

struct host_rcv_packet {
    struct sock *sock;
    struct sock_address address;
    struct string data;
    struct host_rcv_packet *next;
};

struct host_rcv_buffer {
    struct arena arena;
    struct host_rcv_packet *first_packet;
    struct host_rcv_packet *last_packet;
};

struct host_msg_assembler {
    struct host_channel *channel;
    b32 is_reliable;

    /* Free list */
    struct host_msg_assembler *next_free;

    /* Bucket list */
    struct host_msg_assembler *next_hash;
    struct host_msg_assembler *prev_hash;

    /* Channel list */
    struct host_msg_assembler *less_recent;
    struct host_msg_assembler *more_recent;

    u64 msg_id;
    u64 hash;

    u64 last_chunk_len;
    u64 num_chunks_total;
    u64 num_chunks_received;

    i64 touched_ns;

    struct buddy_block *buddy_block;
    u8 *chunk_bitmap;
    u8 *chunk_data;
};

struct host_msg_assembler_lookup_bucket {
    struct host_msg_assembler *first;
    struct host_msg_assembler *last;
};

/* Sentinel returned by channel lookups that find nothing. It is READONLY:
 * callers must check `valid` before writing through a looked-up channel. */
READONLY GLOBAL struct host_channel _g_host_channel_nil = { .valid = false };

GLOBAL struct {
    i32 _;
} G = ZI, DEBUG_ALIAS(G, G_host);

INTERNAL SYS_THREAD_ENTRY_POINT_FUNC_DEF(host_receiver_thread_entry_point, arg);
INTERNAL void host_msg_assembler_release(struct host_msg_assembler *ma);

/* ========================== *
 * Startup
 * ========================== */

struct host_startup_receipt host_startup(struct sock_startup_receipt *sock_sr)
{
    (UNUSED)sock_sr;
    return (struct host_startup_receipt) { 0 };
}

/* ========================== *
 * Host
 * ========================== */

/* Allocates a host listening on `listen_port` and starts its receiver thread.
 * The host struct itself is the first allocation inside its own `arena`. */
struct host *host_alloc(u16 listen_port)
{
    struct arena arena = arena_alloc(GIGABYTE(64));
    struct host *host = arena_push_zero(&arena, struct host);
    host->arena = arena;
    host->cmd_arena = arena_alloc(GIGABYTE(64));
    host->queued_event_arena = arena_alloc(GIGABYTE(64));
    host->channel_arena = arena_alloc(GIGABYTE(64));
    host->rcv_buffer_read = arena_push_zero(&host->arena, struct host_rcv_buffer);
    host->rcv_buffer_write = arena_push_zero(&host->arena, struct host_rcv_buffer);
    host->rcv_buffer_read->arena = arena_alloc(GIGABYTE(64));
    host->rcv_buffer_write->arena = arena_alloc(GIGABYTE(64));
    host->buddy = buddy_ctx_alloc(GIGABYTE(64));

    host->channels = arena_dry_push(&host->channel_arena, struct host_channel);

    host->num_channel_lookup_buckets = NUM_CHANNEL_LOOKUP_BUCKETS;
    host->channel_lookup_buckets = arena_push_array_zero(&host->arena, struct host_channel_lookup_bucket, host->num_channel_lookup_buckets);

    host->num_msg_assembler_lookup_buckets = NUM_MSG_ASSEMBLER_LOOKUP_BUCKETS;
    host->msg_assembler_lookup_buckets = arena_push_array_zero(&host->arena, struct host_msg_assembler_lookup_bucket, host->num_msg_assembler_lookup_buckets);

    host->sock = sock_alloc(listen_port, MEGABYTE(2), MEGABYTE(2));

    host->rcv_buffer_write_mutex = sys_mutex_alloc();
    host->receiver_thread = sys_thread_alloc(&host_receiver_thread_entry_point, host, LIT("[P6] Host receiver"));

    return host;
}

/* Stops the receiver thread and releases everything owned by `host`,
 * including the host struct itself. `host` is invalid after this returns. */
void host_release(struct host *host)
{
    atomic_i32_eval_exchange(&host->receiver_thread_shutdown_flag, 1);
    sock_wake(host->sock);
    sys_thread_wait_release(&host->receiver_thread);
    sys_mutex_release(&host->rcv_buffer_write_mutex);

    sock_release(host->sock);

    buddy_ctx_release(host->buddy);
    arena_release(&host->rcv_buffer_write->arena);
    arena_release(&host->rcv_buffer_read->arena);
    arena_release(&host->channel_arena);
    arena_release(&host->queued_event_arena);
    arena_release(&host->cmd_arena);

    /* `host` lives inside `host->arena` (see host_alloc), so release through
     * a stack copy rather than handing arena_release a descriptor that sits
     * in the memory being freed. */
    struct arena host_arena = host->arena;
    arena_release(&host_arena);
}

/* ========================== *
 * Channel
 * ========================== */

INTERNAL u64 hash_from_address(struct sock_address address)
{
    return hash_fnv64(HASH_FNV64_BASIS, STRING_FROM_STRUCT(&address));
}

/* Returns the channel bound to `address`, or the (read-only) nil channel */
INTERNAL struct host_channel *host_channel_from_address(struct host *host, struct sock_address address)
{
    u64 hash = hash_from_address(address);
    struct host_channel_lookup_bucket *bucket = &host->channel_lookup_buckets[hash % host->num_channel_lookup_buckets];
    for (struct host_channel *channel = bucket->first; channel; channel = channel->next_address_hash) {
        if (channel->address_hash == hash && sock_address_eq(channel->address, address)) {
            return channel;
        }
    }
    return &_g_host_channel_nil;
}

/* Returns nil channel if id = HOST_CHANNEL_ID_ALL */
INTERNAL struct host_channel *host_single_channel_from_id(struct host *host, struct host_channel_id channel_id)
{
    if (channel_id.gen > 0 && channel_id.idx < host->num_channels_reserved) {
        struct host_channel *channel = &host->channels[channel_id.idx];
        if (channel->id.gen == channel_id.gen) {
            return channel;
        }
    }
    return &_g_host_channel_nil;
}

/* Builds (in `arena`) the list of valid channels addressed by `channel_id`:
 * every valid channel for HOST_CHANNEL_ID_ALL, otherwise at most one. */
INTERNAL struct host_channel_list host_channels_from_id(struct arena *arena, struct host *host, struct host_channel_id channel_id)
{
    struct host_channel_list res = ZI;
    if (host_channel_id_eq(channel_id, HOST_CHANNEL_ID_ALL)) {
        for (u64 i = 0; i < host->num_channels_reserved; ++i) {
            struct host_channel *channel = &host->channels[i];
            if (channel->valid) {
                struct host_channel_node *n = arena_push_zero(arena, struct host_channel_node);
                n->channel = channel;
                if (res.last) {
                    res.last->next = n;
                } else {
                    res.first = n;
                }
                res.last = n;
            }
        }
    } else {
        struct host_channel *channel = host_single_channel_from_id(host, channel_id);
        if (channel->valid) {
            struct host_channel_node *n = arena_push_zero(arena, struct host_channel_node);
            n->channel = channel;
            res.first = n;
            res.last = n;
        }
    }
    return res;
}

/* Allocates a channel for `address` (reusing a freed slot when available)
 * and inserts it into the address lookup table. A reused slot gets a bumped
 * generation so ids handed out for the previous occupant no longer match. */
INTERNAL struct host_channel *host_channel_alloc(struct host *host, struct sock_address address)
{
    struct host_channel_id id = ZI;
    struct host_channel *channel;
    if (host->first_free_channel) {
        channel = host->first_free_channel;
        host->first_free_channel = channel->next_free;
        id = channel->id;
        ++id.gen;
    } else {
        channel = arena_push(&host->channel_arena, struct host_channel);
        id.gen = 1;
        id.idx = host->num_channels_reserved;
        ++host->num_channels_reserved;
    }
    MEMZERO_STRUCT(channel);
    channel->valid = true;
    channel->id = id;
    channel->host = host;
    channel->address = address;
    u64 address_hash = hash_from_address(address);
    channel->address_hash = address_hash;

    u64 bucket_index = address_hash % host->num_channel_lookup_buckets;
    struct host_channel_lookup_bucket *bucket = &host->channel_lookup_buckets[bucket_index];
    if (bucket->last) {
        channel->prev_address_hash = bucket->last;
        bucket->last->next_address_hash = channel;
    } else {
        bucket->first = channel;
    }
    bucket->last = channel;

    return channel;
}

/* Unlinks `channel` from the lookup table, returns its queued packets and
 * msg assemblers to the host, and pushes the slot onto the channel free list. */
INTERNAL void host_channel_release(struct host_channel *channel)
{
    struct host *host = channel->host;

    /* Release from lookup table */
    {
        struct host_channel_lookup_bucket *bucket = &host->channel_lookup_buckets[channel->address_hash % host->num_channel_lookup_buckets];
        struct host_channel *prev = channel->prev_address_hash;
        struct host_channel *next = channel->next_address_hash;
        if (prev) {
            prev->next_address_hash = next;
        } else {
            bucket->first = next;
        }
        if (next) {
            next->prev_address_hash = prev;
        } else {
            bucket->last = prev;
        }
    }

    /* Release packets.
     * Order matters: the tail must be pointed at the current free list
     * *before* the free-list head is overwritten, otherwise the queue's
     * tail links back to its own head and the old free list is lost. */
    {
        if (channel->first_unreliable_packet) {
            channel->last_unreliable_packet->next = host->first_free_packet;
            host->first_free_packet = channel->first_unreliable_packet;
        }
        if (channel->first_reliable_packet) {
            channel->last_reliable_packet->next = host->first_free_packet;
            host->first_free_packet = channel->first_reliable_packet;
        }
    }

    /* Release msg assemblers (cache the successor, `ma` is on the free list after release) */
    {
        struct host_msg_assembler *ma = channel->least_recent_msg_assembler;
        while (ma) {
            struct host_msg_assembler *next = ma->more_recent;
            host_msg_assembler_release(ma);
            ma = next;
        }
    }

    ++channel->id.gen;
    channel->valid = false;
    channel->next_free = host->first_free_channel;
    host->first_free_channel = channel;
}

/* ========================== *
 * Msg assembler
 * ========================== */

INTERNAL u64 hash_from_channel_msg(struct host_channel_id channel_id, u64 msg_id)
{
    u64 res = HASH_FNV64_BASIS;
    res = hash_fnv64(res, STRING_FROM_STRUCT(&channel_id));
    res = hash_fnv64(res, STRING_FROM_STRUCT(&msg_id));
    return res;
}

/* Returns the in-flight assembler for (`channel_id`, `msg_id`) or NULL */
INTERNAL struct host_msg_assembler *host_get_msg_assembler(struct host *host, struct host_channel_id channel_id, u64 msg_id)
{
    u64 hash = hash_from_channel_msg(channel_id, msg_id);
    struct host_msg_assembler_lookup_bucket *bucket = &host->msg_assembler_lookup_buckets[hash % host->num_msg_assembler_lookup_buckets];
    for (struct host_msg_assembler *ma = bucket->first; ma; ma = ma->next_hash) {
        if (ma->hash == hash && host_channel_id_eq(ma->channel->id, channel_id) && ma->msg_id == msg_id) {
            return ma;
        }
    }
    return NULL;
}

/* Allocates an assembler able to hold `chunk_count` chunks for `msg_id`,
 * appends it as the channel's most recent assembler and inserts it into the
 * host lookup table.
 *
 * `chunk_count` must be in [1, PACKET_MSG_CHUNK_COUNT_LIMIT]; callers are
 * responsible for validating values that originate from the network. */
INTERNAL struct host_msg_assembler *host_msg_assembler_alloc(struct host_channel *channel, u64 msg_id, u64 chunk_count, i64 now_ns, b32 is_reliable)
{
    /* chunk_count == 0 would underflow the bitmap size computation below */
    ASSERT(chunk_count > 0);

    struct host *host = channel->host;
    struct host_msg_assembler *ma;
    if (host->first_free_msg_assembler) {
        ma = host->first_free_msg_assembler;
        host->first_free_msg_assembler = ma->next_free;
    } else {
        ma = arena_push(&host->arena, struct host_msg_assembler);
    }
    MEMZERO_STRUCT(ma);
    ma->channel = channel;
    ma->msg_id = msg_id;

    ma->num_chunks_total = chunk_count;

    /* One bit per chunk, rounded up to whole bytes */
    u64 chunk_bitmap_size = ((chunk_count - 1) / 8) + 1;
    if ((chunk_bitmap_size % 16) != 0) {
        /* Align chunk bitmap to 16 so msg data is aligned */
        chunk_bitmap_size += 16 - (chunk_bitmap_size % 16);
    }
    u64 chunk_data_size = chunk_count * PACKET_MSG_CHUNK_MAX_LEN;

    /* Allocate msg data using buddy allocator since the assembler has
     * arbitrary lifetime and data needs to stay contiguous for random
     * access as packets are received */
    ma->buddy_block = buddy_alloc(host->buddy, chunk_bitmap_size + chunk_data_size);
    ma->chunk_bitmap = ma->buddy_block->memory;
    MEMZERO(ma->chunk_bitmap, chunk_bitmap_size);
    ma->chunk_data = ma->chunk_bitmap + chunk_bitmap_size;

    ma->is_reliable = is_reliable;

    /* Insert into channel list */
    ma->touched_ns = now_ns;
    if (channel->most_recent_msg_assembler) {
        channel->most_recent_msg_assembler->more_recent = ma;
        ma->less_recent = channel->most_recent_msg_assembler;
    } else {
        channel->least_recent_msg_assembler = ma;
    }
    channel->most_recent_msg_assembler = ma;

    /* Insert into lookup table */
    u64 hash = hash_from_channel_msg(channel->id, msg_id);
    ma->hash = hash;
    struct host_msg_assembler_lookup_bucket *bucket = &host->msg_assembler_lookup_buckets[hash % host->num_msg_assembler_lookup_buckets];
    if (bucket->last) {
        bucket->last->next_hash = ma;
        ma->prev_hash = bucket->last;
    } else {
        bucket->first = ma;
    }
    bucket->last = ma;

    return ma;
}

/* Frees the assembler's data block, unlinks it from its channel list and
 * the lookup table, and pushes it onto the host's assembler free list. */
INTERNAL void host_msg_assembler_release(struct host_msg_assembler *ma)
{
    struct host_channel *channel = ma->channel;
    struct host *host = channel->host;
    buddy_release(ma->buddy_block);

    /* Release from channel list */
    {
        struct host_msg_assembler *prev = ma->less_recent;
        struct host_msg_assembler *next = ma->more_recent;
        if (prev) {
            prev->more_recent = next;
        } else {
            channel->least_recent_msg_assembler = next;
        }
        if (next) {
            next->less_recent = prev;
        } else {
            channel->most_recent_msg_assembler = prev;
        }
    }

    /* Release from lookup table */
    struct host_msg_assembler_lookup_bucket *bucket = &host->msg_assembler_lookup_buckets[ma->hash % host->num_msg_assembler_lookup_buckets];
    {
        struct host_msg_assembler *prev = ma->prev_hash;
        struct host_msg_assembler *next = ma->next_hash;
        if (prev) {
            prev->next_hash = next;
        } else {
            bucket->first = next;
        }
        if (next) {
            next->prev_hash = prev;
        } else {
            bucket->last = prev;
        }
    }

    ma->next_free = host->first_free_msg_assembler;
    host->first_free_msg_assembler = ma;
}

/* Marks `ma` as touched at `now_ns` and moves it to the most-recent end of
 * its channel's assembler list. */
INTERNAL void host_msg_assembler_touch(struct host_msg_assembler *ma, i64 now_ns)
{
    struct host_channel *channel = ma->channel;
    if (ma != channel->most_recent_msg_assembler) {
        /* Remove from channel list */
        {
            struct host_msg_assembler *prev = ma->less_recent;
            struct host_msg_assembler *next = ma->more_recent;
            if (prev) {
                prev->more_recent = next;
            } else {
                channel->least_recent_msg_assembler = next;
            }
            if (next) {
                next->less_recent = prev;
            } else {
                channel->most_recent_msg_assembler = prev;
            }
        }

        /* Insert at end of channel list */
        {
            if (channel->most_recent_msg_assembler) {
                channel->most_recent_msg_assembler->more_recent = ma;
                ma->less_recent = channel->most_recent_msg_assembler;
            } else {
                channel->least_recent_msg_assembler = ma;
            }
            channel->most_recent_msg_assembler = ma;
        }
    }
    ma->touched_ns = now_ns;
}

/* Returns whether chunk `chunk_id` has been received (false if out of range) */
INTERNAL b32 host_msg_assembler_is_chunk_filled(struct host_msg_assembler *ma, u64 chunk_id)
{
    if (chunk_id < ma->num_chunks_total) {
        return (ma->chunk_bitmap[chunk_id / 8] & (1 << (chunk_id % 8))) != 0;
    }
    return false;
}

/* Sets the received bit for chunk `chunk_id` (no-op if out of range) */
INTERNAL void host_msg_assembler_set_chunk_received(struct host_msg_assembler *ma, u64 chunk_id)
{
    if (chunk_id < ma->num_chunks_total) {
        ma->chunk_bitmap[chunk_id / 8] |= (1 << (chunk_id % 8));
    }
}

/* ========================== *
 * Packet
 * ========================== */

/* Allocates a zeroed packet and appends it to the channel's reliable or
 * unreliable send queue. Reliable packets are assigned the next sequence
 * number (starting at 1). */
INTERNAL struct host_snd_packet *host_channel_snd_packet_alloc(struct host_channel *channel, b32 is_reliable)
{
    struct host *host = channel->host;
    struct host_snd_packet *packet = NULL;
    if (host->first_free_packet) {
        packet = host->first_free_packet;
        host->first_free_packet = packet->next;
    } else {
        packet = arena_push(&host->arena, struct host_snd_packet);
    }
    MEMZERO_STRUCT(packet);

    if (is_reliable) {
        if (channel->last_reliable_packet) {
            channel->last_reliable_packet->next = packet;
        } else {
            channel->first_reliable_packet = packet;
        }
        channel->last_reliable_packet = packet;
        ++channel->num_reliable_packets;
        packet->seq = ++channel->last_sent_seq;
    } else {
        if (channel->last_unreliable_packet) {
            channel->last_unreliable_packet->next = packet;
        } else {
            channel->first_unreliable_packet = packet;
        }
        channel->last_unreliable_packet = packet;
        ++channel->num_unreliable_packets;
    }

    return packet;
}

/* ========================== *
 * Cmd interface
 * ========================== */

INTERNAL struct host_cmd *host_cmd_alloc_and_append(struct host *host)
{
    struct host_cmd *cmd = arena_push_zero(&host->cmd_arena, struct host_cmd);
    if (host->last_cmd) {
        host->last_cmd->next = cmd;
    } else {
        host->first_cmd = cmd;
    }
    host->last_cmd = cmd;
    return cmd;
}

/* Ensures a channel exists for `connect_address`. The handshake itself is
 * driven by host_update, which sends TRY_CONNECT for every valid channel
 * that is not yet connected. */
void host_queue_connect_to_address(struct host *host, struct sock_address connect_address)
{
    if (!host_channel_from_address(host, connect_address)->valid) {
        host_channel_alloc(host, connect_address);
    }
}

void host_queue_disconnect(struct host *host, struct host_channel_id channel_id)
{
    struct host_cmd *cmd = host_cmd_alloc_and_append(host);
    cmd->kind = HOST_CMD_KIND_DISCONNECT;
    cmd->channel_id = channel_id;
}

/* Queues `msg` (copied into the cmd arena) for sending on `channel_id`
 * during the next host_update. */
void host_queue_write(struct host *host, struct host_channel_id channel_id, struct string msg, u32 flags)
{
    struct host_cmd *cmd = host_cmd_alloc_and_append(host);
    cmd->kind = HOST_CMD_KIND_WRITE;
    cmd->channel_id = channel_id;
    cmd->write_msg = string_copy(&host->cmd_arena, msg);
    cmd->write_reliable = flags & HOST_WRITE_FLAG_RELIABLE;
}

/* ========================== *
 * Update
 * ========================== */

INTERNAL struct host_queued_event *host_queued_event_alloc_and_append(struct host *host)
{
    struct host_queued_event *qe = arena_push_zero(&host->queued_event_arena, struct host_queued_event);
    if (host->last_queued_event) {
        host->last_queued_event->next = qe;
    } else {
        host->first_queued_event = qe;
    }
    host->last_queued_event = qe;
    ++host->num_queued_events;
    return qe;
}

/* Runs one host tick:
 *   1. Swaps the receive buffers and processes every packet the receiver
 *      thread collected (handshake, disconnect, message chunk assembly).
 *   2. Per channel: re-queues the handshake if unconnected, frees reliable
 *      packets the peer has acknowledged, and drops stale unreliable
 *      message assemblers.
 *   3. Turns queued cmds into packets.
 *   4. Sends every queued packet (reliable packets stay queued until acked).
 *
 * Everything read from incoming packets is treated as untrusted. */
void host_update(struct host *host)
{
    __prof;
    struct temp_arena scratch = scratch_begin_no_conflict();

    i64 now_ns = sys_time_ns();

    {
        __profscope(host_update_read_packets);

        /* Swap read & write rcv buffers */
        {
            struct sys_lock lock = sys_mutex_lock_e(&host->rcv_buffer_write_mutex);
            struct host_rcv_buffer *swp = host->rcv_buffer_read;
            host->rcv_buffer_read = host->rcv_buffer_write;
            host->rcv_buffer_write = swp;
            sys_mutex_unlock(&lock);
        }

        /* Read incoming packets */
        struct host_rcv_buffer *rcv_buffer = host->rcv_buffer_read;
        for (struct host_rcv_packet *packet = rcv_buffer->first_packet; packet; packet = packet->next) {
            //struct sock *sock = packet->sock;
            struct sock_address address = packet->address;
            struct byte_reader br = br_from_buffer(packet->data);
            u32 magic = br_read_u32(&br);
            if (magic == PACKET_MAGIC) {
                /* TODO: Combine kind byte with flags byte */
                struct host_channel *channel = host_channel_from_address(host, address);
                enum host_packet_kind host_packet_kind = br_read_i8(&br);
                u8 packet_flags = br_read_u8(&br);

                /* `channel` is the read-only nil channel when the sender is
                 * unknown, only record the ack on a real channel */
                u64 their_acked_seq = br_read_var_uint(&br);
                if (channel->valid && their_acked_seq > channel->their_acked_seq) {
                    channel->their_acked_seq = their_acked_seq;
                }

                b32 skip_packet = false;
                b32 is_reliable = packet_flags & HOST_PACKET_FLAG_RELIABLE;
                if (channel->valid) {
                    if (is_reliable) {
                        /* Reliable packets are only accepted strictly in order */
                        u64 packet_seq = br_read_var_uint(&br);
                        if (packet_seq == channel->our_acked_seq + 1) {
                            channel->our_acked_seq = packet_seq;
                        } else {
                            skip_packet = true;
                        }
                    }
                }

                if (!skip_packet) {
                    switch (host_packet_kind) {
                        case HOST_PACKET_KIND_TRY_CONNECT:
                        {
                            /* A foreign host is trying to connect to us */
                            if (!channel->valid) {
                                logf_info("Received conection attempt from %F", FMT_STR(sock_string_from_address(scratch.arena, address)));
                                /* TODO: Verify that some per-host uuid isn't present in a rolling window to prevent reconnects right after a disconnect? */
                                channel = host_channel_alloc(host, address);
                            }
                            struct host_cmd *cmd = host_cmd_alloc_and_append(host);
                            cmd->kind = HOST_CMD_KIND_CONNECT_SUCCESS;
                            cmd->channel_id = channel->id;
                        } break;

                        case HOST_PACKET_KIND_CONNECT_SUCCESS:
                        {
                            /* We successfully connected to a foreign host and they are ready to receive messages */
                            if (channel->valid && !channel->connected) {
                                logf_info("Received connection from %F", FMT_STR(sock_string_from_address(scratch.arena, address)));
                                struct host_queued_event *queued_event = host_queued_event_alloc_and_append(host);
                                queued_event->event.kind = HOST_EVENT_KIND_CHANNEL_OPENED;
                                queued_event->event.channel_id = channel->id;
                                channel->connected = true;
                            }
                        } break;

                        case HOST_PACKET_KIND_DISCONNECT:
                        {
                            /* A foreign host disconnected from us */
                            if (channel->valid) {
                                logf_info("Received disconnection from %F", FMT_STR(sock_string_from_address(scratch.arena, address)));
                                struct host_queued_event *queued_event = host_queued_event_alloc_and_append(host);
                                queued_event->event.kind = HOST_EVENT_KIND_CHANNEL_CLOSED;
                                queued_event->event.channel_id = channel->id;
                                host_channel_release(channel);
                            }
                        } break;

                        case HOST_PACKET_KIND_MSG_CHUNK:
                        {
                            if (channel->valid && channel->connected) {
                                /* Packet is one chunk of a (possibly multi-chunk) message */
                                u64 msg_id = br_read_var_uint(&br);
                                u64 chunk_id = br_read_var_uint(&br);
                                u64 chunk_count = br_read_var_uint(&br);
                                b32 is_last_chunk = (chunk_id + 1) == chunk_count;
                                /* Last chunk carries (len - 1); an empty message is sent as the
                                 * wrapped value, so the + 1 here wraps back around to 0 */
                                u64 data_len = is_last_chunk ? (br_read_var_uint(&br) + 1) : PACKET_MSG_CHUNK_MAX_LEN;

                                /* Every field above is peer-controlled. Validate before
                                 * sizing an allocation with chunk_count or copying data_len
                                 * bytes into a PACKET_MSG_CHUNK_MAX_LEN sized slot. Malformed
                                 * packets are dropped silently. */
                                b32 chunk_ok = chunk_count > 0
                                            && chunk_count <= PACKET_MSG_CHUNK_COUNT_LIMIT
                                            && chunk_id < chunk_count
                                            && data_len <= PACKET_MSG_CHUNK_MAX_LEN;
                                u8 *src = chunk_ok ? br_seek(&br, data_len) : NULL;

                                if (src) {
                                    struct host_msg_assembler *ma = host_get_msg_assembler(host, channel->id, msg_id);
                                    if (!ma) {
                                        ma = host_msg_assembler_alloc(channel, msg_id, chunk_count, now_ns, is_reliable);
                                    }

                                    /* Ignore chunks that disagree with the assembler's chunk count and duplicates */
                                    if (chunk_count == ma->num_chunks_total && !host_msg_assembler_is_chunk_filled(ma, chunk_id)) {
                                        u8 *dst = &ma->chunk_data[chunk_id * PACKET_MSG_CHUNK_MAX_LEN];
                                        MEMCPY(dst, src, data_len);
                                        if (is_last_chunk) {
                                            ma->last_chunk_len = data_len;
                                        }
                                        host_msg_assembler_set_chunk_received(ma, chunk_id);
                                        ++ma->num_chunks_received;
                                        host_msg_assembler_touch(ma, now_ns);
                                        if (ma->num_chunks_received == chunk_count) {
                                            /* All chunks filled, message has finished assembling */
                                            /* TODO: Message ordering */
                                            struct host_queued_event *queued_event = host_queued_event_alloc_and_append(host);
                                            struct string data = ZI;
                                            data.len = ((chunk_count - 1) * PACKET_MSG_CHUNK_MAX_LEN) + ma->last_chunk_len;
                                            data.text = arena_push_array(&host->queued_event_arena, u8, data.len);
                                            MEMCPY(data.text, ma->chunk_data, data.len);
                                            queued_event->event.kind = HOST_EVENT_KIND_MSG;
                                            queued_event->event.msg = data;
                                            queued_event->event.channel_id = channel->id;
                                            if (is_reliable) {
                                                /* Release assembler if reliable */
                                                host_msg_assembler_release(ma);
                                            }
                                        }
                                    }
                                }
                            }
                        } break;

                        default: break;
                    }
                }
                host->bytes_received += packet->data.len;
            }
        }

        /* Reset read buffer */
        rcv_buffer->first_packet = NULL;
        rcv_buffer->last_packet = NULL;
        arena_reset(&rcv_buffer->arena);
    }

    /* Update channels */
    {
        __profscope(host_update_channels);
        for (u64 i = 0; i < host->num_channels_reserved; ++i) {
            struct host_channel *channel = &host->channels[i];
            if (channel->valid) {
                /* Send / resend handshake if not connected */
                if (!channel->connected) {
                    struct host_cmd *cmd = host_cmd_alloc_and_append(host);
                    cmd->kind = HOST_CMD_KIND_TRY_CONNECT;
                    cmd->channel_id = channel->id;
                }

                /* Release acked reliable packets */
                {
                    u64 acked_seq = channel->their_acked_seq;
                    struct host_snd_packet *host_packet = channel->first_reliable_packet;
                    while (host_packet) {
                        struct host_snd_packet *next = host_packet->next;
                        u64 seq = host_packet->seq;
                        /* `acked_seq` is the last seq the peer accepted, so it is
                         * itself acknowledged (inclusive compare) */
                        if (seq <= acked_seq) {
                            host_packet->next = host->first_free_packet;
                            host->first_free_packet = host_packet;
                            channel->first_reliable_packet = next;
                            --channel->num_reliable_packets;
                        } else {
                            break;
                        }
                        host_packet = next;
                    }
                    if (channel->first_reliable_packet == NULL) {
                        channel->last_reliable_packet = NULL;
                    }
                }

                /* Release timed out unreliable msg buffers */
                {
                    /* TODO: Configurable timeout */
                    i64 unreliable_msg_timeout_ns = NS_FROM_SECONDS(0.1);
                    struct host_msg_assembler *ma = channel->least_recent_msg_assembler;
                    while (ma) {
                        struct host_msg_assembler *next = ma->more_recent;
                        if ((now_ns - ma->touched_ns) > unreliable_msg_timeout_ns) {
                            if (!ma->is_reliable) {
                                host_msg_assembler_release(ma);
                            }
                        } else {
                            /* List is ordered least -> most recently touched */
                            break;
                        }
                        ma = next;
                    }
                }
            }
        }
    }

    /* Process cmds */
    /* TODO: Unreliable packets don't need to be allocated into unreliable packet queue, should just send them and forget */
    {
        __profscope(host_update_process_cmds);
        for (struct host_cmd *cmd = host->first_cmd; cmd; cmd = cmd->next) {
            enum host_cmd_kind kind = cmd->kind;
            struct host_channel_id channel_id = cmd->channel_id;
            struct host_channel_list channels = host_channels_from_id(scratch.arena, host, channel_id);
            for (struct host_channel_node *node = channels.first; node; node = node->next) {
                struct host_channel *channel = node->channel;
                switch (kind) {
                    case HOST_CMD_KIND_TRY_CONNECT:
                    {
                        u8 packet_flags = 0;
                        struct host_snd_packet *host_packet = host_channel_snd_packet_alloc(channel, false);
                        struct byte_writer bw = bw_from_buffer(STRING_FROM_ARRAY(host_packet->data));
                        bw_write_u32(&bw, PACKET_MAGIC);
                        bw_write_i8(&bw, HOST_PACKET_KIND_TRY_CONNECT);
                        bw_write_u8(&bw, packet_flags);
                        bw_write_var_uint(&bw, channel->our_acked_seq);
                        host_packet->data_len = bw_pos(&bw);
                    } break;

                    case HOST_CMD_KIND_CONNECT_SUCCESS:
                    {
                        u8 packet_flags = 0;
                        struct host_snd_packet *host_packet = host_channel_snd_packet_alloc(channel, false);
                        struct byte_writer bw = bw_from_buffer(STRING_FROM_ARRAY(host_packet->data));
                        bw_write_u32(&bw, PACKET_MAGIC);
                        bw_write_i8(&bw, HOST_PACKET_KIND_CONNECT_SUCCESS);
                        bw_write_u8(&bw, packet_flags);
                        bw_write_var_uint(&bw, channel->our_acked_seq);
                        host_packet->data_len = bw_pos(&bw);
                    } break;

                    case HOST_CMD_KIND_DISCONNECT:
                    {
                        u8 packet_flags = 0;
                        struct host_snd_packet *host_packet = host_channel_snd_packet_alloc(channel, false);
                        struct byte_writer bw = bw_from_buffer(STRING_FROM_ARRAY(host_packet->data));
                        bw_write_u32(&bw, PACKET_MAGIC);
                        bw_write_i8(&bw, HOST_PACKET_KIND_DISCONNECT);
                        bw_write_u8(&bw, packet_flags);
                        bw_write_var_uint(&bw, channel->our_acked_seq);
                        host_packet->data_len = bw_pos(&bw);
                    } break;

                    case HOST_CMD_KIND_WRITE:
                    {
                        b32 is_reliable = cmd->write_reliable;
                        u8 packet_flags = is_reliable ? HOST_PACKET_FLAG_RELIABLE : HOST_PACKET_FLAG_NONE;
                        struct string msg = cmd->write_msg;

                        /* ceil(len / chunk size), but always at least one chunk */
                        u64 chunk_count = 0;
                        if (msg.len > 0) {
                            chunk_count = (msg.len - 1) / PACKET_MSG_CHUNK_MAX_LEN;
                        }
                        chunk_count += 1;

                        u64 msg_id = ++channel->last_sent_msg_id;
                        for (u64 i = 0; i < chunk_count; ++i) {
                            u64 data_len = PACKET_MSG_CHUNK_MAX_LEN;
                            b32 is_last_chunk = i + 1 == chunk_count;
                            if (is_last_chunk) {
                                /* Whatever remains after the preceding full chunks.
                                 * (A modulo here yields 0 for exact multiples of the
                                 * chunk size and would drop the final chunk's data.) */
                                data_len = msg.len - (i * PACKET_MSG_CHUNK_MAX_LEN);
                            }
                            u8 *data = msg.text + (i * PACKET_MSG_CHUNK_MAX_LEN);
                            struct host_snd_packet *host_packet = host_channel_snd_packet_alloc(channel, is_reliable);
                            struct byte_writer bw = bw_from_buffer(STRING_FROM_ARRAY(host_packet->data));
                            bw_write_u32(&bw, PACKET_MAGIC);
                            bw_write_i8(&bw, HOST_PACKET_KIND_MSG_CHUNK);
                            bw_write_u8(&bw, packet_flags);
                            bw_write_var_uint(&bw, channel->our_acked_seq);
                            if (is_reliable) {
                                bw_write_var_uint(&bw, host_packet->seq);
                            }
                            bw_write_var_uint(&bw, msg_id);
                            bw_write_var_uint(&bw, i);
                            bw_write_var_uint(&bw, chunk_count);
                            if (is_last_chunk) {
                                /* Encoded as (len - 1). data_len is only 0 for an empty
                                 * message, in which case this intentionally wraps and the
                                 * receiver's + 1 wraps it back to 0. */
                                bw_write_var_uint(&bw, data_len - 1);
                            }
                            /* NOTE(review): `br_` prefix on a byte_writer call looks like a typo for `bw_` - confirm against byteio.h */
                            br_write_bytes(&bw, STRING(data_len, data));
                            host_packet->data_len = bw_pos(&bw);
                        }
                    } break;

                    default: break;
                }
            }
        }
    }

    /* Process packets */
    /* TODO: Aggregate small packets */
    {
        __profscope(host_update_send_packets);
        for (u64 i = 0; i < host->num_channels_reserved; ++i) {
            struct sock *sock = host->sock;
            struct host_channel *channel = &host->channels[i];
            u64 total_sent = 0;
            if (channel->valid) {
                struct sock_address address = channel->address;
                /* Send reliable packets to channel */
                for (struct host_snd_packet *packet = channel->first_reliable_packet; packet; packet = packet->next) {
                    sock_write(sock, address, STRING(packet->data_len, packet->data));
                    total_sent += packet->data_len;
                }
                /* Send unreliable packets to channel */
                for (struct host_snd_packet *packet = channel->first_unreliable_packet; packet; packet = packet->next) {
                    sock_write(sock, address, STRING(packet->data_len, packet->data));
                    total_sent += packet->data_len;
                }
                /* Release unreliable packets */
                if (channel->first_unreliable_packet) {
                    channel->last_unreliable_packet->next = host->first_free_packet;
                    host->first_free_packet = channel->first_unreliable_packet;
                    channel->first_unreliable_packet = NULL;
                    channel->last_unreliable_packet = NULL;
                    channel->num_unreliable_packets = 0;
                }
                host->bytes_sent += total_sent;
            }
        }
    }

    /* Reset cmds */
    host->first_cmd = NULL;
    host->last_cmd = NULL;
    arena_reset(&host->cmd_arena);

    scratch_end(scratch);
}

/* ========================== *
 * Events
 * ========================== */

/* Copies all queued events (including message payloads) into `arena`,
 * then clears the host's event queue. */
struct host_event_array host_pop_events(struct arena *arena, struct host *host)
{
    __prof;
    struct host_event_array res = ZI;
    res.count = host->num_queued_events;
    res.events = arena_push_array(arena, struct host_event, res.count);

    u64 i = 0;
    for (struct host_queued_event *qe = host->first_queued_event; qe; qe = qe->next) {
        struct host_event *dest = &res.events[i];
        *dest = qe->event;
        /* Payload lives in the queued event arena which is reset below, so deep copy it */
        struct string src_msg = qe->event.msg;
        if (src_msg.len > 0) {
            dest->msg.text = arena_push_array(arena, u8, src_msg.len);
            MEMCPY(dest->msg.text, src_msg.text, src_msg.len);
        }
        ++i;
    }

    /* Reset queued events */
    host->num_queued_events = 0;
    host->first_queued_event = NULL;
    host->last_queued_event = NULL;
    arena_reset(&host->queued_event_arena);

    return res;
}

/* ========================== *
 * Receive thread
 * ========================== */

/* Blocks on the host socket and appends every non-empty datagram to the
 * host's write-side receive buffer (under `rcv_buffer_write_mutex`) until
 * the shutdown flag is set. */
INTERNAL SYS_THREAD_ENTRY_POINT_FUNC_DEF(host_receiver_thread_entry_point, arg)
{
    u64 read_buff_size = KILOBYTE(64);
    struct arena read_buff_arena = arena_alloc(read_buff_size);
    struct string read_buff = ZI;
    read_buff.len = read_buff_size;
    read_buff.text = arena_push_array(&read_buff_arena, u8, read_buff_size);

    struct host *host = (struct host *)arg;

    struct sock_array socks = ZI;
    socks.socks = &host->sock;
    socks.count = 1;

    struct atomic_i32 *shutdown = &host->receiver_thread_shutdown_flag;
    while (!atomic_i32_eval(shutdown)) {
        struct sock *sock = sock_wait_for_available_read(socks, NULL, F32_INFINITY);
        struct sock_read_result res;
        while (!atomic_i32_eval(shutdown) && sock && (res = sock_read(sock, read_buff)).valid) {
            struct sock_address address = res.address;
            struct string data = res.data;
            if (data.len > 0) {
                struct sys_lock lock = sys_mutex_lock_e(&host->rcv_buffer_write_mutex);
                {
                    struct host_rcv_buffer *rcv_buffer = host->rcv_buffer_write;
                    struct host_rcv_packet *packet = arena_push_zero(&rcv_buffer->arena, struct host_rcv_packet);
                    packet->address = address;
                    packet->data = string_copy(&rcv_buffer->arena, data);
                    if (rcv_buffer->last_packet) {
                        rcv_buffer->last_packet->next = packet;
                    } else {
                        rcv_buffer->first_packet = packet;
                    }
                    rcv_buffer->last_packet = packet;
                }
                sys_mutex_unlock(&lock);
            }
        }
    }

    arena_release(&read_buff_arena);
}