#include "arena.h"
#include "host.h"
#include "scratch.h"
#include "bitbuff.h"
#include "sys.h"
#include "util.h"
#include "log.h"
#include "buddy.h"
#include "atomic.h"

/* TODO:
 *
 * Rate limiting.
 *
 * Resequence buffer to order incoming sequenced packets.
 *
 * Rolling window for message reassembly.
 *      This would remove the need for random access message buffers.
 *
 * Connection timeouts.
 *
 * Challenges to verify receiving address.
 */

#define PACKET_MAGIC 0xd9e3b8b6
#define PACKET_MSG_CHUNK_MAX_LEN 1024
#define PACKET_DATA_MAX_LEN 1280 /* Give enough space for msg chunk + header */

/* Upper bound on the chunk count accepted from the wire (1 GiB of payload).
 * Keeps `chunk_count * PACKET_MSG_CHUNK_MAX_LEN` from overflowing and stops a
 * single forged packet from requesting an absurd reassembly buffer.
 * TODO: Make configurable. */
#define PACKET_MSG_MAX_CHUNK_COUNT ((u64)(GIGABYTE(1) / PACKET_MSG_CHUNK_MAX_LEN))

#define NUM_CHANNEL_LOOKUP_BINS 512
#define NUM_MSG_ASSEMBLER_LOOKUP_BINS 16384

enum host_packet_kind {
    HOST_PACKET_KIND_NONE,
    HOST_PACKET_KIND_TRY_CONNECT,
    HOST_PACKET_KIND_CONNECT_SUCCESS,
    HOST_PACKET_KIND_DISCONNECT,
    HOST_PACKET_KIND_HEARTBEAT,
    HOST_PACKET_KIND_MSG_CHUNK
};

enum host_packet_flag {
    HOST_PACKET_FLAG_NONE = 0,
    HOST_PACKET_FLAG_RELIABLE = (1 << 0)
};

/* Outgoing packet queued on a channel (also used as a free-list node) */
struct host_snd_packet {
    struct host_snd_packet *next;
    u64 seq; /* Only assigned for reliable packets */

    u64 data_len;
    u8 data[PACKET_DATA_MAX_LEN];
};

struct host_channel {
    struct host_channel_id id;

    b32 valid;
    b32 connected;
    struct host *host;

    struct host_channel *next_free;

    struct sock_address address;
    u64 address_hash;
    struct host_channel *next_address_hash;
    struct host_channel *prev_address_hash;

    /* NOTE: Packets are allocated in host's `arena` */
    struct host_snd_packet *first_reliable_packet;
    struct host_snd_packet *last_reliable_packet;
    struct host_snd_packet *first_unreliable_packet;
    struct host_snd_packet *last_unreliable_packet;
    u64 num_reliable_packets;
    u64 num_unreliable_packets;

    /* NOTE: Msg assemblers are allocated in host's `arena` */
    struct host_msg_assembler *least_recent_msg_assembler;
    struct host_msg_assembler *most_recent_msg_assembler;

    u16 last_heartbeat_received_id;
    u16 last_heartbeat_acked_id;
    i64 last_heartbeat_acked_ns;
    i64 last_heartbeat_rtt_ns;

    u64 last_sent_msg_id;
    u64 their_acked_seq; /* Highest ack value read from the remote's packet headers */
    u64 our_acked_seq;   /* Last in-order reliable seq accepted from the remote */
    u64 last_sent_seq;

    i64 last_packet_received_ns;
};

struct host_channel_node {
    struct host_channel *channel;
    struct host_channel_node *next;
};

struct host_channel_list {
    struct host_channel_node *first;
    struct host_channel_node *last;
};

struct host_channel_lookup_bin {
    struct host_channel *first;
    struct host_channel *last;
};

/* Raw datagram captured by the receiver thread */
struct host_rcv_packet {
    struct sock *sock;
    struct sock_address address;
    struct string data;
    struct host_rcv_packet *next;
};

struct host_rcv_buffer {
    struct arena arena;
    struct host_rcv_packet *first_packet;
    struct host_rcv_packet *last_packet;
};

/* Reassembly state for one chunked message on one channel */
struct host_msg_assembler {
    struct host_channel *channel;
    b32 is_reliable;

    /* Free list */
    struct host_msg_assembler *next_free;

    /* Bucket list */
    struct host_msg_assembler *next_hash;
    struct host_msg_assembler *prev_hash;

    /* Channel list */
    struct host_msg_assembler *less_recent;
    struct host_msg_assembler *more_recent;

    u64 msg_id;
    u64 hash;

    u64 last_chunk_len;
    u64 num_chunks_total;
    u64 num_chunks_received;

    i64 touched_ns;

    struct buddy_block *buddy_block;
    u8 *chunk_bitmap; /* One bit per chunk, set once the chunk has been stored */
    u8 *chunk_data;   /* num_chunks_total slots of PACKET_MSG_CHUNK_MAX_LEN bytes */
};

struct host_msg_assembler_lookup_bin {
    struct host_msg_assembler *first;
    struct host_msg_assembler *last;
};

/* Sentinel returned by lookups that find nothing; `valid` is always false */
READONLY GLOBAL struct host_channel _g_host_channel_nil = { .valid = false };

GLOBAL struct {
    i32 _;
} G = ZI, DEBUG_ALIAS(G, G_host);

INTERNAL SYS_THREAD_ENTRY_POINT_FUNC_DEF(host_receiver_thread_entry_point, arg);
INTERNAL void host_msg_assembler_release(struct host_msg_assembler *ma);

/* ========================== *
 * Startup
 * ========================== */

struct host_startup_receipt host_startup(struct sock_startup_receipt *sock_sr)
{
    (UNUSED)sock_sr;
    return (struct host_startup_receipt) { 0 };
}

/* ========================== *
 * Host
 * ========================== */

