From f27ec95481af4f762a7e5dd53bd08cb9767d31dd Mon Sep 17 00:00:00 2001 From: jacob Date: Mon, 14 Jul 2025 16:42:44 -0500 Subject: [PATCH] remove host thread --- src/host.c | 387 +++++++++++++++++++++-------------------------- src/host.h | 3 - src/sock.h | 6 +- src/sock_win32.c | 90 +++-------- 4 files changed, 192 insertions(+), 294 deletions(-) diff --git a/src/host.c b/src/host.c index b7703784..d68d2f54 100644 --- a/src/host.c +++ b/src/host.c @@ -156,7 +156,6 @@ GLOBAL struct { i32 _; } G = ZI, DEBUG_ALIAS(G, G_host); -INTERNAL SYS_THREAD_DEF(host_receiver_thread_entry_point, arg); INTERNAL void host_msg_assembler_release(struct host_msg_assembler *ma); /* ========================== * @@ -198,19 +197,11 @@ struct host *host_alloc(u16 listen_port) host->sock = sock_alloc(listen_port, MEBI(2), MEBI(2)); - host->receiver_thread = sys_thread_alloc(&host_receiver_thread_entry_point, host, LIT("Host receiver"), PROF_THREAD_GROUP_IO); - return host; } void host_release(struct host *host) { - atomic32_fetch_set(&host->receiver_thread_shutdown_flag.v, 1); - sock_wake(host->sock); - while (!sys_thread_try_release(host->receiver_thread, 0.001f)) { - sock_wake(host->sock); - } - sock_release(host->sock); buddy_ctx_release(host->buddy); @@ -648,180 +639,193 @@ struct host_event_list host_update_begin(struct arena *arena, struct host *host) struct host_event_list events = ZI; i64 now_ns = sys_time_ns(); - { - __profn("Read host packets"); - struct string read_buff = ZI; - read_buff.len = PACKET_DATA_MAX_LEN; - read_buff.text = arena_push_array_no_zero(scratch.arena, u8, read_buff.len); + __profn("Read packets"); - /* Swap read & write rcv buffers */ + /* Read socket */ + struct host_rcv_packet *first_packet = 0; + struct host_rcv_packet *last_packet = 0; { - struct snc_lock lock = snc_lock_e(&host->rcv_buffer_write_mutex); - struct host_rcv_buffer *swp = host->rcv_buffer_read; - host->rcv_buffer_read = host->rcv_buffer_write; - host->rcv_buffer_write = swp; - snc_unlock(&lock); + __profn("Read socket"); + struct sock_array socks = ZI; + socks.socks = &host->sock; + socks.count = 1; + + struct sock *sock = host->sock; + struct sock_read_result res = ZI; + while ((res = sock_read(scratch.arena, sock)).valid) { + struct sock_address address = res.address; + struct string data = res.data; + if (data.len > 0) { + struct host_rcv_packet *packet = arena_push(scratch.arena, struct host_rcv_packet); + packet->address = address; + packet->data = string_copy(scratch.arena, data); + if (last_packet) { + last_packet->next = packet; + } else { + first_packet = packet; + } + last_packet = packet; + } + } } /* Read incoming packets */ - struct host_rcv_buffer *rcv_buffer = host->rcv_buffer_read; - for (struct host_rcv_packet *packet = rcv_buffer->first_packet; packet; packet = packet->next) { - //struct sock *sock = packet->sock; - struct sock_address address = packet->address; - struct bitbuff bb = bitbuff_from_string(packet->data); - struct bitbuff_reader br = br_from_bitbuff(&bb); - u32 magic = br_read_ubits(&br, 32); /* TODO: implicitly encode magic into crc32 */ - if (magic == PACKET_MAGIC) { - /* TODO: Combine kind byte with flags byte */ - struct host_channel *channel = host_channel_from_address(host, address); - enum host_packet_kind host_packet_kind = br_read_ibits(&br, 8); - u8 packet_flags = br_read_ubits(&br, 8); + { + __profn("Process host packets"); + for (struct host_rcv_packet *packet = first_packet; packet; packet = packet->next) { + //struct sock *sock = packet->sock; + struct sock_address address = packet->address; + struct bitbuff bb = bitbuff_from_string(packet->data); + struct bitbuff_reader br = br_from_bitbuff(&bb); + u32 magic = br_read_ubits(&br, 32); /* TODO: implicitly encode magic into crc32 */ + if (magic == PACKET_MAGIC) { + /* TODO: Combine kind byte with flags byte */ + struct host_channel *channel = host_channel_from_address(host, address); + enum host_packet_kind host_packet_kind = br_read_ibits(&br, 8); + u8 packet_flags = br_read_ubits(&br, 8); - u64 their_acked_seq = br_read_uv(&br); - if (channel->valid) { - channel->last_packet_received_ns = now_ns; - if (their_acked_seq > channel->their_acked_seq) { - channel->their_acked_seq = their_acked_seq; - } - } - - b32 skip_packet = 0; - b32 is_reliable = packet_flags & HOST_PACKET_FLAG_RELIABLE; - if (channel->valid) { - if (is_reliable) { - u64 packet_seq = br_read_uv(&br); - if (packet_seq == channel->our_acked_seq + 1) { - channel->our_acked_seq = packet_seq; - } else { - skip_packet = 1; + u64 their_acked_seq = br_read_uv(&br); + if (channel->valid) { + channel->last_packet_received_ns = now_ns; + if (their_acked_seq > channel->their_acked_seq) { + channel->their_acked_seq = their_acked_seq; } } - } - if (!skip_packet) { - switch (host_packet_kind) { - case HOST_PACKET_KIND_TRY_CONNECT: - { - /* A foreign host is trying to connect to us */ - if (!channel->valid) { - logf_info("Host received conection attempt from %F", FMT_STR(sock_string_from_address(scratch.arena, address))); - /* TODO: Verify that some per-host uuid isn't present in a rolling window to prevent reconnects right after a disconnect? */ - channel = host_channel_alloc(host, address); + b32 skip_packet = 0; + b32 is_reliable = packet_flags & HOST_PACKET_FLAG_RELIABLE; + if (channel->valid) { + if (is_reliable) { + u64 packet_seq = br_read_uv(&br); + if (packet_seq == channel->our_acked_seq + 1) { + channel->our_acked_seq = packet_seq; + } else { + skip_packet = 1; } - struct host_cmd *cmd = host_cmd_alloc_and_append(host); - cmd->kind = HOST_CMD_KIND_CONNECT_SUCCESS; - cmd->channel_id = channel->id; - } break; - - case HOST_PACKET_KIND_CONNECT_SUCCESS: - { - /* We successfully connected to a foreign host and they are ready to receive messages */ - if (channel->valid && !channel->connected) { - logf_info("Host received connection from %F", FMT_STR(sock_string_from_address(scratch.arena, address))); - struct host_event *event = push_event(arena, &events); - event->kind = HOST_EVENT_KIND_CHANNEL_OPENED; - event->channel_id = channel->id; - channel->connected = 1; - } - } break; - - case HOST_PACKET_KIND_DISCONNECT: - { - /* A foreign host disconnected from us */ - if (channel->valid) { - logf_info("Host received disconnection from %F", FMT_STR(sock_string_from_address(scratch.arena, address))); - struct host_event *event = push_event(arena, &events); - event->kind = HOST_EVENT_KIND_CHANNEL_CLOSED; - event->channel_id = channel->id; - host_channel_release(channel); - } - - } break; - - case HOST_PACKET_KIND_HEARTBEAT: - { - if (channel->valid) { - u16 heartbeat_id = br_read_ubits(&br, 16); - u16 acked_heartbeat_id = br_read_ubits(&br, 16); - if (heartbeat_id > channel->last_heartbeat_received_id) { - channel->last_heartbeat_received_id = heartbeat_id; - } - if (acked_heartbeat_id == channel->last_heartbeat_acked_id + 1) { - channel->last_heartbeat_acked_id = acked_heartbeat_id; - if (channel->last_heartbeat_acked_ns > 0) { - channel->last_heartbeat_rtt_ns = now_ns - channel->last_heartbeat_acked_ns; - } - channel->last_heartbeat_acked_ns = now_ns; - } - } - } break; - - case HOST_PACKET_KIND_MSG_CHUNK: - { - if (channel->valid && channel->connected) { - /* Packet is chunk out of belonging to message */ - u64 msg_id = br_read_uv(&br); - u64 chunk_id = br_read_uv(&br); - u64 chunk_count = br_read_uv(&br); - b32 is_last_chunk = (chunk_id + 1) == chunk_count; - u64 chunk_len = is_last_chunk ? br_read_uv(&br) : PACKET_MSG_CHUNK_MAX_LEN; - - struct host_msg_assembler *ma = host_get_msg_assembler(host, channel->id, msg_id); - if (!ma) { - ma = host_msg_assembler_alloc(channel, msg_id, chunk_count, now_ns, is_reliable); - } - - if (chunk_count == ma->num_chunks_total && chunk_id < chunk_count) { - if (!host_msg_assembler_is_chunk_filled(ma, chunk_id)) { - u8 *src = br_read_bytes_raw(&br, chunk_len); - if (src) { - u8 *dst = &ma->chunk_data[chunk_id * PACKET_MSG_CHUNK_MAX_LEN]; - MEMCPY(dst, src, chunk_len); - if (is_last_chunk) { - ma->last_chunk_len = chunk_len; - } - host_msg_assembler_set_chunk_received(ma, chunk_id); - ++ma->num_chunks_received; - host_msg_assembler_touch(ma, now_ns); - if (ma->num_chunks_received == chunk_count) { - /* All chunks filled, message has finished assembling */ - /* TODO: Message ordering */ - struct host_event *event = push_event(arena, &events); - struct string data = ZI; - data.len = ((chunk_count - 1) * PACKET_MSG_CHUNK_MAX_LEN) + ma->last_chunk_len; - data.text = arena_push_array_no_zero(arena, u8, data.len); - MEMCPY(data.text, ma->chunk_data, data.len); - event->kind = HOST_EVENT_KIND_MSG; - event->msg = data; - event->channel_id = channel->id; - if (is_reliable) { - /* Release assembler if reliable */ - host_msg_assembler_release(ma); - } - } - } else { - /* Overflow reading chunk */ - ASSERT(0); - } - } - } else { - /* Chunk id/count mismatch */ - ASSERT(0); - } - } - } break; - - default: break; + } } + + if (!skip_packet) { + switch (host_packet_kind) { + case HOST_PACKET_KIND_TRY_CONNECT: + { + /* A foreign host is trying to connect to us */ + if (!channel->valid) { + logf_info("Host received conection attempt from %F", FMT_STR(sock_string_from_address(scratch.arena, address))); + /* TODO: Verify that some per-host uuid isn't present in a rolling window to prevent reconnects right after a disconnect? */ + channel = host_channel_alloc(host, address); + } + struct host_cmd *cmd = host_cmd_alloc_and_append(host); + cmd->kind = HOST_CMD_KIND_CONNECT_SUCCESS; + cmd->channel_id = channel->id; + } break; + + case HOST_PACKET_KIND_CONNECT_SUCCESS: + { + /* We successfully connected to a foreign host and they are ready to receive messages */ + if (channel->valid && !channel->connected) { + logf_info("Host received connection from %F", FMT_STR(sock_string_from_address(scratch.arena, address))); + struct host_event *event = push_event(arena, &events); + event->kind = HOST_EVENT_KIND_CHANNEL_OPENED; + event->channel_id = channel->id; + channel->connected = 1; + } + } break; + + case HOST_PACKET_KIND_DISCONNECT: + { + /* A foreign host disconnected from us */ + if (channel->valid) { + logf_info("Host received disconnection from %F", FMT_STR(sock_string_from_address(scratch.arena, address))); + struct host_event *event = push_event(arena, &events); + event->kind = HOST_EVENT_KIND_CHANNEL_CLOSED; + event->channel_id = channel->id; + host_channel_release(channel); + } + + } break; + + case HOST_PACKET_KIND_HEARTBEAT: + { + if (channel->valid) { + u16 heartbeat_id = br_read_ubits(&br, 16); + u16 acked_heartbeat_id = br_read_ubits(&br, 16); + if (heartbeat_id > channel->last_heartbeat_received_id) { + channel->last_heartbeat_received_id = heartbeat_id; + } + if (acked_heartbeat_id == channel->last_heartbeat_acked_id + 1) { + channel->last_heartbeat_acked_id = acked_heartbeat_id; + if (channel->last_heartbeat_acked_ns > 0) { + channel->last_heartbeat_rtt_ns = now_ns - channel->last_heartbeat_acked_ns; + } + channel->last_heartbeat_acked_ns = now_ns; + } + } + } break; + + case HOST_PACKET_KIND_MSG_CHUNK: + { + if (channel->valid && channel->connected) { + /* Packet is chunk out of belonging to message */ + u64 msg_id = br_read_uv(&br); + u64 chunk_id = br_read_uv(&br); + u64 chunk_count = br_read_uv(&br); + b32 is_last_chunk = (chunk_id + 1) == chunk_count; + u64 chunk_len = is_last_chunk ? br_read_uv(&br) : PACKET_MSG_CHUNK_MAX_LEN; + + struct host_msg_assembler *ma = host_get_msg_assembler(host, channel->id, msg_id); + if (!ma) { + ma = host_msg_assembler_alloc(channel, msg_id, chunk_count, now_ns, is_reliable); + } + + if (chunk_count == ma->num_chunks_total && chunk_id < chunk_count) { + if (!host_msg_assembler_is_chunk_filled(ma, chunk_id)) { + u8 *src = br_read_bytes_raw(&br, chunk_len); + if (src) { + u8 *dst = &ma->chunk_data[chunk_id * PACKET_MSG_CHUNK_MAX_LEN]; + MEMCPY(dst, src, chunk_len); + if (is_last_chunk) { + ma->last_chunk_len = chunk_len; + } + host_msg_assembler_set_chunk_received(ma, chunk_id); + ++ma->num_chunks_received; + host_msg_assembler_touch(ma, now_ns); + if (ma->num_chunks_received == chunk_count) { + /* All chunks filled, message has finished assembling */ + /* TODO: Message ordering */ + struct host_event *event = push_event(arena, &events); + struct string data = ZI; + data.len = ((chunk_count - 1) * PACKET_MSG_CHUNK_MAX_LEN) + ma->last_chunk_len; + data.text = arena_push_array_no_zero(arena, u8, data.len); + MEMCPY(data.text, ma->chunk_data, data.len); + event->kind = HOST_EVENT_KIND_MSG; + event->msg = data; + event->channel_id = channel->id; + if (is_reliable) { + /* Release assembler if reliable */ + host_msg_assembler_release(ma); + } + } + } else { + /* Overflow reading chunk */ + ASSERT(0); + } + } + } else { + /* Chunk id/count mismatch */ + ASSERT(0); + } + } + } break; + + default: break; + } + } + host->bytes_received += packet->data.len; } - host->bytes_received += packet->data.len; } } - /* Reset read buffer */ - rcv_buffer->first_packet = 0; - rcv_buffer->last_packet = 0; - arena_reset(rcv_buffer->arena); } /* Update channels */ @@ -1053,50 +1057,3 @@ void host_update_end(struct host *host) scratch_end(scratch); } - -/* ========================== * - * Receive thread - * ========================== */ - -INTERNAL SYS_THREAD_DEF(host_receiver_thread_entry_point, arg) -{ - u64 read_buff_size = KIBI(64); - struct arena *read_buff_arena = arena_alloc(read_buff_size); - struct string read_buff = ZI; - read_buff.len = read_buff_size; - read_buff.text = arena_push_array_no_zero(read_buff_arena, u8, read_buff_size); - - struct host *host = (struct host *)arg; - - struct sock_array socks = ZI; - socks.socks = &host->sock; - socks.count = 1; - - struct atomic32 *shutdown = &host->receiver_thread_shutdown_flag.v; - while (!atomic32_fetch(shutdown)) { - struct sock *sock = sock_wait_for_available_read(socks, F32_INFINITY); - struct sock_read_result res; - while (!atomic32_fetch(shutdown) && sock && (res = sock_read(sock, read_buff)).valid) { - struct sock_address address = res.address; - struct string data = res.data; - if (data.len > 0) { - struct snc_lock lock = snc_lock_e(&host->rcv_buffer_write_mutex); - { - struct host_rcv_buffer *rcv_buffer = host->rcv_buffer_write; - struct host_rcv_packet *packet = arena_push(rcv_buffer->arena, struct host_rcv_packet); - packet->address = address; - packet->data = string_copy(rcv_buffer->arena, data); - if (rcv_buffer->last_packet) { - rcv_buffer->last_packet->next = packet; - } else { - rcv_buffer->first_packet = packet; - } - rcv_buffer->last_packet = packet; - } - snc_unlock(&lock); - } - } - } - - arena_release(read_buff_arena); -} diff --git a/src/host.h b/src/host.h index c44c239c..2cfd3e57 100644 --- a/src/host.h +++ b/src/host.h @@ -97,9 +97,6 @@ struct host { u64 bytes_received; u64 bytes_sent; - - struct atomic32_padded receiver_thread_shutdown_flag; - struct sys_thread *receiver_thread; }; /* ========================== * diff --git a/src/sock.h b/src/sock.h index 8293d074..a7f09c13 100644 --- a/src/sock.h +++ b/src/sock.h @@ -43,11 +43,7 @@ INLINE b32 sock_address_eq(struct sock_address a, struct sock_address b) struct sock *sock_alloc(u16 listen_port, u64 sndbuf_size, u64 rcvbuf_size); void sock_release(struct sock *sock); -/* Wake anyone blocking on sock read */ -void sock_wake(struct sock *sock); - -struct sock *sock_wait_for_available_read(struct sock_array socks, f32 timeout); -struct sock_read_result sock_read(struct sock *sock, struct string read_buff); +struct sock_read_result sock_read(struct arena *arena, struct sock *sock); void sock_write(struct sock *sock, struct sock_address address, struct string data); #endif diff --git a/src/sock_win32.c b/src/sock_win32.c index 699f0785..ead9bd51 100644 --- a/src/sock_win32.c +++ b/src/sock_win32.c @@ -14,10 +14,6 @@ #pragma comment(lib, "ws2_32.lib") -//#define MAX_IP_STR_LEN 46 - -#define MAX_POLL_FDS 64 - struct win32_address { i32 size; i32 family; @@ -240,6 +236,7 @@ INTERNAL struct win32_address win32_address_from_sock_address(struct sock_addres return res; } +#if 0 /* If supplied address has ip INADDR_ANY (0), convert ip to localhost */ INTERNAL struct win32_address win32_address_convert_any_to_localhost(struct win32_address addr) { @@ -271,6 +268,7 @@ INTERNAL struct win32_address win32_address_convert_any_to_localhost(struct win3 } return addr; } +#endif INTERNAL struct sock_address sock_address_from_win32_address(struct win32_address ws_addr) { @@ -293,7 +291,7 @@ INTERNAL struct sock_address sock_address_from_win32_address(struct win32_addres * Sock * ========================== */ -INTERNAL struct win32_sock *win32_sock_alloc(void) +struct sock *sock_alloc(u16 listen_port, u64 sndbuf_size, u64 rcvbuf_size) { struct win32_sock *ws = 0; { @@ -307,24 +305,9 @@ INTERNAL struct win32_sock *win32_sock_alloc(void) snc_unlock(&lock); } MEMZERO_STRUCT(ws); - return ws; -} - -INTERNAL void win32_sock_release(struct win32_sock *ws) -{ - struct snc_lock lock = snc_lock_e(&G.win32_socks_mutex); - ws->next_free = G.first_free_win32_sock; - G.first_free_win32_sock = ws; - snc_unlock(&lock); -} - -struct sock *sock_alloc(u16 listen_port, u64 sndbuf_size, u64 rcvbuf_size) -{ - struct win32_sock *ws = win32_sock_alloc(); struct sock_address addr = sock_address_from_port(listen_port); struct win32_address bind_address = win32_address_from_sock_address(addr); ws->sock = socket(bind_address.family, SOCK_DGRAM, IPPROTO_UDP); - { i32 sb = sndbuf_size; i32 rb = rcvbuf_size; @@ -333,6 +316,8 @@ struct sock *sock_alloc(u16 listen_port, u64 sndbuf_size, u64 rcvbuf_size) } bind(ws->sock, &bind_address.sa, bind_address.size); + u32 imode = 1; + ioctlsocket(ws->sock, FIONBIO, (unsigned long *)&imode); return (struct sock *)ws; } @@ -341,67 +326,27 @@ void sock_release(struct sock *sock) { struct win32_sock *ws = (struct win32_sock *)sock; closesocket(ws->sock); - win32_sock_release(ws); -} - -/* Send an empty dummy packet to wake anyone blocking on read. - * This is hack since winsock doesn't have eventfd. - * - * TODO: Use WSAEvent and WSAWaitForMultipleEvents instead */ -void sock_wake(struct sock *sock) -{ - struct win32_sock *ws = (struct win32_sock *)sock; - - /* Get bound address as localhost so we can write to it (if bound to INADDR_ANY) */ - struct win32_address bind_address = ZI; + struct snc_lock lock = snc_lock_e(&G.win32_socks_mutex); { - i32 len = sizeof(bind_address.sas); - getsockname(ws->sock, &bind_address.sa, &len); - bind_address.family = bind_address.sin.sin_family; - bind_address.size = len; - bind_address = win32_address_convert_any_to_localhost(bind_address); + ws->next_free = G.first_free_win32_sock; + G.first_free_win32_sock = ws; } - - /* Have sock send an empty dummy packet to itself to signal read available */ - sendto(ws->sock, "", 0, 0, &bind_address.sa, bind_address.size); + snc_unlock(&lock); } /* ========================== * * Read * ========================== */ -struct sock *sock_wait_for_available_read(struct sock_array socks, f32 timeout) -{ - struct sock *res = 0; - - WSAPOLLFD fds[MAX_POLL_FDS] = ZI; - for (u32 i = 0; i < socks.count; ++i) { - struct win32_sock *ws = (struct win32_sock *)socks.socks[i]; - fds[i].fd = ws->sock; - fds[i].events = POLLRDNORM; - } - - i32 timeout_ms; - if (timeout == F32_INFINITY) { - timeout_ms = -1; - } else { - timeout_ms = (i32)(timeout * 1000); - } - WSAPoll(fds, socks.count, timeout_ms); - - for (u64 i = 0; i < socks.count; ++i) { - if (fds[i].revents & POLLRDNORM) { - res = socks.socks[i]; - break; - } - } - - return res; -} - -struct sock_read_result sock_read(struct sock *sock, struct string read_buff) +struct sock_read_result sock_read(struct arena *arena, struct sock *sock) { struct win32_sock *ws = (struct win32_sock *)sock; + + u64 read_buff_size = KIBI(64); + struct string read_buff = ZI; + read_buff.len = read_buff_size; + read_buff.text = arena_push_array_no_zero(arena, u8, read_buff_size); + struct sock_read_result res = ZI; struct win32_address ws_addr = ZI; @@ -416,6 +361,9 @@ struct sock_read_result sock_read(struct sock *sock, struct string read_buff) res.data.text = read_buff.text; res.data.len = size; res.valid = 1; + + /* Pop arena back to end of msg */ + arena_pop_to(arena, arena->pos - read_buff_size + size); } else { #if RTC i32 err = WSAGetLastError();