/* power_play/src/host.c */

#include "arena.h"
#include "host.h"
#include "scratch.h"
#include "bitbuff.h"
#include "sys.h"
#include "util.h"
#include "log.h"
#include "buddy.h"
#include "atomic.h"
/* TODO:
*
* Rate limiting.
*
* Resequence buffer to order incoming sequenced packets.
*
* Rolling window for message reassembly.
* This would remove the need for random access message buffers.
*/
/* Every packet starts with this magic word; anything else is dropped on read. */
#define PACKET_MAGIC 0xd9e3b8b6
/* Maximum payload bytes carried by one message chunk. */
#define PACKET_MSG_CHUNK_MAX_LEN 1024
#define PACKET_DATA_MAX_LEN 1280 /* Give enough space for msg chunk + header */
/* Bucket counts for the address->channel and (channel,msg)->assembler tables. */
#define NUM_CHANNEL_LOOKUP_BUCKETS 512
#define NUM_MSG_ASSEMBLER_LOOKUP_BUCKETS 16384
/* Wire-level packet discriminator, written as 8 bits right after the magic. */
enum host_packet_kind {
HOST_PACKET_KIND_NONE,
HOST_PACKET_KIND_TRY_CONNECT, /* Handshake request; resent until confirmed */
HOST_PACKET_KIND_CONNECT_SUCCESS, /* Handshake confirmation */
HOST_PACKET_KIND_DISCONNECT, /* Peer is closing the channel */
HOST_PACKET_KIND_MSG_CHUNK /* One chunk of a (possibly multi-chunk) message */
};
/* Per-packet flag bits, written as 8 bits after the kind byte. */
enum host_packet_flag {
HOST_PACKET_FLAG_NONE = 0,
/* Packet carries a sequence number and is resent until acked. */
HOST_PACKET_FLAG_RELIABLE = (1 << 0)
};
/* An outgoing packet queued on a channel; nodes are recycled through
 * host->first_free_packet once sent (unreliable) or acked (reliable). */
struct host_snd_packet {
struct host_snd_packet *next;
u64 seq; /* Sequence number; only meaningful for reliable packets */
u64 data_len; /* Number of bytes of `data` actually written */
u8 data[PACKET_DATA_MAX_LEN];
};
/* Per-peer connection state. Addressed two ways: by generational id
 * (host->channels array slot + gen) and by address hash bucket. */
struct host_channel {
struct host_channel_id id;
b32 valid; /* Slot in use; false once released to the free list */
b32 connected; /* Handshake completed */
struct host *host;
struct host_channel *next_free;
struct sock_address address;
u64 address_hash;
/* Doubly-linked membership in the address lookup bucket */
struct host_channel *next_address_hash;
struct host_channel *prev_address_hash;
/* NOTE: Packets are allocated in host's `arena` */
struct host_snd_packet *first_reliable_packet;
struct host_snd_packet *last_reliable_packet;
struct host_snd_packet *first_unreliable_packet;
struct host_snd_packet *last_unreliable_packet;
u64 num_reliable_packets;
u64 num_unreliable_packets;
/* NOTE: Msg assemblers are allocated in host's `arena` */
struct host_msg_assembler *least_recent_msg_assembler;
struct host_msg_assembler *most_recent_msg_assembler;
u64 last_sent_msg_id;
u64 their_acked_seq; /* Highest seq of ours the peer has acknowledged */
u64 our_acked_seq; /* Highest contiguous seq we have received from the peer */
u64 last_sent_seq; /* Seq stamped on the most recently queued reliable packet */
};
/* Scratch-allocated list node used when resolving a channel id
 * (possibly HOST_CHANNEL_ID_ALL) into concrete channels. */
struct host_channel_node {
struct host_channel *channel;
struct host_channel_node *next;
};
struct host_channel_list {
struct host_channel_node *first;
struct host_channel_node *last;
};
/* One bucket of the address -> channel hash table. */
struct host_channel_lookup_bucket {
struct host_channel *first;
struct host_channel *last;
};
/* An event waiting to be handed out by host_pop_events(). */
struct host_queued_event {
struct host_event event;
struct host_queued_event *next;
};
/* One datagram captured by the receiver thread. */
struct host_rcv_packet {
/* NOTE(review): `sock` is never assigned by the receiver thread and stays
 * NULL — confirm before relying on it. */
struct sock *sock;
struct sock_address address;
struct string data;
struct host_rcv_packet *next;
};
/* Double-buffered receive queue: the receiver thread appends to the write
 * buffer while host_update() drains the read buffer. */
struct host_rcv_buffer {
struct arena arena;
struct host_rcv_packet *first_packet;
struct host_rcv_packet *last_packet;
};
/* Reassembles a chunked message as its packets arrive. A live assembler sits
 * in two lists at once (a hash bucket for lookup by (channel, msg_id) and a
 * per-channel recency list used for timeout eviction); a dead one sits on the
 * host-wide free list. */