/* Allocates a host bound to `listen_port` and starts its receiver thread.
 * The host struct itself is pushed onto its own `arena`. */
struct host *host_alloc(u16 listen_port)
{
    struct arena arena = arena_alloc(GIGABYTE(64));
    struct host *host = arena_push_zero(&arena, struct host);
    host->arena = arena;
    host->cmd_arena = arena_alloc(GIGABYTE(64));
    host->channel_arena = arena_alloc(GIGABYTE(64));
    host->rcv_buffer_read = arena_push_zero(&host->arena, struct host_rcv_buffer);
    host->rcv_buffer_write = arena_push_zero(&host->arena, struct host_rcv_buffer);
    host->rcv_buffer_read->arena = arena_alloc(GIGABYTE(64));
    host->rcv_buffer_write->arena = arena_alloc(GIGABYTE(64));
    host->buddy = buddy_ctx_alloc(GIGABYTE(64));

    host->channels = arena_dry_push(&host->channel_arena, struct host_channel);

    host->num_channel_lookup_bins = NUM_CHANNEL_LOOKUP_BINS;
    host->channel_lookup_bins = arena_push_array_zero(&host->arena, struct host_channel_lookup_bin, host->num_channel_lookup_bins);

    host->num_msg_assembler_lookup_bins = NUM_MSG_ASSEMBLER_LOOKUP_BINS;
    host->msg_assembler_lookup_bins = arena_push_array_zero(&host->arena, struct host_msg_assembler_lookup_bin, host->num_msg_assembler_lookup_bins);

    host->sock = sock_alloc(listen_port, MEGABYTE(2), MEGABYTE(2));

    host->rcv_buffer_write_mutex = sys_mutex_alloc();
    host->receiver_thread = sys_thread_alloc(&host_receiver_thread_entry_point, host, LIT("[P5] Host receiver"));

    return host;
}

/* Stops the receiver thread and releases everything allocated by host_alloc */
void host_release(struct host *host)
{
    /* Signal shutdown, then wake the receiver so it observes the flag */
    atomic_i32_eval_exchange(&host->receiver_thread_shutdown_flag, 1);
    sock_wake(host->sock);
    sys_thread_wait_release(&host->receiver_thread);
    sys_mutex_release(&host->rcv_buffer_write_mutex);

    sock_release(host->sock);

    buddy_ctx_release(host->buddy);
    arena_release(&host->rcv_buffer_write->arena);
    arena_release(&host->rcv_buffer_read->arena);
    arena_release(&host->channel_arena);
    arena_release(&host->cmd_arena);

    /* `host` was pushed onto `host->arena` (see host_alloc), so `&host->arena`
     * points into the very memory being released. Release through a local copy
     * so the callee is never handed a pointer into memory it is freeing. */
    struct arena host_arena = host->arena;
    arena_release(&host_arena);
}

/* ========================== *
 * Channel
 * ========================== */

/* NOTE(review): hashes the raw bytes of the address struct; assumes any padding
 * in `struct sock_address` is consistently zeroed - confirm against sock.h */
INTERNAL u64 hash_from_address(struct sock_address address)
{
    return hash_fnv64(HASH_FNV64_BASIS, STRING_FROM_STRUCT(&address));
}

/* Returns the channel bound to `address`, or the nil channel if there is none */
INTERNAL struct host_channel *host_channel_from_address(struct host *host, struct sock_address address)
{
    u64 hash = hash_from_address(address);
    struct host_channel_lookup_bin *bin = &host->channel_lookup_bins[hash % host->num_channel_lookup_bins];
    for (struct host_channel *channel = bin->first; channel; channel = channel->next_address_hash) {
        if (channel->address_hash == hash && sock_address_eq(channel->address, address)) {
            return channel;
        }
    }
    return &_g_host_channel_nil;
}

/* Returns nil channel if id = HOST_CHANNEL_ID_ALL */
INTERNAL struct host_channel *host_single_channel_from_id(struct host *host, struct host_channel_id channel_id)
{
    if (channel_id.gen > 0 && channel_id.idx < host->num_channels_reserved) {
        struct host_channel *channel = &host->channels[channel_id.idx];
        /* Generation mismatch means the slot has been released since the id was handed out */
        if (channel->id.gen == channel_id.gen) {
            return channel;
        }
    }
    return &_g_host_channel_nil;
}

/* Appends a node referencing `channel` to `list`, allocating the node in `arena` */
INTERNAL void host_channel_list_push(struct arena *arena, struct host_channel_list *list, struct host_channel *channel)
{
    struct host_channel_node *n = arena_push_zero(arena, struct host_channel_node);
    n->channel = channel;
    if (list->last) {
        list->last->next = n;
    } else {
        list->first = n;
    }
    list->last = n;
}

/* Resolves `channel_id` to a list of valid channels allocated in `arena`.
 * HOST_CHANNEL_ID_ALL yields every valid channel; any other id yields at most one. */
INTERNAL struct host_channel_list host_channels_from_id(struct arena *arena, struct host *host, struct host_channel_id channel_id)
{
    struct host_channel_list res = ZI;
    if (host_channel_id_eq(channel_id, HOST_CHANNEL_ID_ALL)) {
        for (u64 i = 0; i < host->num_channels_reserved; ++i) {
            struct host_channel *channel = &host->channels[i];
            if (channel->valid) {
                host_channel_list_push(arena, &res, channel);
            }
        }
    } else {
        struct host_channel *channel = host_single_channel_from_id(host, channel_id);
        if (channel->valid) {
            host_channel_list_push(arena, &res, channel);
        }
    }
    return res;
}

