separate host_update into host_update_begin & host_update_end

This commit is contained in:
jacob 2025-02-26 08:28:50 -06:00
parent 89a4b3b22f
commit f9cbe61b7b
3 changed files with 47 additions and 104 deletions

View File

@ -104,11 +104,6 @@ struct host_channel_lookup_bin {
struct host_channel *last;
};
struct host_queued_event {
struct host_event event;
struct host_queued_event *next;
};
struct host_rcv_packet {
struct sock *sock;
struct sock_address address;
@ -187,7 +182,6 @@ struct host *host_alloc(u16 listen_port)
host->arena = arena;
host->cmd_arena = arena_alloc(GIGABYTE(64));
host->queued_event_arena = arena_alloc(GIGABYTE(64));
host->channel_arena = arena_alloc(GIGABYTE(64));
host->rcv_buffer_read = arena_push_zero(&host->arena, struct host_rcv_buffer);
host->rcv_buffer_write = arena_push_zero(&host->arena, struct host_rcv_buffer);
@ -225,7 +219,6 @@ void host_release(struct host *host)
arena_release(&host->rcv_buffer_write->arena);
arena_release(&host->rcv_buffer_read->arena);
arena_release(&host->channel_arena);
arena_release(&host->queued_event_arena);
arena_release(&host->cmd_arena);
arena_release(&host->arena);
}
@ -638,24 +631,24 @@ i64 host_get_channel_last_rtt_ns(struct host *host, struct host_channel_id chann
* Update
* ========================== */
INTERNAL struct host_queued_event *host_queued_event_alloc_and_append(struct host *host)
INTERNAL struct host_event *alloc_event(struct arena *arena, struct host_event_list *list)
{
struct host_queued_event *qe = arena_push_zero(&host->queued_event_arena, struct host_queued_event);
if (host->last_queued_event) {
host->last_queued_event->next = qe;
struct host_event *event = arena_push_zero(arena, struct host_event);
if (list->last) {
list->last->next = event;
} else {
host->first_queued_event = qe;
list->first = event;
}
host->last_queued_event = qe;
++host->num_queued_events;
return qe;
list->last = event;
return event;
}
void host_update(struct host *host)
/* Read incoming packets, update channels, and return events */
struct host_event_list host_update_begin(struct arena *arena, struct host *host)
{
__prof;
struct temp_arena scratch = scratch_begin_no_conflict();
struct temp_arena scratch = scratch_begin(arena);
struct host_event_list events = ZI;
i64 now_ns = sys_time_ns();
{
@ -672,6 +665,7 @@ void host_update(struct host *host)
host->rcv_buffer_write = swp;
sys_mutex_unlock(&lock);
}
/* Read incoming packets */
struct host_rcv_buffer *rcv_buffer = host->rcv_buffer_read;
for (struct host_rcv_packet *packet = rcv_buffer->first_packet; packet; packet = packet->next) {
@ -727,9 +721,9 @@ void host_update(struct host *host)
/* We successfully connected to a foreign host and they are ready to receive messages */
if (channel->valid && !channel->connected) {
logf_info("Host received connection from %F", FMT_STR(sock_string_from_address(scratch.arena, address)));
struct host_queued_event *queued_event = host_queued_event_alloc_and_append(host);
queued_event->event.kind = HOST_EVENT_KIND_CHANNEL_OPENED;
queued_event->event.channel_id = channel->id;
struct host_event *event = alloc_event(arena, &events);
event->kind = HOST_EVENT_KIND_CHANNEL_OPENED;
event->channel_id = channel->id;
channel->connected = true;
}
} break;
@ -739,9 +733,9 @@ void host_update(struct host *host)
/* A foreign host disconnected from us */
if (channel->valid) {
logf_info("Host received disconnection from %F", FMT_STR(sock_string_from_address(scratch.arena, address)));
struct host_queued_event *queued_event = host_queued_event_alloc_and_append(host);
queued_event->event.kind = HOST_EVENT_KIND_CHANNEL_CLOSED;
queued_event->event.channel_id = channel->id;
struct host_event *event = alloc_event(arena, &events);
event->kind = HOST_EVENT_KIND_CHANNEL_CLOSED;
event->channel_id = channel->id;
host_channel_release(channel);
}
@ -795,14 +789,14 @@ void host_update(struct host *host)
if (ma->num_chunks_received == chunk_count) {
/* All chunks filled, message has finished assembling */
/* TODO: Message ordering */
struct host_queued_event *queued_event = host_queued_event_alloc_and_append(host);
struct host_event *event = alloc_event(arena, &events);
struct string data = ZI;
data.len = ((chunk_count - 1) * PACKET_MSG_CHUNK_MAX_LEN) + ma->last_chunk_len;
data.text = arena_push_array(&host->queued_event_arena, u8, data.len);
data.text = arena_push_array(arena, u8, data.len);
MEMCPY(data.text, ma->chunk_data, data.len);
queued_event->event.kind = HOST_EVENT_KIND_MSG;
queued_event->event.msg = data;
queued_event->event.channel_id = channel->id;
event->kind = HOST_EVENT_KIND_MSG;
event->msg = data;
event->channel_id = channel->id;
if (is_reliable) {
/* Release assembler if reliable */
host_msg_assembler_release(ma);
@ -895,7 +889,17 @@ void host_update(struct host *host)
}
}
/* Process cmds */
scratch_end(scratch);
return events;
}
/* Process host cmds & send outgoing packets */
void host_update_end(struct host *host)
{
__prof;
struct temp_arena scratch = scratch_begin_no_conflict();
/* Process cmds into sendable packets */
/* TODO: Unreliable packets don't need to be allocated into unreliable packet queue, should just send them and forget */
{
__profscope(host_update_process_cmds);
@ -1052,38 +1056,6 @@ void host_update(struct host *host)
scratch_end(scratch);
}
/* ========================== *
* Events
* ========================== */
struct host_event_array host_pop_events(struct arena *arena, struct host *host)
{
__prof;
struct host_event_array res = ZI;
res.count = host->num_queued_events;
res.events = arena_push_array(arena, struct host_event, res.count);
u64 i = 0;
for (struct host_queued_event *qe = host->first_queued_event; qe; qe = qe->next) {
struct host_event *dest = &res.events[i];
*dest = qe->event;
struct string src_msg = qe->event.msg;
if (src_msg.len > 0) {
dest->msg.text = arena_push_array(arena, u8, src_msg.len);
MEMCPY(dest->msg.text, src_msg.text, src_msg.len);
}
++i;
}
/* Reset queued events */
host->num_queued_events = 0;
host->first_queued_event = NULL;
host->last_queued_event = NULL;
arena_reset(&host->queued_event_arena);
return res;
}
/* ========================== *
* Receive thread
* ========================== */

View File

@ -54,11 +54,13 @@ struct host_event {
enum host_event_kind kind;
struct host_channel_id channel_id;
struct string msg;
struct host_event *next;
};
struct host_event_array {
struct host_event *events;
u64 count;
struct host_event_list {
struct host_event *first;
struct host_event *last;
};
struct host {
@ -73,11 +75,6 @@ struct host {
struct host_cmd *last_cmd;
struct host_cmd *first_free_cmd;
struct arena queued_event_arena;
struct host_queued_event *first_queued_event;
struct host_queued_event *last_queued_event;
u64 num_queued_events;
struct arena channel_arena;
struct host_channel *channels;
struct host_channel *first_free_channel;
@ -134,39 +131,14 @@ void host_queue_write(struct host *host, struct host_channel_id channel_id, stru
* ========================== */
i64 host_get_channel_last_rtt_ns(struct host *host, struct host_channel_id channel_id);
INLINE b32 host_channel_id_eq(struct host_channel_id a, struct host_channel_id b) { return a.idx == b.idx && a.gen == b.gen; }
INLINE b32 host_channel_id_is_nil(struct host_channel_id id) { return id.gen == 0 && id.idx == 0; }
/* ========================== *
* Update
* ========================== */
void host_update(struct host *host);
/* ========================== *
* Events
* ========================== */
struct host_event_array host_pop_events(struct arena *arena, struct host *host);
INLINE b32 host_channel_id_eq(struct host_channel_id a, struct host_channel_id b)
{
return a.idx == b.idx && a.gen == b.gen;
}
INLINE b32 host_channel_id_is_nil(struct host_channel_id id)
{
return id.gen == 0 && id.idx == 0;
}
struct host_event_list host_update_begin(struct arena *arena, struct host *host);
void host_update_end(struct host *host);
#endif

View File

@ -1922,13 +1922,12 @@ INTERNAL SYS_THREAD_ENTRY_POINT_FUNC_DEF(user_local_sim_thread_entry_point, arg)
last_tick_ns = sys_time_ns();
}
struct host_event_list host_events = host_update_begin(scratch.arena, host);
/* Read net messages */
struct sim_decode_queue queue = ZI;
{
host_update(host);
struct host_event_array host_events = host_pop_events(scratch.arena, host);
for (u64 event_index = 0; event_index < host_events.count; ++event_index) {
struct host_event *event = &host_events.events[event_index];
for (struct host_event *event = host_events.first; event; event = event->next) {
struct host_channel_id channel_id = event->channel_id;
struct sim_client *client = sim_client_from_channel_id(store, channel_id);
switch (event->kind) {
@ -2278,7 +2277,7 @@ INTERNAL SYS_THREAD_ENTRY_POINT_FUNC_DEF(user_local_sim_thread_entry_point, arg)
skip_step:
/* Send host messages */
host_update(host);
host_update_end(host);
__profframe("Local sim");
scratch_end(scratch);