struct host_msg_assembler {
struct host_channel *channel;
b32 is_reliable;
/* Free list */
struct host_msg_assembler *next_free;
/* Bucket list */
struct host_msg_assembler *next_hash;
struct host_msg_assembler *prev_hash;
/* Channel list */
struct host_msg_assembler *less_recent;
struct host_msg_assembler *more_recent;
u64 msg_id;
u64 hash; /* Cached hash_from_channel_msg(channel->id, msg_id) */
u64 last_chunk_len; /* Actual length of the final (possibly short) chunk */
u64 num_chunks_total;
u64 num_chunks_received;
i64 touched_ns; /* Last time a chunk landed; drives timeout eviction */
/* Bitmap + chunk storage live in one contiguous buddy allocation */
struct buddy_block *buddy_block;
u8 *chunk_bitmap; /* 1 bit per chunk: received or not */
u8 *chunk_data; /* num_chunks_total * PACKET_MSG_CHUNK_MAX_LEN bytes */
};
/* One bucket of the (channel, msg_id) -> assembler hash table. */
struct host_msg_assembler_lookup_bucket {
struct host_msg_assembler *first;
struct host_msg_assembler *last;
};
/* Read-only sentinel returned by channel lookups on a miss. It must never be
 * written through (it lives in read-only storage; `valid` is always false). */
READONLY GLOBAL struct host_channel _g_host_channel_nil = { .valid = false };
/* Module globals (currently empty placeholder). */
GLOBAL struct {
i32 _;
} G = ZI, DEBUG_ALIAS(G, G_host);
INTERNAL SYS_THREAD_ENTRY_POINT_FUNC_DEF(host_receiver_thread_entry_point, arg);
INTERNAL void host_msg_assembler_release(struct host_msg_assembler *ma);
/* ========================== *
* Startup
* ========================== */
struct host_startup_receipt host_startup(struct sock_startup_receipt *sock_sr)
{
(UNUSED)sock_sr;
return (struct host_startup_receipt) { 0 };
}
/* ========================== *
* Host
* ========================== */
/* Allocate a host listening on `listen_port`.
 * The host struct lives inside its own arena, so releasing that arena frees
 * the struct too. The GIGABYTE(64) sizes are presumably address-space
 * reservations committed on demand — TODO confirm against arena_alloc.
 * The receiver thread is started last, once everything it touches exists. */
struct host *host_alloc(u16 listen_port)
{
struct arena arena = arena_alloc(GIGABYTE(64));
struct host *host = arena_push_zero(&arena, struct host);
host->arena = arena;
host->cmd_arena = arena_alloc(GIGABYTE(64));
host->queued_event_arena = arena_alloc(GIGABYTE(64));
host->channel_arena = arena_alloc(GIGABYTE(64));
host->rcv_buffer_read = arena_push_zero(&host->arena, struct host_rcv_buffer);
host->rcv_buffer_write = arena_push_zero(&host->arena, struct host_rcv_buffer);
host->rcv_buffer_read->arena = arena_alloc(GIGABYTE(64));
host->rcv_buffer_write->arena = arena_alloc(GIGABYTE(64));
host->buddy = buddy_ctx_alloc(GIGABYTE(64));
/* Base pointer of the growable channel array; slots are pushed on demand */
host->channels = arena_dry_push(&host->channel_arena, struct host_channel);
host->num_channel_lookup_buckets = NUM_CHANNEL_LOOKUP_BUCKETS;
host->channel_lookup_buckets = arena_push_array_zero(&host->arena, struct host_channel_lookup_bucket, host->num_channel_lookup_buckets);
host->num_msg_assembler_lookup_buckets = NUM_MSG_ASSEMBLER_LOOKUP_BUCKETS;
host->msg_assembler_lookup_buckets = arena_push_array_zero(&host->arena, struct host_msg_assembler_lookup_bucket, host->num_msg_assembler_lookup_buckets);
host->sock = sock_alloc(listen_port, MEGABYTE(2), MEGABYTE(2));
host->rcv_buffer_write_mutex = sys_mutex_alloc();
/* Started last: the thread immediately reads the state built above */
host->receiver_thread = sys_thread_alloc(&host_receiver_thread_entry_point, host, LIT("[P5] Host receiver"));
return host;
}
/* Tear down a host. The shutdown flag is raised first, then sock_wake()
 * drops the receiver thread out of its blocking wait so it can observe the
 * flag and exit; only after the thread has been joined is shared state
 * (mutex, socket, arenas) released. */
void host_release(struct host *host)
{
atomic_i32_eval_exchange(&host->receiver_thread_shutdown_flag, 1);
sock_wake(host->sock);
sys_thread_wait_release(&host->receiver_thread);
sys_mutex_release(&host->rcv_buffer_write_mutex);
sock_release(host->sock);
buddy_ctx_release(host->buddy);
arena_release(&host->rcv_buffer_write->arena);
arena_release(&host->rcv_buffer_read->arena);
arena_release(&host->channel_arena);
arena_release(&host->queued_event_arena);
arena_release(&host->cmd_arena);
/* `host` itself lives inside host->arena, so this must come last */
arena_release(&host->arena);
}
/* ========================== *
* Channel
* ========================== */
INTERNAL u64 hash_from_address(struct sock_address address)
{
    /* Hash the raw bytes of the address struct with FNV-64. */
    struct string address_bytes = STRING_FROM_STRUCT(&address);
    return hash_fnv64(HASH_FNV64_BASIS, address_bytes);
}
INTERNAL struct host_channel *host_channel_from_address(struct host *host, struct sock_address address)
{
    /* Look the address up in the hash table; the full hash is compared first
     * so most non-matching entries skip the byte-wise address comparison. */
    u64 hash = hash_from_address(address);
    u64 bucket_index = hash % host->num_channel_lookup_buckets;
    struct host_channel *channel = host->channel_lookup_buckets[bucket_index].first;
    while (channel) {
        if (channel->address_hash == hash && sock_address_eq(channel->address, address)) {
            return channel;
        }
        channel = channel->next_address_hash;
    }
    /* Unknown address: hand back the read-only nil channel (valid == false). */
    return &_g_host_channel_nil;
}
/* Returns nil channel if id = HOST_CHANNEL_ID_ALL */
INTERNAL struct host_channel *host_single_channel_from_id(struct host *host, struct host_channel_id channel_id)
{
if (channel_id.gen > 0 && channel_id.idx < host->num_channels_reserved) {
struct host_channel *channel = &host->channels[channel_id.idx];
if (channel->id.gen == channel_id.gen) {
return channel;
}
}
return &_g_host_channel_nil;
}
INTERNAL struct host_channel_list host_channels_from_id(struct arena *arena, struct host *host, struct host_channel_id channel_id)
{
struct host_channel_list res = ZI;
if (host_channel_id_eq(channel_id, HOST_CHANNEL_ID_ALL)) {
for (u64 i = 0; i < host->num_channels_reserved; ++i) {
struct host_channel *channel = &host->channels[i];
if (channel->valid) {
struct host_channel_node *n = arena_push_zero(arena, struct host_channel_node);
n->channel = channel;
if (res.last) {
res.last->next = n;
} else {
res.first = n;
}
res.last = n;
}
}
} else {
struct host_channel *channel = host_single_channel_from_id(host, channel_id);
if (channel->valid) {
struct host_channel_node *n = arena_push_zero(arena, struct host_channel_node);
n->channel = channel;
res.first = n;
res.last = n;
}
}
return res;
}
/* Allocate a channel for `address`, preferring a recycled slot from the free
 * list (its generation is bumped so stale ids no longer resolve) and falling
 * back to pushing a brand-new slot. The channel is inserted at the tail of
 * its address-hash bucket so host_channel_from_address() can find it. */