/* Allocates (or recycles) a channel for `address` and inserts it into the address lookup table */
INTERNAL struct host_channel *host_channel_alloc(struct host *host, struct sock_address address)
{
    struct host_channel_id id = ZI;
    struct host_channel *channel;
    if (host->first_free_channel) {
        /* Recycle a released slot, bumping the generation so stale ids stop resolving */
        channel = host->first_free_channel;
        host->first_free_channel = channel->next_free;
        id = channel->id;
        ++id.gen;
    } else {
        channel = arena_push(&host->channel_arena, struct host_channel);
        id.gen = 1;
        id.idx = host->num_channels_reserved;
        ++host->num_channels_reserved;
    }
    MEMZERO_STRUCT(channel);
    channel->valid = true;
    channel->id = id;
    channel->host = host;
    channel->address = address;
    u64 address_hash = hash_from_address(address);
    channel->address_hash = address_hash;

    u64 bin_index = address_hash % host->num_channel_lookup_bins;
    struct host_channel_lookup_bin *bin = &host->channel_lookup_bins[bin_index];
    if (bin->last) {
        channel->prev_address_hash = bin->last;
        bin->last->next_address_hash = channel;
    } else {
        bin->first = channel;
    }
    bin->last = channel;

    return channel;
}

/* Invalidates `channel`, returning its packets, msg assemblers and slot to the host's free lists */
INTERNAL void host_channel_release(struct host_channel *channel)
{
    struct host *host = channel->host;

    /* Release from lookup table */
    {
        struct host_channel_lookup_bin *bin = &host->channel_lookup_bins[channel->address_hash % host->num_channel_lookup_bins];
        struct host_channel *prev = channel->prev_address_hash;
        struct host_channel *next = channel->next_address_hash;
        if (prev) {
            prev->next_address_hash = next;
        } else {
            bin->first = next;
        }
        if (next) {
            next->prev_address_hash = prev;
        } else {
            bin->last = prev;
        }
    }

    /* Release packets: splice each queue onto the front of the host free list.
     * The tail must be linked to the old free-list head BEFORE the head is
     * replaced, otherwise the queue is linked to itself and the existing free
     * list is lost. */
    {
        if (channel->first_unreliable_packet) {
            channel->last_unreliable_packet->next = host->first_free_packet;
            host->first_free_packet = channel->first_unreliable_packet;
        }
        if (channel->first_reliable_packet) {
            channel->last_reliable_packet->next = host->first_free_packet;
            host->first_free_packet = channel->first_reliable_packet;
        }
        channel->first_unreliable_packet = NULL;
        channel->last_unreliable_packet = NULL;
        channel->num_unreliable_packets = 0;
        channel->first_reliable_packet = NULL;
        channel->last_reliable_packet = NULL;
        channel->num_reliable_packets = 0;
    }

    /* Release msg assemblers (grab the successor first, release unlinks `ma`) */
    {
        struct host_msg_assembler *ma = channel->least_recent_msg_assembler;
        while (ma) {
            struct host_msg_assembler *next = ma->more_recent;
            host_msg_assembler_release(ma);
            ma = next;
        }
    }

    ++channel->id.gen;
    channel->valid = false;
    channel->next_free = host->first_free_channel;
    host->first_free_channel = channel;
}

/* ========================== *
 * Msg assembler
 * ========================== */

INTERNAL u64 hash_from_channel_msg(struct host_channel_id channel_id, u64 msg_id)
{
    u64 res = HASH_FNV64_BASIS;
    res = hash_fnv64(res, STRING_FROM_STRUCT(&channel_id));
    res = hash_fnv64(res, STRING_FROM_STRUCT(&msg_id));
    return res;
}

/* Returns the assembler for (`channel_id`, `msg_id`), or NULL if none exists */
INTERNAL struct host_msg_assembler *host_get_msg_assembler(struct host *host, struct host_channel_id channel_id, u64 msg_id)
{
    u64 hash = hash_from_channel_msg(channel_id, msg_id);
    struct host_msg_assembler_lookup_bin *bin = &host->msg_assembler_lookup_bins[hash % host->num_msg_assembler_lookup_bins];
    for (struct host_msg_assembler *ma = bin->first; ma; ma = ma->next_hash) {
        if (ma->hash == hash && host_channel_id_eq(ma->channel->id, channel_id) && ma->msg_id == msg_id) {
            return ma;
        }
    }
    return NULL;
}

/* Allocates an assembler for a message of `chunk_count` chunks and registers it
 * in the channel's recency list and the host's lookup table.
 *
 * The caller must validate `chunk_count` (non-zero and bounded) before calling;
 * host_update_begin does so before any wire-supplied count reaches here. */
INTERNAL struct host_msg_assembler *host_msg_assembler_alloc(struct host_channel *channel, u64 msg_id, u64 chunk_count, i64 now_ns, b32 is_reliable)
{
    struct host *host = channel->host;
    struct host_msg_assembler *ma;
    if (host->first_free_msg_assembler) {
        ma = host->first_free_msg_assembler;
        host->first_free_msg_assembler = ma->next_free;
    } else {
        ma = arena_push(&host->arena, struct host_msg_assembler);
    }
    MEMZERO_STRUCT(ma);
    ma->channel = channel;
    ma->msg_id = msg_id;

    ma->num_chunks_total = chunk_count;

    u64 chunk_bitmap_size = (chunk_count + 7) >> 3;
    if ((chunk_bitmap_size % 16) != 0) {
        /* Align chunk bitmap to 16 so msg data is aligned */
        chunk_bitmap_size += 16 - (chunk_bitmap_size % 16);
    }
    u64 chunk_data_size = chunk_count * PACKET_MSG_CHUNK_MAX_LEN;

    /* Allocate msg data using buddy allocator since the assembler has
     * arbitrary lifetime and data needs to stay contiguous for random
     * access as packets are received */
    ma->buddy_block = buddy_alloc(host->buddy, chunk_bitmap_size + chunk_data_size);
    ma->chunk_bitmap = ma->buddy_block->memory;
    MEMZERO(ma->chunk_bitmap, chunk_bitmap_size);
    ma->chunk_data = ma->chunk_bitmap + chunk_bitmap_size;

    ma->is_reliable = is_reliable;

    /* Insert into channel list */
    ma->touched_ns = now_ns;
    if (channel->most_recent_msg_assembler) {
        channel->most_recent_msg_assembler->more_recent = ma;
        ma->less_recent = channel->most_recent_msg_assembler;
    } else {
        channel->least_recent_msg_assembler = ma;
    }
    channel->most_recent_msg_assembler = ma;

    /* Insert into lookup table */
    u64 hash = hash_from_channel_msg(channel->id, msg_id);
    ma->hash = hash;
    struct host_msg_assembler_lookup_bin *bin = &host->msg_assembler_lookup_bins[hash % host->num_msg_assembler_lookup_bins];
    if (bin->last) {
        bin->last->next_hash = ma;
        ma->prev_hash = bin->last;
    } else {
        bin->first = ma;
    }
    bin->last = ma;

    return ma;
}

