remove host thread

This commit is contained in:
jacob 2025-07-14 16:42:44 -05:00
parent 0a20e3fdd4
commit f27ec95481
4 changed files with 192 additions and 294 deletions

View File

@ -156,7 +156,6 @@ GLOBAL struct {
i32 _;
} G = ZI, DEBUG_ALIAS(G, G_host);
INTERNAL SYS_THREAD_DEF(host_receiver_thread_entry_point, arg);
INTERNAL void host_msg_assembler_release(struct host_msg_assembler *ma);
/* ========================== *
@ -198,19 +197,11 @@ struct host *host_alloc(u16 listen_port)
host->sock = sock_alloc(listen_port, MEBI(2), MEBI(2));
host->receiver_thread = sys_thread_alloc(&host_receiver_thread_entry_point, host, LIT("Host receiver"), PROF_THREAD_GROUP_IO);
return host;
}
void host_release(struct host *host)
{
atomic32_fetch_set(&host->receiver_thread_shutdown_flag.v, 1);
sock_wake(host->sock);
while (!sys_thread_try_release(host->receiver_thread, 0.001f)) {
sock_wake(host->sock);
}
sock_release(host->sock);
buddy_ctx_release(host->buddy);
@ -648,180 +639,193 @@ struct host_event_list host_update_begin(struct arena *arena, struct host *host)
struct host_event_list events = ZI;
i64 now_ns = sys_time_ns();
{
__profn("Read host packets");
struct string read_buff = ZI;
read_buff.len = PACKET_DATA_MAX_LEN;
read_buff.text = arena_push_array_no_zero(scratch.arena, u8, read_buff.len);
__profn("Read packets");
/* Swap read & write rcv buffers */
/* Read socket */
struct host_rcv_packet *first_packet = 0;
struct host_rcv_packet *last_packet = 0;
{
struct snc_lock lock = snc_lock_e(&host->rcv_buffer_write_mutex);
struct host_rcv_buffer *swp = host->rcv_buffer_read;
host->rcv_buffer_read = host->rcv_buffer_write;
host->rcv_buffer_write = swp;
snc_unlock(&lock);
__profn("Read socket");
struct sock_array socks = ZI;
socks.socks = &host->sock;
socks.count = 1;
struct sock *sock = host->sock;
struct sock_read_result res = ZI;
while ((res = sock_read(scratch.arena, sock)).valid) {
struct sock_address address = res.address;
struct string data = res.data;
if (data.len > 0) {
struct host_rcv_packet *packet = arena_push(scratch.arena, struct host_rcv_packet);
packet->address = address;
packet->data = string_copy(scratch.arena, data);
if (last_packet) {
last_packet->next = packet;
} else {
first_packet = packet;
}
last_packet = packet;
}
}
}
/* Read incoming packets */
struct host_rcv_buffer *rcv_buffer = host->rcv_buffer_read;
for (struct host_rcv_packet *packet = rcv_buffer->first_packet; packet; packet = packet->next) {
//struct sock *sock = packet->sock;
struct sock_address address = packet->address;
struct bitbuff bb = bitbuff_from_string(packet->data);
struct bitbuff_reader br = br_from_bitbuff(&bb);
u32 magic = br_read_ubits(&br, 32); /* TODO: implicitly encode magic into crc32 */
if (magic == PACKET_MAGIC) {
/* TODO: Combine kind byte with flags byte */
struct host_channel *channel = host_channel_from_address(host, address);
enum host_packet_kind host_packet_kind = br_read_ibits(&br, 8);
u8 packet_flags = br_read_ubits(&br, 8);
{
__profn("Process host packets");
for (struct host_rcv_packet *packet = first_packet; packet; packet = packet->next) {
//struct sock *sock = packet->sock;
struct sock_address address = packet->address;
struct bitbuff bb = bitbuff_from_string(packet->data);
struct bitbuff_reader br = br_from_bitbuff(&bb);
u32 magic = br_read_ubits(&br, 32); /* TODO: implicitly encode magic into crc32 */
if (magic == PACKET_MAGIC) {
/* TODO: Combine kind byte with flags byte */
struct host_channel *channel = host_channel_from_address(host, address);
enum host_packet_kind host_packet_kind = br_read_ibits(&br, 8);
u8 packet_flags = br_read_ubits(&br, 8);
u64 their_acked_seq = br_read_uv(&br);
if (channel->valid) {
channel->last_packet_received_ns = now_ns;
if (their_acked_seq > channel->their_acked_seq) {
channel->their_acked_seq = their_acked_seq;
}
}
b32 skip_packet = 0;
b32 is_reliable = packet_flags & HOST_PACKET_FLAG_RELIABLE;
if (channel->valid) {
if (is_reliable) {
u64 packet_seq = br_read_uv(&br);
if (packet_seq == channel->our_acked_seq + 1) {
channel->our_acked_seq = packet_seq;
} else {
skip_packet = 1;
u64 their_acked_seq = br_read_uv(&br);
if (channel->valid) {
channel->last_packet_received_ns = now_ns;
if (their_acked_seq > channel->their_acked_seq) {
channel->their_acked_seq = their_acked_seq;
}
}
}
if (!skip_packet) {
switch (host_packet_kind) {
case HOST_PACKET_KIND_TRY_CONNECT:
{
/* A foreign host is trying to connect to us */
if (!channel->valid) {
logf_info("Host received conection attempt from %F", FMT_STR(sock_string_from_address(scratch.arena, address)));
/* TODO: Verify that some per-host uuid isn't present in a rolling window to prevent reconnects right after a disconnect? */
channel = host_channel_alloc(host, address);
b32 skip_packet = 0;
b32 is_reliable = packet_flags & HOST_PACKET_FLAG_RELIABLE;
if (channel->valid) {
if (is_reliable) {
u64 packet_seq = br_read_uv(&br);
if (packet_seq == channel->our_acked_seq + 1) {
channel->our_acked_seq = packet_seq;
} else {
skip_packet = 1;
}
struct host_cmd *cmd = host_cmd_alloc_and_append(host);
cmd->kind = HOST_CMD_KIND_CONNECT_SUCCESS;
cmd->channel_id = channel->id;
} break;
case HOST_PACKET_KIND_CONNECT_SUCCESS:
{
/* We successfully connected to a foreign host and they are ready to receive messages */
if (channel->valid && !channel->connected) {
logf_info("Host received connection from %F", FMT_STR(sock_string_from_address(scratch.arena, address)));
struct host_event *event = push_event(arena, &events);
event->kind = HOST_EVENT_KIND_CHANNEL_OPENED;
event->channel_id = channel->id;
channel->connected = 1;
}
} break;
case HOST_PACKET_KIND_DISCONNECT:
{
/* A foreign host disconnected from us */
if (channel->valid) {
logf_info("Host received disconnection from %F", FMT_STR(sock_string_from_address(scratch.arena, address)));
struct host_event *event = push_event(arena, &events);
event->kind = HOST_EVENT_KIND_CHANNEL_CLOSED;
event->channel_id = channel->id;
host_channel_release(channel);
}
} break;
case HOST_PACKET_KIND_HEARTBEAT:
{
if (channel->valid) {
u16 heartbeat_id = br_read_ubits(&br, 16);
u16 acked_heartbeat_id = br_read_ubits(&br, 16);
if (heartbeat_id > channel->last_heartbeat_received_id) {
channel->last_heartbeat_received_id = heartbeat_id;
}
if (acked_heartbeat_id == channel->last_heartbeat_acked_id + 1) {
channel->last_heartbeat_acked_id = acked_heartbeat_id;
if (channel->last_heartbeat_acked_ns > 0) {
channel->last_heartbeat_rtt_ns = now_ns - channel->last_heartbeat_acked_ns;
}
channel->last_heartbeat_acked_ns = now_ns;
}
}
} break;
case HOST_PACKET_KIND_MSG_CHUNK:
{
if (channel->valid && channel->connected) {
/* Packet is chunk <chunk_id> out of <chunk_count> belonging to message <msg_id> */
u64 msg_id = br_read_uv(&br);
u64 chunk_id = br_read_uv(&br);
u64 chunk_count = br_read_uv(&br);
b32 is_last_chunk = (chunk_id + 1) == chunk_count;
u64 chunk_len = is_last_chunk ? br_read_uv(&br) : PACKET_MSG_CHUNK_MAX_LEN;
struct host_msg_assembler *ma = host_get_msg_assembler(host, channel->id, msg_id);
if (!ma) {
ma = host_msg_assembler_alloc(channel, msg_id, chunk_count, now_ns, is_reliable);
}
if (chunk_count == ma->num_chunks_total && chunk_id < chunk_count) {
if (!host_msg_assembler_is_chunk_filled(ma, chunk_id)) {
u8 *src = br_read_bytes_raw(&br, chunk_len);
if (src) {
u8 *dst = &ma->chunk_data[chunk_id * PACKET_MSG_CHUNK_MAX_LEN];
MEMCPY(dst, src, chunk_len);
if (is_last_chunk) {
ma->last_chunk_len = chunk_len;
}
host_msg_assembler_set_chunk_received(ma, chunk_id);
++ma->num_chunks_received;
host_msg_assembler_touch(ma, now_ns);
if (ma->num_chunks_received == chunk_count) {
/* All chunks filled, message has finished assembling */
/* TODO: Message ordering */
struct host_event *event = push_event(arena, &events);
struct string data = ZI;
data.len = ((chunk_count - 1) * PACKET_MSG_CHUNK_MAX_LEN) + ma->last_chunk_len;
data.text = arena_push_array_no_zero(arena, u8, data.len);
MEMCPY(data.text, ma->chunk_data, data.len);
event->kind = HOST_EVENT_KIND_MSG;
event->msg = data;
event->channel_id = channel->id;
if (is_reliable) {
/* Release assembler if reliable */
host_msg_assembler_release(ma);
}
}
} else {
/* Overflow reading chunk */
ASSERT(0);
}
}
} else {
/* Chunk id/count mismatch */
ASSERT(0);
}
}
} break;
default: break;
}
}
if (!skip_packet) {
switch (host_packet_kind) {
case HOST_PACKET_KIND_TRY_CONNECT:
{
/* A foreign host is trying to connect to us */
if (!channel->valid) {
logf_info("Host received conection attempt from %F", FMT_STR(sock_string_from_address(scratch.arena, address)));
/* TODO: Verify that some per-host uuid isn't present in a rolling window to prevent reconnects right after a disconnect? */
channel = host_channel_alloc(host, address);
}
struct host_cmd *cmd = host_cmd_alloc_and_append(host);
cmd->kind = HOST_CMD_KIND_CONNECT_SUCCESS;
cmd->channel_id = channel->id;
} break;
case HOST_PACKET_KIND_CONNECT_SUCCESS:
{
/* We successfully connected to a foreign host and they are ready to receive messages */
if (channel->valid && !channel->connected) {
logf_info("Host received connection from %F", FMT_STR(sock_string_from_address(scratch.arena, address)));
struct host_event *event = push_event(arena, &events);
event->kind = HOST_EVENT_KIND_CHANNEL_OPENED;
event->channel_id = channel->id;
channel->connected = 1;
}
} break;
case HOST_PACKET_KIND_DISCONNECT:
{
/* A foreign host disconnected from us */
if (channel->valid) {
logf_info("Host received disconnection from %F", FMT_STR(sock_string_from_address(scratch.arena, address)));
struct host_event *event = push_event(arena, &events);
event->kind = HOST_EVENT_KIND_CHANNEL_CLOSED;
event->channel_id = channel->id;
host_channel_release(channel);
}
} break;
case HOST_PACKET_KIND_HEARTBEAT:
{
if (channel->valid) {
u16 heartbeat_id = br_read_ubits(&br, 16);
u16 acked_heartbeat_id = br_read_ubits(&br, 16);
if (heartbeat_id > channel->last_heartbeat_received_id) {
channel->last_heartbeat_received_id = heartbeat_id;
}
if (acked_heartbeat_id == channel->last_heartbeat_acked_id + 1) {
channel->last_heartbeat_acked_id = acked_heartbeat_id;
if (channel->last_heartbeat_acked_ns > 0) {
channel->last_heartbeat_rtt_ns = now_ns - channel->last_heartbeat_acked_ns;
}
channel->last_heartbeat_acked_ns = now_ns;
}
}
} break;
case HOST_PACKET_KIND_MSG_CHUNK:
{
if (channel->valid && channel->connected) {
/* Packet is chunk <chunk_id> out of <chunk_count> belonging to message <msg_id> */
u64 msg_id = br_read_uv(&br);
u64 chunk_id = br_read_uv(&br);
u64 chunk_count = br_read_uv(&br);
b32 is_last_chunk = (chunk_id + 1) == chunk_count;
u64 chunk_len = is_last_chunk ? br_read_uv(&br) : PACKET_MSG_CHUNK_MAX_LEN;
struct host_msg_assembler *ma = host_get_msg_assembler(host, channel->id, msg_id);
if (!ma) {
ma = host_msg_assembler_alloc(channel, msg_id, chunk_count, now_ns, is_reliable);
}
if (chunk_count == ma->num_chunks_total && chunk_id < chunk_count) {
if (!host_msg_assembler_is_chunk_filled(ma, chunk_id)) {
u8 *src = br_read_bytes_raw(&br, chunk_len);
if (src) {
u8 *dst = &ma->chunk_data[chunk_id * PACKET_MSG_CHUNK_MAX_LEN];
MEMCPY(dst, src, chunk_len);
if (is_last_chunk) {
ma->last_chunk_len = chunk_len;
}
host_msg_assembler_set_chunk_received(ma, chunk_id);
++ma->num_chunks_received;
host_msg_assembler_touch(ma, now_ns);
if (ma->num_chunks_received == chunk_count) {
/* All chunks filled, message has finished assembling */
/* TODO: Message ordering */
struct host_event *event = push_event(arena, &events);
struct string data = ZI;
data.len = ((chunk_count - 1) * PACKET_MSG_CHUNK_MAX_LEN) + ma->last_chunk_len;
data.text = arena_push_array_no_zero(arena, u8, data.len);
MEMCPY(data.text, ma->chunk_data, data.len);
event->kind = HOST_EVENT_KIND_MSG;
event->msg = data;
event->channel_id = channel->id;
if (is_reliable) {
/* Release assembler if reliable */
host_msg_assembler_release(ma);
}
}
} else {
/* Overflow reading chunk */
ASSERT(0);
}
}
} else {
/* Chunk id/count mismatch */
ASSERT(0);
}
}
} break;
default: break;
}
}
host->bytes_received += packet->data.len;
}
host->bytes_received += packet->data.len;
}
}
/* Reset read buffer */
rcv_buffer->first_packet = 0;
rcv_buffer->last_packet = 0;
arena_reset(rcv_buffer->arena);
}
/* Update channels */
@ -1053,50 +1057,3 @@ void host_update_end(struct host *host)
scratch_end(scratch);
}
/* ========================== *
* Receive thread
* ========================== */
INTERNAL SYS_THREAD_DEF(host_receiver_thread_entry_point, arg)
{
u64 read_buff_size = KIBI(64);
struct arena *read_buff_arena = arena_alloc(read_buff_size);
struct string read_buff = ZI;
read_buff.len = read_buff_size;
read_buff.text = arena_push_array_no_zero(read_buff_arena, u8, read_buff_size);
struct host *host = (struct host *)arg;
struct sock_array socks = ZI;
socks.socks = &host->sock;
socks.count = 1;
struct atomic32 *shutdown = &host->receiver_thread_shutdown_flag.v;
while (!atomic32_fetch(shutdown)) {
struct sock *sock = sock_wait_for_available_read(socks, F32_INFINITY);
struct sock_read_result res;
while (!atomic32_fetch(shutdown) && sock && (res = sock_read(sock, read_buff)).valid) {
struct sock_address address = res.address;
struct string data = res.data;
if (data.len > 0) {
struct snc_lock lock = snc_lock_e(&host->rcv_buffer_write_mutex);
{
struct host_rcv_buffer *rcv_buffer = host->rcv_buffer_write;
struct host_rcv_packet *packet = arena_push(rcv_buffer->arena, struct host_rcv_packet);
packet->address = address;
packet->data = string_copy(rcv_buffer->arena, data);
if (rcv_buffer->last_packet) {
rcv_buffer->last_packet->next = packet;
} else {
rcv_buffer->first_packet = packet;
}
rcv_buffer->last_packet = packet;
}
snc_unlock(&lock);
}
}
}
arena_release(read_buff_arena);
}

View File

@ -97,9 +97,6 @@ struct host {
u64 bytes_received;
u64 bytes_sent;
struct atomic32_padded receiver_thread_shutdown_flag;
struct sys_thread *receiver_thread;
};
/* ========================== *

View File

@ -43,11 +43,7 @@ INLINE b32 sock_address_eq(struct sock_address a, struct sock_address b)
struct sock *sock_alloc(u16 listen_port, u64 sndbuf_size, u64 rcvbuf_size);
void sock_release(struct sock *sock);
/* Wake anyone blocking on sock read */
void sock_wake(struct sock *sock);
struct sock *sock_wait_for_available_read(struct sock_array socks, f32 timeout);
struct sock_read_result sock_read(struct sock *sock, struct string read_buff);
struct sock_read_result sock_read(struct arena *arena, struct sock *sock);
void sock_write(struct sock *sock, struct sock_address address, struct string data);
#endif

View File

@ -14,10 +14,6 @@
#pragma comment(lib, "ws2_32.lib")
//#define MAX_IP_STR_LEN 46
#define MAX_POLL_FDS 64
struct win32_address {
i32 size;
i32 family;
@ -240,6 +236,7 @@ INTERNAL struct win32_address win32_address_from_sock_address(struct sock_addres
return res;
}
#if 0
/* If supplied address has ip INADDR_ANY (0), convert ip to localhost */
INTERNAL struct win32_address win32_address_convert_any_to_localhost(struct win32_address addr)
{
@ -271,6 +268,7 @@ INTERNAL struct win32_address win32_address_convert_any_to_localhost(struct win3
}
return addr;
}
#endif
INTERNAL struct sock_address sock_address_from_win32_address(struct win32_address ws_addr)
{
@ -293,7 +291,7 @@ INTERNAL struct sock_address sock_address_from_win32_address(struct win32_addres
* Sock
* ========================== */
INTERNAL struct win32_sock *win32_sock_alloc(void)
struct sock *sock_alloc(u16 listen_port, u64 sndbuf_size, u64 rcvbuf_size)
{
struct win32_sock *ws = 0;
{
@ -307,24 +305,9 @@ INTERNAL struct win32_sock *win32_sock_alloc(void)
snc_unlock(&lock);
}
MEMZERO_STRUCT(ws);
return ws;
}
INTERNAL void win32_sock_release(struct win32_sock *ws)
{
struct snc_lock lock = snc_lock_e(&G.win32_socks_mutex);
ws->next_free = G.first_free_win32_sock;
G.first_free_win32_sock = ws;
snc_unlock(&lock);
}
struct sock *sock_alloc(u16 listen_port, u64 sndbuf_size, u64 rcvbuf_size)
{
struct win32_sock *ws = win32_sock_alloc();
struct sock_address addr = sock_address_from_port(listen_port);
struct win32_address bind_address = win32_address_from_sock_address(addr);
ws->sock = socket(bind_address.family, SOCK_DGRAM, IPPROTO_UDP);
{
i32 sb = sndbuf_size;
i32 rb = rcvbuf_size;
@ -333,6 +316,8 @@ struct sock *sock_alloc(u16 listen_port, u64 sndbuf_size, u64 rcvbuf_size)
}
bind(ws->sock, &bind_address.sa, bind_address.size);
u32 imode = 1;
ioctlsocket(ws->sock, FIONBIO, (unsigned long *)&imode);
return (struct sock *)ws;
}
@ -341,67 +326,27 @@ void sock_release(struct sock *sock)
{
struct win32_sock *ws = (struct win32_sock *)sock;
closesocket(ws->sock);
win32_sock_release(ws);
}
/* Send an empty dummy packet to wake anyone blocking on read.
* This is hack since winsock doesn't have eventfd.
*
* TODO: Use WSAEvent and WSAWaitForMultipleEvents instead */
void sock_wake(struct sock *sock)
{
struct win32_sock *ws = (struct win32_sock *)sock;
/* Get bound address as localhost so we can write to it (if bound to INADDR_ANY) */
struct win32_address bind_address = ZI;
struct snc_lock lock = snc_lock_e(&G.win32_socks_mutex);
{
i32 len = sizeof(bind_address.sas);
getsockname(ws->sock, &bind_address.sa, &len);
bind_address.family = bind_address.sin.sin_family;
bind_address.size = len;
bind_address = win32_address_convert_any_to_localhost(bind_address);
ws->next_free = G.first_free_win32_sock;
G.first_free_win32_sock = ws;
}
/* Have sock send an empty dummy packet to itself to signal read available */
sendto(ws->sock, "", 0, 0, &bind_address.sa, bind_address.size);
snc_unlock(&lock);
}
/* ========================== *
* Read
* ========================== */
struct sock *sock_wait_for_available_read(struct sock_array socks, f32 timeout)
{
struct sock *res = 0;
WSAPOLLFD fds[MAX_POLL_FDS] = ZI;
for (u32 i = 0; i < socks.count; ++i) {
struct win32_sock *ws = (struct win32_sock *)socks.socks[i];
fds[i].fd = ws->sock;
fds[i].events = POLLRDNORM;
}
i32 timeout_ms;
if (timeout == F32_INFINITY) {
timeout_ms = -1;
} else {
timeout_ms = (i32)(timeout * 1000);
}
WSAPoll(fds, socks.count, timeout_ms);
for (u64 i = 0; i < socks.count; ++i) {
if (fds[i].revents & POLLRDNORM) {
res = socks.socks[i];
break;
}
}
return res;
}
struct sock_read_result sock_read(struct sock *sock, struct string read_buff)
struct sock_read_result sock_read(struct arena *arena, struct sock *sock)
{
struct win32_sock *ws = (struct win32_sock *)sock;
u64 read_buff_size = KIBI(64);
struct string read_buff = ZI;
read_buff.len = read_buff_size;
read_buff.text = arena_push_array_no_zero(arena, u8, read_buff_size);
struct sock_read_result res = ZI;
struct win32_address ws_addr = ZI;
@ -416,6 +361,9 @@ struct sock_read_result sock_read(struct sock *sock, struct string read_buff)
res.data.text = read_buff.text;
res.data.len = size;
res.valid = 1;
/* Pop arena back to end of msg */
arena_pop_to(arena, arena->pos - read_buff_size + size);
} else {
#if RTC
i32 err = WSAGetLastError();