INTERNAL struct host_channel *host_channel_alloc(struct host *host, struct sock_address address)
{
struct host_channel_id id = ZI;
struct host_channel *channel;
if (host->first_free_channel) {
channel = host->first_free_channel;
host->first_free_channel = channel->next_free;
/* Capture the id before the struct is zeroed below */
id = channel->id;
++id.gen;
} else {
channel = arena_push(&host->channel_arena, struct host_channel);
id.gen = 1;
id.idx = host->num_channels_reserved;
++host->num_channels_reserved;
}
MEMZERO_STRUCT(channel);
channel->valid = true;
channel->id = id;
channel->host = host;
channel->address = address;
u64 address_hash = hash_from_address(address);
channel->address_hash = address_hash;
u64 bucket_index = address_hash % host->num_channel_lookup_buckets;
struct host_channel_lookup_bucket *bucket = &host->channel_lookup_buckets[bucket_index];
if (bucket->last) {
channel->prev_address_hash = bucket->last;
bucket->last->next_address_hash = channel;
} else {
bucket->first = channel;
}
bucket->last = channel;
return channel;
}
/* Release a channel: unlink it from the address lookup table, return its
 * queued packets and msg assemblers to the host free lists, invalidate its
 * id, and recycle the slot. */
INTERNAL void host_channel_release(struct host_channel *channel)
{
    struct host *host = channel->host;
    /* Release from lookup table */
    {
        struct host_channel_lookup_bucket *bucket = &host->channel_lookup_buckets[channel->address_hash % host->num_channel_lookup_buckets];
        struct host_channel *prev = channel->prev_address_hash;
        struct host_channel *next = channel->next_address_hash;
        if (prev) {
            prev->next_address_hash = next;
        } else {
            bucket->first = next;
        }
        if (next) {
            next->prev_address_hash = prev;
        } else {
            bucket->last = prev;
        }
    }
    /* Release packets.
     * FIX: the channel list's tail must be spliced onto the EXISTING free
     * list before the free-list head is redirected. The previous code
     * overwrote host->first_free_packet first, so `last->next` ended up
     * pointing at the channel's own head — cycling the free list and
     * leaking whatever was on it. */
    {
        if (channel->first_unreliable_packet) {
            channel->last_unreliable_packet->next = host->first_free_packet;
            host->first_free_packet = channel->first_unreliable_packet;
        }
        if (channel->first_reliable_packet) {
            channel->last_reliable_packet->next = host->first_free_packet;
            host->first_free_packet = channel->first_reliable_packet;
        }
    }
    /* Release msg assemblers; grab the successor before releasing since
     * host_msg_assembler_release() pushes the node onto a free list. */
    for (struct host_msg_assembler *ma = channel->least_recent_msg_assembler; ma;) {
        struct host_msg_assembler *next = ma->more_recent;
        host_msg_assembler_release(ma);
        ma = next;
    }
    /* Bump the generation so stale ids stop resolving, then recycle the slot */
    ++channel->id.gen;
    channel->valid = false;
    channel->next_free = host->first_free_channel;
    host->first_free_channel = channel;
}
/* ========================== *
* Msg assembler
* ========================== */
INTERNAL u64 hash_from_channel_msg(struct host_channel_id channel_id, u64 msg_id)
{
    /* Chain the channel id bytes and the msg id bytes through one FNV-64. */
    u64 hash = hash_fnv64(HASH_FNV64_BASIS, STRING_FROM_STRUCT(&channel_id));
    hash = hash_fnv64(hash, STRING_FROM_STRUCT(&msg_id));
    return hash;
}
INTERNAL struct host_msg_assembler *host_get_msg_assembler(struct host *host, struct host_channel_id channel_id, u64 msg_id)
{
    /* Find the in-flight assembler for (channel, msg_id), or NULL if none. */
    u64 hash = hash_from_channel_msg(channel_id, msg_id);
    u64 bucket_index = hash % host->num_msg_assembler_lookup_buckets;
    struct host_msg_assembler *ma = host->msg_assembler_lookup_buckets[bucket_index].first;
    while (ma) {
        b32 match = ma->hash == hash
            && host_channel_id_eq(ma->channel->id, channel_id)
            && ma->msg_id == msg_id;
        if (match) {
            break;
        }
        ma = ma->next_hash;
    }
    return ma;
}
/* Allocate (or recycle) an assembler for an incoming chunked message and
 * register it both in the channel's recency list and the lookup table.
 * NOTE(review): `chunk_count` comes straight off the wire; a hostile peer can
 * make this allocate chunk_count * PACKET_MSG_CHUNK_MAX_LEN bytes — confirm
 * an upper bound is enforced somewhere. */