/* Frees the assembler's buffer, unlinks it from both lists and pushes it onto the host free list.
 * The struct's own link fields are left untouched. */
INTERNAL void host_msg_assembler_release(struct host_msg_assembler *ma)
{
    struct host_channel *channel = ma->channel;
    struct host *host = channel->host;
    buddy_release(ma->buddy_block);

    /* Release from channel list */
    {
        struct host_msg_assembler *prev = ma->less_recent;
        struct host_msg_assembler *next = ma->more_recent;
        if (prev) {
            prev->more_recent = next;
        } else {
            channel->least_recent_msg_assembler = next;
        }
        if (next) {
            next->less_recent = prev;
        } else {
            channel->most_recent_msg_assembler = prev;
        }
    }

    /* Release from lookup table */
    struct host_msg_assembler_lookup_bin *bin = &host->msg_assembler_lookup_bins[ma->hash % host->num_msg_assembler_lookup_bins];
    {
        struct host_msg_assembler *prev = ma->prev_hash;
        struct host_msg_assembler *next = ma->next_hash;
        if (prev) {
            prev->next_hash = next;
        } else {
            bin->first = next;
        }
        if (next) {
            next->prev_hash = prev;
        } else {
            bin->last = prev;
        }
    }

    ma->next_free = host->first_free_msg_assembler;
    host->first_free_msg_assembler = ma;
}

/* Marks `ma` as most recently used on its channel and stamps it with `now_ns` */
INTERNAL void host_msg_assembler_touch(struct host_msg_assembler *ma, i64 now_ns)
{
    struct host_channel *channel = ma->channel;
    if (ma != channel->most_recent_msg_assembler) {
        /* Remove from channel list */
        {
            struct host_msg_assembler *prev = ma->less_recent;
            struct host_msg_assembler *next = ma->more_recent;
            if (prev) {
                prev->more_recent = next;
            } else {
                channel->least_recent_msg_assembler = next;
            }
            if (next) {
                next->less_recent = prev;
            } else {
                channel->most_recent_msg_assembler = prev;
            }
        }

        /* Insert at end of channel list */
        {
            if (channel->most_recent_msg_assembler) {
                channel->most_recent_msg_assembler->more_recent = ma;
                ma->less_recent = channel->most_recent_msg_assembler;
            } else {
                channel->least_recent_msg_assembler = ma;
            }
            channel->most_recent_msg_assembler = ma;
        }
    }
    ma->touched_ns = now_ns;
}

/* Returns whether chunk `chunk_id` has already been stored; false if out of range */
INTERNAL b32 host_msg_assembler_is_chunk_filled(struct host_msg_assembler *ma, u64 chunk_id)
{
    if (chunk_id < ma->num_chunks_total) {
        return (ma->chunk_bitmap[chunk_id / 8] & (1 << (chunk_id % 8))) != 0;
    }
    return false;
}

/* Sets the bitmap bit for `chunk_id`; no-op if out of range */
INTERNAL void host_msg_assembler_set_chunk_received(struct host_msg_assembler *ma, u64 chunk_id)
{
    if (chunk_id < ma->num_chunks_total) {
        ma->chunk_bitmap[chunk_id / 8] |= (1 << (chunk_id % 8));
    }
}

/* ========================== *
 * Packet
 * ========================== */

/* Allocates a zeroed send packet and appends it to the channel's reliable or
 * unreliable queue. Reliable packets are assigned the next sequence number. */
INTERNAL struct host_snd_packet *host_channel_snd_packet_alloc(struct host_channel *channel, b32 is_reliable)
{
    struct host *host = channel->host;
    struct host_snd_packet *packet = NULL;
    if (host->first_free_packet) {
        packet = host->first_free_packet;
        host->first_free_packet = packet->next;
    } else {
        packet = arena_push(&host->arena, struct host_snd_packet);
    }
    MEMZERO_STRUCT(packet);

    if (is_reliable) {
        if (channel->last_reliable_packet) {
            channel->last_reliable_packet->next = packet;
        } else {
            channel->first_reliable_packet = packet;
        }
        channel->last_reliable_packet = packet;
        ++channel->num_reliable_packets;
        packet->seq = ++channel->last_sent_seq;
    } else {
        if (channel->last_unreliable_packet) {
            channel->last_unreliable_packet->next = packet;
        } else {
            channel->first_unreliable_packet = packet;
        }
        channel->last_unreliable_packet = packet;
        ++channel->num_unreliable_packets;
    }

    return packet;
}

/* Writes the header shared by every packet kind: magic, kind, flags and our ack */
INTERNAL void host_write_packet_header(struct bitbuff_writer *bw, struct host_channel *channel, enum host_packet_kind kind, u8 packet_flags)
{
    bw_write_ubits(bw, PACKET_MAGIC, 32); /* TODO: implicitly encode magic into crc32 */
    bw_write_ibits(bw, kind, 8);
    bw_write_ubits(bw, packet_flags, 8);
    bw_write_uv(bw, channel->our_acked_seq);
}

/* ========================== *
 * Cmd interface
 * ========================== */

INTERNAL struct host_cmd *host_cmd_alloc_and_append(struct host *host)
{
    struct host_cmd *cmd = arena_push_zero(&host->cmd_arena, struct host_cmd);
    if (host->last_cmd) {
        host->last_cmd->next = cmd;
    } else {
        host->first_cmd = cmd;
    }
    host->last_cmd = cmd;
    return cmd;
}

/* Ensures a channel exists for `connect_address`; the handshake itself is
 * driven by host_update_begin for every valid channel that is not yet connected */
void host_queue_connect_to_address(struct host *host, struct sock_address connect_address)
{
    struct host_channel *channel = host_channel_from_address(host, connect_address);
    if (!channel->valid) {
        channel = host_channel_alloc(host, connect_address);
    }
}

void host_queue_disconnect(struct host *host, struct host_channel_id channel_id)
{
    struct host_cmd *cmd = host_cmd_alloc_and_append(host);
    cmd->kind = HOST_CMD_KIND_DISCONNECT;
    cmd->channel_id = channel_id;
}

/* Queues `msg` (copied into the cmd arena) for sending on `channel_id` */
void host_queue_write(struct host *host, struct host_channel_id channel_id, struct string msg, u32 flags)
{
    struct host_cmd *cmd = host_cmd_alloc_and_append(host);
    cmd->kind = HOST_CMD_KIND_WRITE;
    cmd->channel_id = channel_id;
    cmd->write_msg = string_copy(&host->cmd_arena, msg);
    cmd->write_reliable = flags & HOST_WRITE_FLAG_RELIABLE;
}

/* ========================== *
 * Info
 * ========================== */

/* Returns the channel's last heartbeat rtt; an id that does not resolve reads the nil channel's field */
i64 host_get_channel_last_rtt_ns(struct host *host, struct host_channel_id channel_id)
{
    struct host_channel *channel = host_single_channel_from_id(host, channel_id);
    return channel->last_heartbeat_rtt_ns;
}

/* ========================== *
 * Update
 * ========================== */

INTERNAL struct host_event *alloc_event(struct arena *arena, struct host_event_list *list)
{
    struct host_event *event = arena_push_zero(arena, struct host_event);
    if (list->last) {
        list->last->next = event;
    } else {
        list->first = event;
    }
    list->last = event;
    return event;
}

/* Read incoming packets, update channels, and return events.
 * Events (and message payloads) are allocated in `arena`. */
struct host_event_list host_update_begin(struct arena *arena, struct host *host)
{
    struct temp_arena scratch = scratch_begin(arena);

    struct host_event_list events = ZI;
    i64 now_ns = sys_time_ns();