INTERNAL struct host_msg_assembler *host_msg_assembler_alloc(struct host_channel *channel, u64 msg_id, u64 chunk_count, u64 now_ns, b32 is_reliable)
{
struct host *host = channel->host;
struct host_msg_assembler *ma;
if (host->first_free_msg_assembler) {
ma = host->first_free_msg_assembler;
host->first_free_msg_assembler = ma->next_free;
} else {
ma = arena_push(&host->arena, struct host_msg_assembler);
}
MEMZERO_STRUCT(ma);
ma->channel = channel;
ma->msg_id = msg_id;
ma->num_chunks_total = chunk_count;
/* One bit per chunk, rounded up to whole bytes */
u64 chunk_bitmap_size = (chunk_count + 7) >> 3;
if ((chunk_bitmap_size % 16) != 0) {
/* Align chunk bitmap to 16 so msg data is aligned */
chunk_bitmap_size += 16 - (chunk_bitmap_size % 16);
}
u64 chunk_data_size = chunk_count * PACKET_MSG_CHUNK_MAX_LEN;
/* Allocate msg data using buddy allocator since the assembler has
 * arbitrary lifetime and data needs to stay contiguous for random
 * access as packets are received */
ma->buddy_block = buddy_alloc(host->buddy, chunk_bitmap_size + chunk_data_size);
/* NOTE(review): buddy_alloc result is used unchecked — confirm it cannot fail */
ma->chunk_bitmap = ma->buddy_block->memory;
MEMZERO(ma->chunk_bitmap, chunk_bitmap_size);
ma->chunk_data = ma->chunk_bitmap + chunk_bitmap_size;
/* FIXME: Ensure chunk_count > 0 */
ma->is_reliable = is_reliable;
/* Insert into channel list (as the most recently touched assembler) */
ma->touched_ns = now_ns;
if (channel->most_recent_msg_assembler) {
channel->most_recent_msg_assembler->more_recent = ma;
ma->less_recent = channel->most_recent_msg_assembler;
} else {
channel->least_recent_msg_assembler = ma;
}
channel->most_recent_msg_assembler = ma;
/* Insert into lookup table */
u64 hash = hash_from_channel_msg(channel->id, msg_id);
ma->hash = hash;
struct host_msg_assembler_lookup_bucket *bucket = &host->msg_assembler_lookup_buckets[hash % host->num_msg_assembler_lookup_buckets];
if (bucket->last) {
bucket->last->next_hash = ma;
ma->prev_hash = bucket->last;
} else {
bucket->first = ma;
}
bucket->last = ma;
return ma;
}
/* Tear down an assembler: free its buddy memory, unlink it from the channel
 * recency list and its lookup bucket, and push it onto the host free list.
 * The node's OWN link pointers are left untouched, so a caller iterating the
 * recency list should still fetch the successor before calling this. */
INTERNAL void host_msg_assembler_release(struct host_msg_assembler *ma)
{
struct host_channel *channel = ma->channel;
struct host *host = channel->host;
buddy_release(ma->buddy_block);
/* Release from channel list */
{
struct host_msg_assembler *prev = ma->less_recent;
struct host_msg_assembler *next = ma->more_recent;
if (prev) {
prev->more_recent = next;
} else {
channel->least_recent_msg_assembler = next;
}
if (next) {
next->less_recent = prev;
} else {
channel->most_recent_msg_assembler = prev;
}
}
/* Release from lookup table */
struct host_msg_assembler_lookup_bucket *bucket = &host->msg_assembler_lookup_buckets[ma->hash % host->num_msg_assembler_lookup_buckets];
{
struct host_msg_assembler *prev = ma->prev_hash;
struct host_msg_assembler *next = ma->next_hash;
if (prev) {
prev->next_hash = next;
} else {
bucket->first = next;
}
if (next) {
next->prev_hash = prev;
} else {
bucket->last = prev;
}
}
ma->next_free = host->first_free_msg_assembler;
host->first_free_msg_assembler = ma;
}
/* Record that `ma` received a chunk at `now_ns` and move it to the
 * most-recent end of its channel's recency list, which the timeout sweep in
 * host_update() walks oldest-first. */