    {
        __profscope(host_update_read_packets);

        /* Swap read & write rcv buffers */
        {
            struct sys_lock lock = sys_mutex_lock_e(&host->rcv_buffer_write_mutex);
            struct host_rcv_buffer *swp = host->rcv_buffer_read;
            host->rcv_buffer_read = host->rcv_buffer_write;
            host->rcv_buffer_write = swp;
            sys_mutex_unlock(&lock);
        }

        /* Read incoming packets */
        struct host_rcv_buffer *rcv_buffer = host->rcv_buffer_read;
        for (struct host_rcv_packet *packet = rcv_buffer->first_packet; packet; packet = packet->next) {
            struct sock_address address = packet->address;
            struct bitbuff bb = bitbuff_from_string(packet->data);
            struct bitbuff_reader br = br_from_bitbuff(&bb);
            u32 magic = br_read_ubits(&br, 32); /* TODO: implicitly encode magic into crc32 */
            if (magic == PACKET_MAGIC) {
                /* TODO: Combine kind byte with flags byte */
                struct host_channel *channel = host_channel_from_address(host, address);
                enum host_packet_kind host_packet_kind = br_read_ibits(&br, 8);
                u8 packet_flags = br_read_ubits(&br, 8);

                u64 their_acked_seq = br_read_uv(&br);
                if (channel->valid) {
                    channel->last_packet_received_ns = now_ns;
                    if (their_acked_seq > channel->their_acked_seq) {
                        channel->their_acked_seq = their_acked_seq;
                    }
                }

                /* Reliable packets are only accepted strictly in sequence */
                b32 skip_packet = false;
                b32 is_reliable = packet_flags & HOST_PACKET_FLAG_RELIABLE;
                if (channel->valid) {
                    if (is_reliable) {
                        u64 packet_seq = br_read_uv(&br);
                        if (packet_seq == channel->our_acked_seq + 1) {
                            channel->our_acked_seq = packet_seq;
                        } else {
                            skip_packet = true;
                        }
                    }
                }

                if (!skip_packet) {
                    switch (host_packet_kind) {
                        case HOST_PACKET_KIND_TRY_CONNECT:
                        {
                            /* A foreign host is trying to connect to us */
                            if (!channel->valid) {
                                logf_info("Host received conection attempt from %F", FMT_STR(sock_string_from_address(scratch.arena, address)));
                                /* TODO: Verify that some per-host uuid isn't present in a rolling window to prevent reconnects right after a disconnect? */
                                channel = host_channel_alloc(host, address);
                            }
                            struct host_cmd *cmd = host_cmd_alloc_and_append(host);
                            cmd->kind = HOST_CMD_KIND_CONNECT_SUCCESS;
                            cmd->channel_id = channel->id;
                        } break;

                        case HOST_PACKET_KIND_CONNECT_SUCCESS:
                        {
                            /* We successfully connected to a foreign host and they are ready to receive messages */
                            if (channel->valid && !channel->connected) {
                                logf_info("Host received connection from %F", FMT_STR(sock_string_from_address(scratch.arena, address)));
                                struct host_event *event = alloc_event(arena, &events);
                                event->kind = HOST_EVENT_KIND_CHANNEL_OPENED;
                                event->channel_id = channel->id;
                                channel->connected = true;
                            }
                        } break;

                        case HOST_PACKET_KIND_DISCONNECT:
                        {
                            /* A foreign host disconnected from us */
                            if (channel->valid) {
                                logf_info("Host received disconnection from %F", FMT_STR(sock_string_from_address(scratch.arena, address)));
                                struct host_event *event = alloc_event(arena, &events);
                                event->kind = HOST_EVENT_KIND_CHANNEL_CLOSED;
                                event->channel_id = channel->id;
                                host_channel_release(channel);
                            }
                        } break;

                        case HOST_PACKET_KIND_HEARTBEAT:
                        {
                            if (channel->valid) {
                                u16 heartbeat_id = br_read_ubits(&br, 16);
                                u16 acked_heartbeat_id = br_read_ubits(&br, 16);

                                /* Ids are 16 bit and wrap, so compare by wrapped
                                 * distance: "newer" means ahead by less than half
                                 * the id space. */
                                u16 received_delta = (u16)(heartbeat_id - channel->last_heartbeat_received_id);
                                if (received_delta != 0 && received_delta < 0x8000) {
                                    channel->last_heartbeat_received_id = heartbeat_id;
                                }

                                /* Truncate back to u16: after integer promotion `id + 1`
                                 * would otherwise never match once id reaches 65535. */
                                u16 expected_ack_id = (u16)(channel->last_heartbeat_acked_id + 1);
                                if (acked_heartbeat_id == expected_ack_id) {
                                    channel->last_heartbeat_acked_id = acked_heartbeat_id;
                                    if (channel->last_heartbeat_acked_ns > 0) {
                                        channel->last_heartbeat_rtt_ns = now_ns - channel->last_heartbeat_acked_ns;
                                    }
                                    channel->last_heartbeat_acked_ns = now_ns;
                                }
                            }
                        } break;

                        case HOST_PACKET_KIND_MSG_CHUNK:
                        {
                            if (channel->valid && channel->connected) {
                                /* Packet is chunk `chunk_id` out of `chunk_count` belonging to message `msg_id` */
                                u64 msg_id = br_read_uv(&br);
                                u64 chunk_id = br_read_uv(&br);
                                u64 chunk_count = br_read_uv(&br);
                                b32 is_last_chunk = (chunk_id + 1) == chunk_count;
                                u64 chunk_len = is_last_chunk ? br_read_uv(&br) : PACKET_MSG_CHUNK_MAX_LEN;

                                /* Every field above is supplied by the remote. Validate before
                                 * allocating or copying anything: a zero / oversized chunk count
                                 * produces a bogus buffer size, and a chunk longer than one slot
                                 * would write past the end of the assembler's buffer. Malformed
                                 * packets are dropped. */
                                b32 header_ok = (chunk_count > 0)
                                             && (chunk_count <= PACKET_MSG_MAX_CHUNK_COUNT)
                                             && (chunk_id < chunk_count)
                                             && (chunk_len <= PACKET_MSG_CHUNK_MAX_LEN);

                                if (header_ok) {
                                    struct host_msg_assembler *ma = host_get_msg_assembler(host, channel->id, msg_id);
                                    if (!ma) {
                                        ma = host_msg_assembler_alloc(channel, msg_id, chunk_count, now_ns, is_reliable);
                                    }

                                    if (chunk_count == ma->num_chunks_total) {
                                        if (!host_msg_assembler_is_chunk_filled(ma, chunk_id)) {
                                            /* A zero-length chunk (empty message) has no payload to read */
                                            u8 *src = NULL;
                                            b32 read_ok = true;
                                            if (chunk_len > 0) {
                                                src = br_read_bytes_raw(&br, chunk_len);
                                                read_ok = (src != NULL);
                                            }
                                            if (read_ok) {
                                                if (chunk_len > 0) {
                                                    u8 *dst = &ma->chunk_data[chunk_id * PACKET_MSG_CHUNK_MAX_LEN];
                                                    MEMCPY(dst, src, chunk_len);
                                                }
                                                if (is_last_chunk) {
                                                    ma->last_chunk_len = chunk_len;
                                                }
                                                host_msg_assembler_set_chunk_received(ma, chunk_id);
                                                ++ma->num_chunks_received;
                                                host_msg_assembler_touch(ma, now_ns);
                                                if (ma->num_chunks_received == chunk_count) {
                                                    /* All chunks filled, message has finished assembling */
                                                    /* TODO: Message ordering */
                                                    struct host_event *event = alloc_event(arena, &events);
                                                    struct string data = ZI;
                                                    data.len = ((chunk_count - 1) * PACKET_MSG_CHUNK_MAX_LEN) + ma->last_chunk_len;
                                                    data.text = arena_push_array(arena, u8, data.len);
                                                    MEMCPY(data.text, ma->chunk_data, data.len);
                                                    event->kind = HOST_EVENT_KIND_MSG;
                                                    event->msg = data;
                                                    event->channel_id = channel->id;
                                                    if (is_reliable) {
                                                        /* Release assembler if reliable */
                                                        host_msg_assembler_release(ma);
                                                    }
                                                }
                                            } else {
                                                /* Overflow reading chunk */
                                                ASSERT(false);
                                            }
                                        }
                                    } else {
                                        /* Chunk count mismatch with existing assembler */
                                        ASSERT(false);
                                    }
                                }
                            }
                        } break;

                        default: break;
                    }
                }
                host->bytes_received += packet->data.len;
            }
        }

        /* Reset read buffer */
        rcv_buffer->first_packet = NULL;
        rcv_buffer->last_packet = NULL;
        arena_reset(&rcv_buffer->arena);
    }

    /* Update channels */
    {
        __profscope(host_update_channels);
        for (u64 i = 0; i < host->num_channels_reserved; ++i) {
            struct host_channel *channel = &host->channels[i];
            if (channel->valid) {
                /* Send / resend handshake if not connected */
                if (!channel->connected) {
                    struct host_cmd *cmd = host_cmd_alloc_and_append(host);
                    cmd->kind = HOST_CMD_KIND_TRY_CONNECT;
                    cmd->channel_id = channel->id;
                }

                /* Send heartbeat */
                /* TODO: Send this less frequently (once per second or half of timeout or something) */
                {
                    struct host_cmd *cmd = host_cmd_alloc_and_append(host);
                    cmd->kind = HOST_CMD_KIND_HEARTBEAT;
                    /* Ids are written to the wire as 16 bits; keep the value in that range */
                    cmd->heartbeat_id = (u16)(channel->last_heartbeat_acked_id + 1);
                    cmd->heartbeat_ack_id = channel->last_heartbeat_received_id;
                    cmd->channel_id = channel->id;
                }

                /* Release acked reliable packets */
                {
                    /* The remote's ack is the last seq it accepted (it reports its
                     * `our_acked_seq`), so every packet with seq <= ack has been
                     * received and can be recycled. */
                    u64 acked_seq = channel->their_acked_seq;
                    struct host_snd_packet *packet = channel->first_reliable_packet;
                    while (packet) {
                        struct host_snd_packet *next = packet->next;
                        u64 seq = packet->seq;
                        if (seq <= acked_seq) {
                            packet->next = host->first_free_packet;
                            host->first_free_packet = packet;
                            channel->first_reliable_packet = next;
                            --channel->num_reliable_packets;
                        } else {
                            break;
                        }
                        packet = next;
                    }
                    if (channel->first_reliable_packet == NULL) {
                        channel->last_reliable_packet = NULL;
                    }
                }

                /* Release timed out unreliable msg buffers */
                {
                    /* TODO: Configurable timeout */
                    i64 unreliable_msg_timeout_ns = NS_FROM_SECONDS(0.1);
                    struct host_msg_assembler *ma = channel->least_recent_msg_assembler;
                    while (ma) {
                        struct host_msg_assembler *next = ma->more_recent;
                        if ((now_ns - ma->touched_ns) > unreliable_msg_timeout_ns) {
                            if (!ma->is_reliable) {
                                host_msg_assembler_release(ma);
                            }
                        } else {
                            /* List is ordered by recency, nothing further has timed out */
                            break;
                        }
                        ma = next;
                    }
                }
            }
        }
    }

    scratch_end(scratch);

    return events;
}