INTERNAL void host_msg_assembler_touch(struct host_msg_assembler *ma, i64 now_ns)
{
struct host_channel *channel = ma->channel;
if (ma != channel->most_recent_msg_assembler) {
/* Remove from channel list */
{
struct host_msg_assembler *prev = ma->less_recent;
struct host_msg_assembler *next = ma->more_recent;
if (prev) {
prev->more_recent = next;
} else {
channel->least_recent_msg_assembler = next;
}
if (next) {
next->less_recent = prev;
} else {
channel->most_recent_msg_assembler = prev;
}
}
/* Insert at end of channel list */
{
if (channel->most_recent_msg_assembler) {
channel->most_recent_msg_assembler->more_recent = ma;
ma->less_recent = channel->most_recent_msg_assembler;
} else {
channel->least_recent_msg_assembler = ma;
}
channel->most_recent_msg_assembler = ma;
}
}
ma->touched_ns = now_ns;
}
INTERNAL b32 host_msg_assembler_is_chunk_filled(struct host_msg_assembler *ma, u64 chunk_id)
{
    /* Test the chunk's bit in the bitmap; out-of-range ids read as unfilled. */
    if (chunk_id >= ma->num_chunks_total) {
        return false;
    }
    u8 byte = ma->chunk_bitmap[chunk_id / 8];
    u8 mask = (u8)(1 << (chunk_id % 8));
    return (byte & mask) != 0;
}
INTERNAL void host_msg_assembler_set_chunk_received(struct host_msg_assembler *ma, u64 chunk_id)
{
    /* Set the chunk's bit in the bitmap; out-of-range ids are ignored. */
    if (chunk_id >= ma->num_chunks_total) {
        return;
    }
    ma->chunk_bitmap[chunk_id / 8] |= (u8)(1 << (chunk_id % 8));
}
/* ========================== *
* Packet
* ========================== */
/* Allocate (or recycle) an outgoing packet and append it to the channel's
 * reliable or unreliable send queue. Reliable packets are stamped with the
 * next send sequence number; unreliable packets keep seq == 0. */