/* Process host cmds & send outgoing packets */
void host_update_end(struct host *host)
{
    __prof;
    struct temp_arena scratch = scratch_begin_no_conflict();

    /* Process cmds into sendable packets */
    /* TODO: Unreliable packets don't need to be allocated into unreliable packet queue, should just send them and forget */
    {
        __profscope(host_update_process_cmds);
        for (struct host_cmd *cmd = host->first_cmd; cmd; cmd = cmd->next) {
            enum host_cmd_kind kind = cmd->kind;
            struct host_channel_id channel_id = cmd->channel_id;
            struct host_channel_list channels = host_channels_from_id(scratch.arena, host, channel_id);
            for (struct host_channel_node *node = channels.first; node; node = node->next) {
                struct host_channel *channel = node->channel;
                switch (kind) {
                    case HOST_CMD_KIND_TRY_CONNECT:
                    {
                        struct host_snd_packet *packet = host_channel_snd_packet_alloc(channel, false);
                        struct bitbuff bb = bitbuff_from_string(STRING_FROM_ARRAY(packet->data));
                        struct bitbuff_writer bw = bw_from_bitbuff(&bb);
                        host_write_packet_header(&bw, channel, HOST_PACKET_KIND_TRY_CONNECT, 0);
                        packet->data_len = bw_num_bytes_written(&bw);
                    } break;

                    case HOST_CMD_KIND_CONNECT_SUCCESS:
                    {
                        struct host_snd_packet *packet = host_channel_snd_packet_alloc(channel, false);
                        struct bitbuff bb = bitbuff_from_string(STRING_FROM_ARRAY(packet->data));
                        struct bitbuff_writer bw = bw_from_bitbuff(&bb);
                        host_write_packet_header(&bw, channel, HOST_PACKET_KIND_CONNECT_SUCCESS, 0);
                        packet->data_len = bw_num_bytes_written(&bw);
                    } break;

                    case HOST_CMD_KIND_DISCONNECT:
                    {
                        struct host_snd_packet *packet = host_channel_snd_packet_alloc(channel, false);
                        struct bitbuff bb = bitbuff_from_string(STRING_FROM_ARRAY(packet->data));
                        struct bitbuff_writer bw = bw_from_bitbuff(&bb);
                        host_write_packet_header(&bw, channel, HOST_PACKET_KIND_DISCONNECT, 0);
                        packet->data_len = bw_num_bytes_written(&bw);
                    } break;

                    case HOST_CMD_KIND_HEARTBEAT:
                    {
                        struct host_snd_packet *packet = host_channel_snd_packet_alloc(channel, false);
                        struct bitbuff bb = bitbuff_from_string(STRING_FROM_ARRAY(packet->data));
                        struct bitbuff_writer bw = bw_from_bitbuff(&bb);
                        host_write_packet_header(&bw, channel, HOST_PACKET_KIND_HEARTBEAT, 0);
                        bw_write_ubits(&bw, cmd->heartbeat_id, 16);
                        bw_write_ubits(&bw, cmd->heartbeat_ack_id, 16);
                        packet->data_len = bw_num_bytes_written(&bw);
                    } break;

                    case HOST_CMD_KIND_WRITE:
                    {
                        b32 is_reliable = cmd->write_reliable;
                        u8 packet_flags = (is_reliable * HOST_PACKET_FLAG_RELIABLE);

                        struct string msg = cmd->write_msg;

                        /* ceil(len / chunk size), with a minimum of one (possibly empty) chunk */
                        u64 chunk_count = 0;
                        if (msg.len > 0) {
                            chunk_count = (msg.len - 1) / PACKET_MSG_CHUNK_MAX_LEN;
                        }
                        chunk_count += 1;

                        u64 msg_id = ++channel->last_sent_msg_id;
                        for (u64 i = 0; i < chunk_count; ++i) {
                            u64 chunk_offset = i * PACKET_MSG_CHUNK_MAX_LEN;
                            u64 chunk_len = PACKET_MSG_CHUNK_MAX_LEN;
                            b32 is_last_chunk = i + 1 == chunk_count;
                            if (is_last_chunk) {
                                /* Whatever remains: a full chunk for exact multiples and
                                 * zero for an empty message, so no bytes are ever read
                                 * past the end of `msg`. */
                                chunk_len = msg.len - chunk_offset;
                            }
                            struct host_snd_packet *packet = host_channel_snd_packet_alloc(channel, is_reliable);
                            struct bitbuff bb = bitbuff_from_string(STRING_FROM_ARRAY(packet->data));
                            struct bitbuff_writer bw = bw_from_bitbuff(&bb);
                            host_write_packet_header(&bw, channel, HOST_PACKET_KIND_MSG_CHUNK, packet_flags);
                            if (is_reliable) {
                                bw_write_uv(&bw, packet->seq);
                            }
                            bw_write_uv(&bw, msg_id);
                            bw_write_uv(&bw, i);
                            bw_write_uv(&bw, chunk_count);
                            if (is_last_chunk) {
                                /* Only the last chunk carries an explicit length */
                                bw_write_uv(&bw, chunk_len);
                            }
                            u8 *chunk_data = msg.text + chunk_offset;
                            bw_write_bytes(&bw, STRING(chunk_len, chunk_data));
                            packet->data_len = bw_num_bytes_written(&bw);
                        }
                    } break;

                    default: break;
                }
            }
        }
    }

    /* Process packets */
    /* TODO: Aggregate small packets */
    {
        __profscope(host_update_send_packets);
        for (u64 i = 0; i < host->num_channels_reserved; ++i) {
            struct sock *sock = host->sock;
            struct host_channel *channel = &host->channels[i];
            u64 total_sent = 0;
            if (channel->valid) {
                struct sock_address address = channel->address;
                /* Send reliable packets to channel (they stay queued until acked) */
                for (struct host_snd_packet *packet = channel->first_reliable_packet; packet; packet = packet->next) {
                    sock_write(sock, address, STRING(packet->data_len, packet->data));
                    total_sent += packet->data_len;
                }
                /* Send unreliable packets to channel */
                for (struct host_snd_packet *packet = channel->first_unreliable_packet; packet; packet = packet->next) {
                    sock_write(sock, address, STRING(packet->data_len, packet->data));
                    total_sent += packet->data_len;
                }
                /* Release unreliable packets */
                if (channel->first_unreliable_packet) {
                    channel->last_unreliable_packet->next = host->first_free_packet;
                    host->first_free_packet = channel->first_unreliable_packet;
                    channel->first_unreliable_packet = NULL;
                    channel->last_unreliable_packet = NULL;
                    channel->num_unreliable_packets = 0;
                }
                host->bytes_sent += total_sent;
            }
        }
    }

    /* Reset cmds */
    host->first_cmd = NULL;
    host->last_cmd = NULL;
    arena_reset(&host->cmd_arena);

    scratch_end(scratch);
}

/* ========================== *
 * Receive thread
 * ========================== */

/* Blocks on the host's socket and copies every non-empty datagram into the
 * host's write rcv buffer (under `rcv_buffer_write_mutex`) until the shutdown
 * flag is set. `arg` is the owning `struct host *`. */
INTERNAL SYS_THREAD_ENTRY_POINT_FUNC_DEF(host_receiver_thread_entry_point, arg)
{
    u64 read_buff_size = KILOBYTE(64);
    struct arena read_buff_arena = arena_alloc(read_buff_size);
    struct string read_buff = ZI;
    read_buff.len = read_buff_size;
    read_buff.text = arena_push_array(&read_buff_arena, u8, read_buff_size);

    struct host *host = (struct host *)arg;

    struct sock_array socks = ZI;
    socks.socks = &host->sock;
    socks.count = 1;

    struct atomic_i32 *shutdown = &host->receiver_thread_shutdown_flag;
    while (!atomic_i32_eval(shutdown)) {
        struct sock *sock = sock_wait_for_available_read(socks, F32_INFINITY);
        struct sock_read_result res;
        /* Drain everything currently readable before waiting again */
        while (!atomic_i32_eval(shutdown) && sock && (res = sock_read(sock, read_buff)).valid) {
            struct sock_address address = res.address;
            struct string data = res.data;
            if (data.len > 0) {
                struct sys_lock lock = sys_mutex_lock_e(&host->rcv_buffer_write_mutex);
                {
                    struct host_rcv_buffer *rcv_buffer = host->rcv_buffer_write;
                    struct host_rcv_packet *packet = arena_push_zero(&rcv_buffer->arena, struct host_rcv_packet);
                    packet->address = address;
                    packet->data = string_copy(&rcv_buffer->arena, data);
                    if (rcv_buffer->last_packet) {
                        rcv_buffer->last_packet->next = packet;
                    } else {
                        rcv_buffer->first_packet = packet;
                    }
                    rcv_buffer->last_packet = packet;
                }
                sys_mutex_unlock(&lock);
            }
        }
    }

    arena_release(&read_buff_arena);
}