INTERNAL struct host_snd_packet *host_channel_snd_packet_alloc(struct host_channel *channel, b32 is_reliable)
{
struct host *host = channel->host;
struct host_snd_packet *packet = NULL;
if (host->first_free_packet) {
packet = host->first_free_packet;
host->first_free_packet = packet->next;
} else {
packet = arena_push(&host->arena, struct host_snd_packet);
}
MEMZERO_STRUCT(packet);
if (is_reliable) {
if (channel->last_reliable_packet) {
channel->last_reliable_packet->next = packet;
} else {
channel->first_reliable_packet = packet;
}
channel->last_reliable_packet = packet;
++channel->num_reliable_packets;
packet->seq = ++channel->last_sent_seq;
} else {
if (channel->last_unreliable_packet) {
channel->last_unreliable_packet->next = packet;
} else {
channel->first_unreliable_packet = packet;
}
channel->last_unreliable_packet = packet;
++channel->num_unreliable_packets;
}
return packet;
}
/* ========================== *
* Cmd interface
* ========================== */
INTERNAL struct host_cmd *host_cmd_alloc_and_append(struct host *host)
{
struct host_cmd *cmd = arena_push_zero(&host->cmd_arena, struct host_cmd);
if (host->last_cmd) {
host->last_cmd->next = cmd;
} else {
host->first_cmd = cmd;
}
host->last_cmd = cmd;
return cmd;
}
void host_queue_connect_to_address(struct host *host, struct sock_address connect_address)
{
    /* Allocating the channel is all that is needed here: host_update() sends
     * (and resends) TRY_CONNECT handshakes for every valid channel that has
     * not completed its handshake yet. */
    struct host_channel *existing = host_channel_from_address(host, connect_address);
    if (!existing->valid) {
        host_channel_alloc(host, connect_address);
    }
}
void host_queue_disconnect(struct host *host, struct host_channel_id channel_id)
{
    /* Queue a disconnect; the packet is built on the next host_update(). */
    struct host_cmd *cmd = host_cmd_alloc_and_append(host);
    cmd->channel_id = channel_id;
    cmd->kind = HOST_CMD_KIND_DISCONNECT;
}
void host_queue_write(struct host *host, struct host_channel_id channel_id, struct string msg, u32 flags)
{
    /* Queue a message send. The payload is copied into the cmd arena so the
     * caller's buffer may be reused immediately after this returns. */
    struct host_cmd *cmd = host_cmd_alloc_and_append(host);
    cmd->channel_id = channel_id;
    cmd->kind = HOST_CMD_KIND_WRITE;
    cmd->write_reliable = flags & HOST_WRITE_FLAG_RELIABLE;
    cmd->write_msg = string_copy(&host->cmd_arena, msg);
}
/* ========================== *
* Update
* ========================== */
INTERNAL struct host_queued_event *host_queued_event_alloc_and_append(struct host *host)
{
struct host_queued_event *qe = arena_push_zero(&host->queued_event_arena, struct host_queued_event);
if (host->last_queued_event) {
host->last_queued_event->next = qe;
} else {
host->first_queued_event = qe;
}
host->last_queued_event = qe;
++host->num_queued_events;
return qe;
}
void host_update(struct host *host)
{
__prof;
struct temp_arena scratch = scratch_begin_no_conflict();
i64 now_ns = sys_time_ns();
{
__profscope(host_update_read_packets);
struct string read_buff = ZI;
read_buff.len = PACKET_DATA_MAX_LEN;
read_buff.text = arena_push_array(scratch.arena, u8, read_buff.len);
/* Swap read & write rcv buffers */
{
struct sys_lock lock = sys_mutex_lock_e(&host->rcv_buffer_write_mutex);
struct host_rcv_buffer *swp = host->rcv_buffer_read;
host->rcv_buffer_read = host->rcv_buffer_write;
host->rcv_buffer_write = swp;
sys_mutex_unlock(&lock);
}
/* Read incoming packets */
struct host_rcv_buffer *rcv_buffer = host->rcv_buffer_read;
for (struct host_rcv_packet *packet = rcv_buffer->first_packet; packet; packet = packet->next) {
//struct sock *sock = packet->sock;
struct sock_address address = packet->address;
struct bitbuff bb = bitbuff_from_string(packet->data);
struct bitbuff_reader br = br_from_bitbuff(&bb);
u32 magic = br_read_ubits(&br, 32);
if (magic == PACKET_MAGIC) {
/* TODO: Combine kind byte with flags byte */
struct host_channel *channel = host_channel_from_address(host, address);
enum host_packet_kind host_packet_kind = br_read_ibits(&br, 8);
u8 packet_flags = br_read_ubits(&br, 8);
u64 their_acked_seq = br_read_uv(&br);
if (their_acked_seq > channel->their_acked_seq) {
channel->their_acked_seq = their_acked_seq;
}
b32 skip_packet = false;
b32 is_reliable = packet_flags & HOST_PACKET_FLAG_RELIABLE;
if (channel->valid) {
if (is_reliable) {
u64 packet_seq = br_read_uv(&br);
if (packet_seq == channel->our_acked_seq + 1) {
channel->our_acked_seq = packet_seq;
} else {
skip_packet = true;
}
}
}
if (!skip_packet) {
switch (host_packet_kind) {
case HOST_PACKET_KIND_TRY_CONNECT:
{
/* A foreign host is trying to connect to us */
if (!channel->valid) {
logf_info("Host received conection attempt from %F", FMT_STR(sock_string_from_address(scratch.arena, address)));
/* TODO: Verify that some per-host uuid isn't present in a rolling window to prevent reconnects right after a disconnect? */
channel = host_channel_alloc(host, address);
}
struct host_cmd *cmd = host_cmd_alloc_and_append(host);
cmd->kind = HOST_CMD_KIND_CONNECT_SUCCESS;
cmd->channel_id = channel->id;
} break;
case HOST_PACKET_KIND_CONNECT_SUCCESS:
{
/* We successfully connected to a foreign host and they are ready to receive messages */
if (channel->valid && !channel->connected) {
logf_info("Host received connection from %F", FMT_STR(sock_string_from_address(scratch.arena, address)));
struct host_queued_event *queued_event = host_queued_event_alloc_and_append(host);
queued_event->event.kind = HOST_EVENT_KIND_CHANNEL_OPENED;
queued_event->event.channel_id = channel->id;
channel->connected = true;
}
} break;
case HOST_PACKET_KIND_DISCONNECT:
{
/* A foreign host disconnected from us */
if (channel->valid) {
logf_info("Host received disconnection from %F", FMT_STR(sock_string_from_address(scratch.arena, address)));
struct host_queued_event *queued_event = host_queued_event_alloc_and_append(host);
queued_event->event.kind = HOST_EVENT_KIND_CHANNEL_CLOSED;
queued_event->event.channel_id = channel->id;
host_channel_release(channel);
}
} break;
case HOST_PACKET_KIND_MSG_CHUNK:
{
if (channel->valid && channel->connected) {
/* Packet is chunk <chunk_id> out of <chunk_count> belonging to message <msg_id> */
u64 msg_id = br_read_uv(&br);
u64 chunk_id = br_read_uv(&br);
u64 chunk_count = br_read_uv(&br);
b32 is_last_chunk = (chunk_id + 1) == chunk_count;
u64 chunk_len = is_last_chunk ? br_read_uv(&br) : PACKET_MSG_CHUNK_MAX_LEN;
struct host_msg_assembler *ma = host_get_msg_assembler(host, channel->id, msg_id);
if (!ma) {
ma = host_msg_assembler_alloc(channel, msg_id, chunk_count, now_ns, is_reliable);
}
if (chunk_count == ma->num_chunks_total && chunk_id < chunk_count) {
if (!host_msg_assembler_is_chunk_filled(ma, chunk_id)) {
u8 *src = br_read_bytes_raw(&br, chunk_len);
if (src) {
u8 *dst = &ma->chunk_data[chunk_id * PACKET_MSG_CHUNK_MAX_LEN];
MEMCPY(dst, src, chunk_len);
if (is_last_chunk) {
ma->last_chunk_len = chunk_len;
}
host_msg_assembler_set_chunk_received(ma, chunk_id);
++ma->num_chunks_received;
host_msg_assembler_touch(ma, now_ns);
if (ma->num_chunks_received == chunk_count) {
/* All chunks filled, message has finished assembling */
/* TODO: Message ordering */
struct host_queued_event *queued_event = host_queued_event_alloc_and_append(host);
struct string data = ZI;
data.len = ((chunk_count - 1) * PACKET_MSG_CHUNK_MAX_LEN) + ma->last_chunk_len;
data.text = arena_push_array(&host->queued_event_arena, u8, data.len);
MEMCPY(data.text, ma->chunk_data, data.len);
queued_event->event.kind = HOST_EVENT_KIND_MSG;
queued_event->event.msg = data;
queued_event->event.channel_id = channel->id;
if (is_reliable) {
/* Release assembler if reliable */
host_msg_assembler_release(ma);
}
}
} else {
/* Overflow reading chunk */
ASSERT(false);
}
}
} else {
/* Chunk id/count mismatch */
ASSERT(false);
}
}
} break;
default: break;
}
}
host->bytes_received += packet->data.len;
}
}
/* Reset read buffer */
rcv_buffer->first_packet = NULL;
rcv_buffer->last_packet = NULL;
arena_reset(&rcv_buffer->arena);
}
/* Update channels */
{
__profscope(host_update_channels);
for (u64 i = 0; i < host->num_channels_reserved; ++i) {
struct host_channel *channel = &host->channels[i];
if (channel->valid) {
/* Send / resend handshake if not connected */
if (!channel->connected) {
struct host_cmd *cmd = host_cmd_alloc_and_append(host);
cmd->kind = HOST_CMD_KIND_TRY_CONNECT;
cmd->channel_id = channel->id;
}
/* Release acked reliable packets */
{
u64 acked_seq = channel->their_acked_seq;
struct host_snd_packet *packet = channel->first_reliable_packet;
while (packet) {
struct host_snd_packet *next = packet->next;
u64 seq = packet->seq;
if (seq < acked_seq) {
packet->next = host->first_free_packet;
host->first_free_packet = packet;
channel->first_reliable_packet = next;
--channel->num_reliable_packets;
} else {
break;
}
packet = next;
}
if (channel->first_reliable_packet == NULL) {
channel->last_reliable_packet = NULL;
}
}
/* Release timed out unreliable msg buffers */
{
/* TODO: Configurable timeout */
i64 unreliable_msg_timeout_ns = NS_FROM_SECONDS(0.1);
struct host_msg_assembler *ma = channel->least_recent_msg_assembler;
while (ma) {
struct host_msg_assembler *next = ma->more_recent;
if ((now_ns - ma->touched_ns) > unreliable_msg_timeout_ns) {
if (!ma->is_reliable) {
host_msg_assembler_release(ma);
}
} else {
break;
}
ma = next;
}
}
}
}
}
/* Process cmds */
/* TODO: Unreliable packets don't need to be allocated into unreliable packet queue, should just send them and forget */
{
__profscope(host_update_process_cmds);
for (struct host_cmd *cmd = host->first_cmd; cmd; cmd = cmd->next) {
enum host_cmd_kind kind = cmd->kind;
struct host_channel_id channel_id = cmd->channel_id;
struct host_channel_list channels = host_channels_from_id(scratch.arena, host, channel_id);
for (struct host_channel_node *node = channels.first; node; node = node->next) {
struct host_channel *channel = node->channel;
switch (kind) {
case HOST_CMD_KIND_TRY_CONNECT:
{
u8 packet_flags = 0;
struct host_snd_packet *packet = host_channel_snd_packet_alloc(channel, false);
struct bitbuff bb = bitbuff_from_string(STRING_FROM_ARRAY(packet->data));
struct bitbuff_writer bw = bw_from_bitbuff(&bb);
bw_write_ubits(&bw, PACKET_MAGIC, 32);
bw_write_ibits(&bw, HOST_PACKET_KIND_TRY_CONNECT, 8);
bw_write_ubits(&bw, packet_flags, 8);
bw_write_uv(&bw, channel->our_acked_seq);
packet->data_len = bw_num_bytes_written(&bw);
} break;
case HOST_CMD_KIND_CONNECT_SUCCESS:
{
u8 packet_flags = 0;
struct host_snd_packet *packet = host_channel_snd_packet_alloc(channel, false);
struct bitbuff bb = bitbuff_from_string(STRING_FROM_ARRAY(packet->data));
struct bitbuff_writer bw = bw_from_bitbuff(&bb);
bw_write_ubits(&bw, PACKET_MAGIC, 32);
bw_write_ibits(&bw, HOST_PACKET_KIND_CONNECT_SUCCESS, 8);
bw_write_ubits(&bw, packet_flags, 8);
bw_write_uv(&bw, channel->our_acked_seq);
packet->data_len = bw_num_bytes_written(&bw);
} break;
case HOST_CMD_KIND_DISCONNECT:
{
u8 packet_flags = 0;
struct host_snd_packet *packet = host_channel_snd_packet_alloc(channel, false);
struct bitbuff bb = bitbuff_from_string(STRING_FROM_ARRAY(packet->data));
struct bitbuff_writer bw = bw_from_bitbuff(&bb);
bw_write_ubits(&bw, PACKET_MAGIC, 32);
bw_write_ibits(&bw, HOST_PACKET_KIND_DISCONNECT, 8);
bw_write_ubits(&bw, packet_flags, 8);
bw_write_uv(&bw, channel->our_acked_seq);
packet->data_len = bw_num_bytes_written(&bw);
} break;
case HOST_CMD_KIND_WRITE:
{
b32 is_reliable = cmd->write_reliable;
u8 packet_flags = (is_reliable * HOST_PACKET_FLAG_RELIABLE);
struct string msg = cmd->write_msg;
u64 chunk_count = 0;
if (msg.len > 0) {
chunk_count = (msg.len - 1) / PACKET_MSG_CHUNK_MAX_LEN;
}
chunk_count += 1;
u64 msg_id = ++channel->last_sent_msg_id;
for (u64 i = 0; i < chunk_count; ++i) {
u64 chunk_len = PACKET_MSG_CHUNK_MAX_LEN;
b32 is_last_chunk = i + 1 == chunk_count;
if (is_last_chunk) {
chunk_len = msg.len % PACKET_MSG_CHUNK_MAX_LEN;
if (chunk_len == 0) {
chunk_len = PACKET_MSG_CHUNK_MAX_LEN;
}
}
struct host_snd_packet *packet = host_channel_snd_packet_alloc(channel, is_reliable);
struct bitbuff bb = bitbuff_from_string(STRING_FROM_ARRAY(packet->data));
struct bitbuff_writer bw = bw_from_bitbuff(&bb);
bw_write_ubits(&bw, PACKET_MAGIC, 32);
bw_write_ibits(&bw, HOST_PACKET_KIND_MSG_CHUNK, 8);
bw_write_ubits(&bw, packet_flags, 8);
bw_write_uv(&bw, channel->our_acked_seq);
if (is_reliable) {
bw_write_uv(&bw, packet->seq);
}
bw_write_uv(&bw, msg_id);
bw_write_uv(&bw, i);
bw_write_uv(&bw, chunk_count);
if (is_last_chunk) {
bw_write_uv(&bw, chunk_len);
}
u8 *chunk_data = msg.text + (i * PACKET_MSG_CHUNK_MAX_LEN);
bw_write_bytes(&bw, STRING(chunk_len, chunk_data));
packet->data_len = bw_num_bytes_written(&bw);
}
} break;
default: break;
}
}
}
}
/* Process packets */
/* TODO: Aggregate small packets */
{
__profscope(host_update_send_packets);
for (u64 i = 0; i < host->num_channels_reserved; ++i) {
struct sock *sock = host->sock;
struct host_channel *channel = &host->channels[i];
u64 total_sent = 0;
if (channel->valid) {
struct sock_address address = channel->address;
/* Send reliable packets to channel */
for (struct host_snd_packet *packet = channel->first_reliable_packet; packet; packet = packet->next) {
sock_write(sock, address, STRING(packet->data_len, packet->data));
total_sent += packet->data_len;
}
/* Send unreliable packets to channel */
for (struct host_snd_packet *packet = channel->first_unreliable_packet; packet; packet = packet->next) {
sock_write(sock, address, STRING(packet->data_len, packet->data));
total_sent += packet->data_len;
}
/* Release unreliable packets */
if (channel->first_unreliable_packet) {
channel->last_unreliable_packet->next = host->first_free_packet;
host->first_free_packet = channel->first_unreliable_packet;
channel->first_unreliable_packet = NULL;
channel->last_unreliable_packet = NULL;
channel->num_unreliable_packets = 0;
}
host->bytes_sent += total_sent;
}
}
}
/* Reset cmds */
host->first_cmd = NULL;
host->last_cmd = NULL;
arena_reset(&host->cmd_arena);
scratch_end(scratch);
}
/* ========================== *
* Events
* ========================== */
struct host_event_array host_pop_events(struct arena *arena, struct host *host)
{
    __prof;
    /* Copy every queued event — and any message payload — into caller-owned
     * memory, then reset the internal queue and its arena. */
    struct host_event_array res = ZI;
    res.count = host->num_queued_events;
    res.events = arena_push_array(arena, struct host_event, res.count);
    struct host_queued_event *qe = host->first_queued_event;
    for (u64 i = 0; qe; qe = qe->next, ++i) {
        struct host_event *dest = &res.events[i];
        *dest = qe->event;
        struct string src_msg = qe->event.msg;
        if (src_msg.len > 0) {
            /* Payload lives in queued_event_arena, which is reset below,
             * so it must be deep-copied into the caller's arena. */
            dest->msg.text = arena_push_array(arena, u8, src_msg.len);
            MEMCPY(dest->msg.text, src_msg.text, src_msg.len);
        }
    }
    /* Reset queued events */
    host->num_queued_events = 0;
    host->first_queued_event = NULL;
    host->last_queued_event = NULL;
    arena_reset(&host->queued_event_arena);
    return res;
}
/* ========================== *
* Receive thread
* ========================== */
/* Receiver thread: blocks on the socket and copies every non-empty datagram
 * into the host's write-side rcv buffer under the mutex. host_update() swaps
 * the read/write buffers and parses the packets on the main thread.
 * Shutdown: host_release() raises the flag, then sock_wake() drops us out of
 * the blocking wait so the flag can be observed. */
INTERNAL SYS_THREAD_ENTRY_POINT_FUNC_DEF(host_receiver_thread_entry_point, arg)
{
u64 read_buff_size = KILOBYTE(64);
struct arena read_buff_arena = arena_alloc(read_buff_size);
struct string read_buff = ZI;
read_buff.len = read_buff_size;
read_buff.text = arena_push_array(&read_buff_arena, u8, KILOBYTE(64));
struct host *host = (struct host *)arg;
struct sock_array socks = ZI;
socks.socks = &host->sock;
socks.count = 1;
struct atomic_i32 *shutdown = &host->receiver_thread_shutdown_flag;
while (!atomic_i32_eval(shutdown)) {
struct sock *sock = sock_wait_for_available_read(socks, F32_INFINITY);
struct sock_read_result res;
/* Drain every pending datagram before blocking again */
while (!atomic_i32_eval(shutdown) && sock && (res = sock_read(sock, read_buff)).valid) {
struct sock_address address = res.address;
struct string data = res.data;
if (data.len > 0) {
struct sys_lock lock = sys_mutex_lock_e(&host->rcv_buffer_write_mutex);
{
struct host_rcv_buffer *rcv_buffer = host->rcv_buffer_write;
struct host_rcv_packet *packet = arena_push_zero(&rcv_buffer->arena, struct host_rcv_packet);
/* NOTE(review): packet->sock is left NULL; the read loop in
 * host_update() does not use it — confirm before relying on it */
packet->address = address;
/* Copy: `data` points into read_buff, which the next read clobbers */
packet->data = string_copy(&rcv_buffer->arena, data);
if (rcv_buffer->last_packet) {
rcv_buffer->last_packet->next = packet;
} else {
rcv_buffer->first_packet = packet;
}
rcv_buffer->last_packet = packet;
}
sys_mutex_unlock(&lock);
}
}
}
arena_release(&read_buff_arena);
}