1103 lines
42 KiB
C
1103 lines
42 KiB
C
#include "host.h"
|
|
#include "arena.h"
|
|
#include "bitbuff.h"
|
|
#include "sys.h"
|
|
#include "util.h"
|
|
#include "log.h"
|
|
#include "buddy.h"
|
|
#include "atomic.h"
|
|
|
|
/* TODO:
|
|
*
|
|
* Rate limiting.
|
|
*
|
|
* Resequence buffer to order incoming sequenced packets.
|
|
*
|
|
* Rolling window for message reassembly.
|
|
* This would remove the need for random access message buffers.
|
|
*
|
|
* Connection timeouts.
|
|
*
|
|
* Challenges to verify receiving address.
|
|
*/
|
|
|
|
#define PACKET_MAGIC 0xd9e3b8b6
|
|
#define PACKET_MSG_CHUNK_MAX_LEN 1024
|
|
#define PACKET_DATA_MAX_LEN 1280 /* Give enough space for msg chunk + header */
|
|
|
|
#define NUM_CHANNEL_LOOKUP_BINS 512
|
|
#define NUM_MSG_ASSEMBLER_LOOKUP_BINS 16384
|
|
|
|
enum host_packet_kind {
|
|
HOST_PACKET_KIND_NONE,
|
|
HOST_PACKET_KIND_TRY_CONNECT,
|
|
HOST_PACKET_KIND_CONNECT_SUCCESS,
|
|
HOST_PACKET_KIND_DISCONNECT,
|
|
HOST_PACKET_KIND_HEARTBEAT,
|
|
HOST_PACKET_KIND_MSG_CHUNK
|
|
};
|
|
|
|
enum host_packet_flag {
|
|
HOST_PACKET_FLAG_NONE = 0,
|
|
HOST_PACKET_FLAG_RELIABLE = (1 << 0)
|
|
};
|
|
|
|
struct host_snd_packet {
|
|
struct host_snd_packet *next;
|
|
u64 seq;
|
|
|
|
u64 data_len;
|
|
u8 data[PACKET_DATA_MAX_LEN];
|
|
};
|
|
|
|
struct host_channel {
|
|
struct host_channel_id id;
|
|
b32 valid;
|
|
b32 connected;
|
|
struct host *host;
|
|
|
|
struct host_channel *next_free;
|
|
|
|
struct sock_address address;
|
|
u64 address_hash;
|
|
struct host_channel *next_address_hash;
|
|
struct host_channel *prev_address_hash;
|
|
|
|
/* NOTE: Packets are allocated in host's `arena` */
|
|
struct host_snd_packet *first_reliable_packet;
|
|
struct host_snd_packet *last_reliable_packet;
|
|
struct host_snd_packet *first_unreliable_packet;
|
|
struct host_snd_packet *last_unreliable_packet;
|
|
u64 num_reliable_packets;
|
|
u64 num_unreliable_packets;
|
|
|
|
/* NOTE: Msg assemblers are allocated in host's `arena` */
|
|
struct host_msg_assembler *least_recent_msg_assembler;
|
|
struct host_msg_assembler *most_recent_msg_assembler;
|
|
|
|
u16 last_heartbeat_received_id;
|
|
u16 last_heartbeat_acked_id;
|
|
i64 last_heartbeat_acked_ns;
|
|
i64 last_heartbeat_rtt_ns;
|
|
|
|
u64 last_sent_msg_id;
|
|
u64 their_acked_seq;
|
|
u64 our_acked_seq;
|
|
u64 last_sent_seq;
|
|
|
|
i64 last_packet_received_ns;
|
|
};
|
|
|
|
struct host_channel_node {
|
|
struct host_channel *channel;
|
|
struct host_channel_node *next;
|
|
};
|
|
|
|
struct host_channel_list {
|
|
struct host_channel_node *first;
|
|
struct host_channel_node *last;
|
|
};
|
|
|
|
struct host_channel_lookup_bin {
|
|
struct host_channel *first;
|
|
struct host_channel *last;
|
|
};
|
|
|
|
struct host_rcv_packet {
|
|
struct sock *sock;
|
|
struct sock_address address;
|
|
struct string data;
|
|
struct host_rcv_packet *next;
|
|
};
|
|
|
|
struct host_rcv_buffer {
|
|
struct arena *arena;
|
|
struct host_rcv_packet *first_packet;
|
|
struct host_rcv_packet *last_packet;
|
|
};
|
|
|
|
struct host_msg_assembler {
|
|
struct host_channel *channel;
|
|
b32 is_reliable;
|
|
|
|
/* Free list */
|
|
struct host_msg_assembler *next_free;
|
|
|
|
/* Bucket list */
|
|
struct host_msg_assembler *next_hash;
|
|
struct host_msg_assembler *prev_hash;
|
|
|
|
/* Channel list */
|
|
struct host_msg_assembler *less_recent;
|
|
struct host_msg_assembler *more_recent;
|
|
|
|
u64 msg_id;
|
|
u64 hash;
|
|
|
|
u64 last_chunk_len;
|
|
u64 num_chunks_total;
|
|
u64 num_chunks_received;
|
|
|
|
i64 touched_ns;
|
|
|
|
struct buddy_block *buddy_block;
|
|
u8 *chunk_bitmap;
|
|
u8 *chunk_data;
|
|
};
|
|
|
|
struct host_msg_assembler_lookup_bin {
|
|
struct host_msg_assembler *first;
|
|
struct host_msg_assembler *last;
|
|
};
|
|
|
|
READONLY GLOBAL struct host_channel _g_host_channel_nil = { .valid = 0 };
|
|
|
|
GLOBAL struct {
|
|
i32 _;
|
|
} G = ZI, DEBUG_ALIAS(G, G_host);
|
|
|
|
INTERNAL SYS_THREAD_DEF(host_receiver_thread_entry_point, arg);
|
|
INTERNAL void host_msg_assembler_release(struct host_msg_assembler *ma);
|
|
|
|
/* ========================== *
|
|
* Startup
|
|
* ========================== */
|
|
|
|
struct host_startup_receipt host_startup(struct sock_startup_receipt *sock_sr)
|
|
{
|
|
__prof;
|
|
(UNUSED)sock_sr;
|
|
return (struct host_startup_receipt) { 0 };
|
|
}
|
|
|
|
/* ========================== *
|
|
* Host
|
|
* ========================== */
|
|
|
|
struct host *host_alloc(u16 listen_port)
|
|
{
|
|
struct arena *arena = arena_alloc(GIBI(64));
|
|
struct host *host = arena_push(arena, struct host);
|
|
|
|
host->arena = arena;
|
|
host->cmd_arena = arena_alloc(GIBI(64));
|
|
host->channel_arena = arena_alloc(GIBI(64));
|
|
host->rcv_buffer_read = arena_push(host->arena, struct host_rcv_buffer);
|
|
host->rcv_buffer_write = arena_push(host->arena, struct host_rcv_buffer);
|
|
host->rcv_buffer_read->arena = arena_alloc(GIBI(64));
|
|
host->rcv_buffer_write->arena = arena_alloc(GIBI(64));
|
|
host->buddy = buddy_ctx_alloc(GIBI(64));
|
|
|
|
host->channels = arena_push_dry(host->channel_arena, struct host_channel);
|
|
|
|
host->num_channel_lookup_bins = NUM_CHANNEL_LOOKUP_BINS;
|
|
host->channel_lookup_bins = arena_push_array(host->arena, struct host_channel_lookup_bin, host->num_channel_lookup_bins);
|
|
|
|
host->num_msg_assembler_lookup_bins = NUM_MSG_ASSEMBLER_LOOKUP_BINS;
|
|
host->msg_assembler_lookup_bins = arena_push_array(host->arena, struct host_msg_assembler_lookup_bin, host->num_msg_assembler_lookup_bins);
|
|
|
|
host->sock = sock_alloc(listen_port, MEBI(2), MEBI(2));
|
|
|
|
host->receiver_thread = sys_thread_alloc(&host_receiver_thread_entry_point, host, LIT("Host receiver"), PROF_THREAD_GROUP_IO);
|
|
|
|
return host;
|
|
}
|
|
|
|
void host_release(struct host *host)
|
|
{
|
|
atomic32_fetch_set(&host->receiver_thread_shutdown_flag.v, 1);
|
|
sock_wake(host->sock);
|
|
while (!sys_thread_try_release(host->receiver_thread, 0.001f)) {
|
|
sock_wake(host->sock);
|
|
}
|
|
|
|
sock_release(host->sock);
|
|
|
|
buddy_ctx_release(host->buddy);
|
|
arena_release(host->rcv_buffer_write->arena);
|
|
arena_release(host->rcv_buffer_read->arena);
|
|
arena_release(host->channel_arena);
|
|
arena_release(host->cmd_arena);
|
|
arena_release(host->arena);
|
|
}
|
|
|
|
/* ========================== *
|
|
* Channel
|
|
* ========================== */
|
|
|
|
INTERNAL u64 hash_from_address(struct sock_address address)
|
|
{
|
|
return hash_fnv64(HASH_FNV64_BASIS, STRING_FROM_STRUCT(&address));
|
|
}
|
|
|
|
INTERNAL struct host_channel *host_channel_from_address(struct host *host, struct sock_address address)
|
|
{
|
|
u64 hash = hash_from_address(address);
|
|
struct host_channel_lookup_bin *bin = &host->channel_lookup_bins[hash % host->num_channel_lookup_bins];
|
|
for (struct host_channel *channel = bin->first; channel; channel = channel->next_address_hash) {
|
|
if (channel->address_hash == hash && sock_address_eq(channel->address, address)) {
|
|
return channel;
|
|
}
|
|
}
|
|
return &_g_host_channel_nil;
|
|
}
|
|
|
|
/* Returns nil channel if id = HOST_CHANNEL_ID_ALL */
|
|
INTERNAL struct host_channel *host_single_channel_from_id(struct host *host, struct host_channel_id channel_id)
|
|
{
|
|
if (channel_id.gen > 0 && channel_id.idx < host->num_channels_reserved) {
|
|
struct host_channel *channel = &host->channels[channel_id.idx];
|
|
if (channel->id.gen == channel_id.gen) {
|
|
return channel;
|
|
}
|
|
}
|
|
return &_g_host_channel_nil;
|
|
}
|
|
|
|
INTERNAL struct host_channel_list host_channels_from_id(struct arena *arena, struct host *host, struct host_channel_id channel_id)
|
|
{
|
|
struct host_channel_list res = ZI;
|
|
if (host_channel_id_eq(channel_id, HOST_CHANNEL_ID_ALL)) {
|
|
for (u64 i = 0; i < host->num_channels_reserved; ++i) {
|
|
struct host_channel *channel = &host->channels[i];
|
|
if (channel->valid) {
|
|
struct host_channel_node *n = arena_push(arena, struct host_channel_node);
|
|
n->channel = channel;
|
|
if (res.last) {
|
|
res.last->next = n;
|
|
} else {
|
|
res.first = n;
|
|
}
|
|
res.last = n;
|
|
}
|
|
}
|
|
} else {
|
|
struct host_channel *channel = host_single_channel_from_id(host, channel_id);
|
|
if (channel->valid) {
|
|
struct host_channel_node *n = arena_push(arena, struct host_channel_node);
|
|
n->channel = channel;
|
|
res.first = n;
|
|
res.last = n;
|
|
}
|
|
}
|
|
return res;
|
|
}
|
|
|
|
INTERNAL struct host_channel *host_channel_alloc(struct host *host, struct sock_address address)
|
|
{
|
|
struct host_channel_id id = ZI;
|
|
struct host_channel *channel;
|
|
if (host->first_free_channel) {
|
|
channel = host->first_free_channel;
|
|
host->first_free_channel = channel->next_free;
|
|
id = channel->id;
|
|
++id.gen;
|
|
} else {
|
|
channel = arena_push_no_zero(host->channel_arena, struct host_channel);
|
|
id.gen = 1;
|
|
id.idx = host->num_channels_reserved;
|
|
++host->num_channels_reserved;
|
|
}
|
|
MEMZERO_STRUCT(channel);
|
|
channel->valid = 1;
|
|
channel->id = id;
|
|
channel->host = host;
|
|
channel->address = address;
|
|
u64 address_hash = hash_from_address(address);
|
|
channel->address_hash = address_hash;
|
|
|
|
u64 bin_index = address_hash % host->num_channel_lookup_bins;
|
|
struct host_channel_lookup_bin *bin = &host->channel_lookup_bins[bin_index];
|
|
if (bin->last) {
|
|
channel->prev_address_hash = bin->last;
|
|
bin->last->next_address_hash = channel;
|
|
} else {
|
|
bin->first = channel;
|
|
}
|
|
bin->last = channel;
|
|
|
|
return channel;
|
|
}
|
|
|
|
INTERNAL void host_channel_release(struct host_channel *channel)
|
|
{
|
|
struct host *host = channel->host;
|
|
|
|
/* Release from lookup table */
|
|
{
|
|
struct host_channel_lookup_bin *bin = &host->channel_lookup_bins[channel->address_hash % host->num_channel_lookup_bins];
|
|
struct host_channel *prev = channel->prev_address_hash;
|
|
struct host_channel *next = channel->next_address_hash;
|
|
if (prev) {
|
|
prev->next_address_hash = next;
|
|
} else {
|
|
bin->first = next;
|
|
}
|
|
if (next) {
|
|
next->prev_address_hash = prev;
|
|
} else {
|
|
bin->last = prev;
|
|
}
|
|
}
|
|
|
|
/* Release packets */
|
|
{
|
|
if (channel->first_unreliable_packet) {
|
|
host->first_free_packet = channel->first_unreliable_packet;
|
|
channel->last_unreliable_packet->next = host->first_free_packet;
|
|
}
|
|
if (channel->first_reliable_packet) {
|
|
host->first_free_packet = channel->first_reliable_packet;
|
|
channel->last_reliable_packet->next = host->first_free_packet;
|
|
}
|
|
}
|
|
|
|
/* Release msg assemblers */
|
|
for (struct host_msg_assembler *ma = channel->least_recent_msg_assembler; ma; ma = ma->more_recent) {
|
|
host_msg_assembler_release(ma);
|
|
}
|
|
|
|
++channel->id.gen;
|
|
channel->valid = 0;
|
|
channel->next_free = host->first_free_channel;
|
|
host->first_free_channel = channel;
|
|
}
|
|
|
|
/* ========================== *
|
|
* Msg assembler
|
|
* ========================== */
|
|
|
|
INTERNAL u64 hash_from_channel_msg(struct host_channel_id channel_id, u64 msg_id)
|
|
{
|
|
u64 res = HASH_FNV64_BASIS;
|
|
res = hash_fnv64(res, STRING_FROM_STRUCT(&channel_id));
|
|
res = hash_fnv64(res, STRING_FROM_STRUCT(&msg_id));
|
|
return res;
|
|
}
|
|
|
|
INTERNAL struct host_msg_assembler *host_get_msg_assembler(struct host *host, struct host_channel_id channel_id, u64 msg_id)
|
|
{
|
|
u64 hash = hash_from_channel_msg(channel_id, msg_id);
|
|
struct host_msg_assembler_lookup_bin *bin = &host->msg_assembler_lookup_bins[hash % host->num_msg_assembler_lookup_bins];
|
|
for (struct host_msg_assembler *ma = bin->first; ma; ma = ma->next_hash) {
|
|
if (ma->hash == hash && host_channel_id_eq(ma->channel->id, channel_id) && ma->msg_id == msg_id) {
|
|
return ma;
|
|
}
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
INTERNAL struct host_msg_assembler *host_msg_assembler_alloc(struct host_channel *channel, u64 msg_id, u64 chunk_count, u64 now_ns, b32 is_reliable)
|
|
{
|
|
struct host *host = channel->host;
|
|
struct host_msg_assembler *ma;
|
|
if (host->first_free_msg_assembler) {
|
|
ma = host->first_free_msg_assembler;
|
|
host->first_free_msg_assembler = ma->next_free;
|
|
} else {
|
|
ma = arena_push_no_zero(host->arena, struct host_msg_assembler);
|
|
}
|
|
MEMZERO_STRUCT(ma);
|
|
ma->channel = channel;
|
|
ma->msg_id = msg_id;
|
|
|
|
ma->num_chunks_total = chunk_count;
|
|
|
|
u64 chunk_bitmap_size = (chunk_count + 7) >> 3;
|
|
if ((chunk_bitmap_size % 16) != 0) {
|
|
/* Align chunk bitmap to 16 so msg data is aligned */
|
|
chunk_bitmap_size += 16 - (chunk_bitmap_size % 16);
|
|
}
|
|
u64 chunk_data_size = chunk_count * PACKET_MSG_CHUNK_MAX_LEN;
|
|
|
|
/* Allocate msg data using buddy allocator since the assembler has
|
|
* arbitrary lifetime and data needs to stay contiguous for random
|
|
* access as packets are received */
|
|
ma->buddy_block = buddy_alloc(host->buddy, chunk_bitmap_size + chunk_data_size);
|
|
ma->chunk_bitmap = ma->buddy_block->memory;
|
|
MEMZERO(ma->chunk_bitmap, chunk_bitmap_size);
|
|
ma->chunk_data = ma->chunk_bitmap + chunk_bitmap_size;
|
|
|
|
/* FIXME: Ensure chunk_count > 0 */
|
|
ma->is_reliable = is_reliable;
|
|
|
|
/* Insert into channel list */
|
|
ma->touched_ns = now_ns;
|
|
if (channel->most_recent_msg_assembler) {
|
|
channel->most_recent_msg_assembler->more_recent = ma;
|
|
ma->less_recent = channel->most_recent_msg_assembler;
|
|
} else {
|
|
channel->least_recent_msg_assembler = ma;
|
|
}
|
|
channel->most_recent_msg_assembler = ma;
|
|
|
|
/* Insert into lookup table */
|
|
u64 hash = hash_from_channel_msg(channel->id, msg_id);
|
|
ma->hash = hash;
|
|
struct host_msg_assembler_lookup_bin *bin = &host->msg_assembler_lookup_bins[hash % host->num_msg_assembler_lookup_bins];
|
|
if (bin->last) {
|
|
bin->last->next_hash = ma;
|
|
ma->prev_hash = bin->last;
|
|
} else {
|
|
bin->first = ma;
|
|
}
|
|
bin->last = ma;
|
|
|
|
return ma;
|
|
}
|
|
|
|
INTERNAL void host_msg_assembler_release(struct host_msg_assembler *ma)
|
|
{
|
|
struct host_channel *channel = ma->channel;
|
|
struct host *host = channel->host;
|
|
buddy_release(ma->buddy_block);
|
|
|
|
/* Release from channel list */
|
|
{
|
|
struct host_msg_assembler *prev = ma->less_recent;
|
|
struct host_msg_assembler *next = ma->more_recent;
|
|
if (prev) {
|
|
prev->more_recent = next;
|
|
} else {
|
|
channel->least_recent_msg_assembler = next;
|
|
}
|
|
if (next) {
|
|
next->less_recent = prev;
|
|
} else {
|
|
channel->most_recent_msg_assembler = prev;
|
|
}
|
|
}
|
|
|
|
/* Release from lookup table */
|
|
struct host_msg_assembler_lookup_bin *bin = &host->msg_assembler_lookup_bins[ma->hash % host->num_msg_assembler_lookup_bins];
|
|
{
|
|
struct host_msg_assembler *prev = ma->prev_hash;
|
|
struct host_msg_assembler *next = ma->next_hash;
|
|
if (prev) {
|
|
prev->next_hash = next;
|
|
} else {
|
|
bin->first = next;
|
|
}
|
|
if (next) {
|
|
next->prev_hash = prev;
|
|
} else {
|
|
bin->last = prev;
|
|
}
|
|
}
|
|
|
|
ma->next_free = host->first_free_msg_assembler;
|
|
host->first_free_msg_assembler = ma;
|
|
}
|
|
|
|
INTERNAL void host_msg_assembler_touch(struct host_msg_assembler *ma, i64 now_ns)
|
|
{
|
|
struct host_channel *channel = ma->channel;
|
|
if (ma != channel->most_recent_msg_assembler) {
|
|
/* Remove from channel list */
|
|
{
|
|
struct host_msg_assembler *prev = ma->less_recent;
|
|
struct host_msg_assembler *next = ma->more_recent;
|
|
if (prev) {
|
|
prev->more_recent = next;
|
|
} else {
|
|
channel->least_recent_msg_assembler = next;
|
|
}
|
|
if (next) {
|
|
next->less_recent = prev;
|
|
} else {
|
|
channel->most_recent_msg_assembler = prev;
|
|
}
|
|
}
|
|
|
|
/* Insert at end of channel list */
|
|
{
|
|
if (channel->most_recent_msg_assembler) {
|
|
channel->most_recent_msg_assembler->more_recent = ma;
|
|
ma->less_recent = channel->most_recent_msg_assembler;
|
|
} else {
|
|
channel->least_recent_msg_assembler = ma;
|
|
}
|
|
channel->most_recent_msg_assembler = ma;
|
|
}
|
|
}
|
|
ma->touched_ns = now_ns;
|
|
}
|
|
|
|
INTERNAL b32 host_msg_assembler_is_chunk_filled(struct host_msg_assembler *ma, u64 chunk_id)
|
|
{
|
|
if (chunk_id < ma->num_chunks_total) {
|
|
return (ma->chunk_bitmap[chunk_id / 8] & (1 << (chunk_id % 8))) != 0;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
INTERNAL void host_msg_assembler_set_chunk_received(struct host_msg_assembler *ma, u64 chunk_id)
|
|
{
|
|
if (chunk_id < ma->num_chunks_total) {
|
|
ma->chunk_bitmap[chunk_id / 8] |= (1 << (chunk_id % 8));
|
|
}
|
|
}
|
|
|
|
/* ========================== *
|
|
* Packet
|
|
* ========================== */
|
|
|
|
INTERNAL struct host_snd_packet *host_channel_snd_packet_alloc(struct host_channel *channel, b32 is_reliable)
|
|
{
|
|
struct host *host = channel->host;
|
|
struct host_snd_packet *packet = 0;
|
|
if (host->first_free_packet) {
|
|
packet = host->first_free_packet;
|
|
host->first_free_packet = packet->next;
|
|
} else {
|
|
packet = arena_push_no_zero(host->arena, struct host_snd_packet);
|
|
}
|
|
MEMZERO_STRUCT(packet);
|
|
|
|
if (is_reliable) {
|
|
if (channel->last_reliable_packet) {
|
|
channel->last_reliable_packet->next = packet;
|
|
} else {
|
|
channel->first_reliable_packet = packet;
|
|
}
|
|
channel->last_reliable_packet = packet;
|
|
++channel->num_reliable_packets;
|
|
packet->seq = ++channel->last_sent_seq;
|
|
} else {
|
|
if (channel->last_unreliable_packet) {
|
|
channel->last_unreliable_packet->next = packet;
|
|
} else {
|
|
channel->first_unreliable_packet = packet;
|
|
}
|
|
channel->last_unreliable_packet = packet;
|
|
++channel->num_unreliable_packets;
|
|
}
|
|
return packet;
|
|
}
|
|
|
|
/* ========================== *
|
|
* Cmd interface
|
|
* ========================== */
|
|
|
|
INTERNAL struct host_cmd *host_cmd_alloc_and_append(struct host *host)
|
|
{
|
|
struct host_cmd *cmd = arena_push(host->cmd_arena, struct host_cmd);
|
|
if (host->last_cmd) {
|
|
host->last_cmd->next = cmd;
|
|
} else {
|
|
host->first_cmd = cmd;
|
|
}
|
|
host->last_cmd = cmd;
|
|
return cmd;
|
|
}
|
|
|
|
void host_queue_connect_to_address(struct host *host, struct sock_address connect_address)
|
|
{
|
|
struct host_channel *channel = host_channel_from_address(host, connect_address);
|
|
if (!channel->valid) {
|
|
channel = host_channel_alloc(host, connect_address);
|
|
}
|
|
}
|
|
|
|
void host_queue_disconnect(struct host *host, struct host_channel_id channel_id)
|
|
{
|
|
struct host_cmd *cmd = host_cmd_alloc_and_append(host);
|
|
cmd->kind = HOST_CMD_KIND_DISCONNECT;
|
|
cmd->channel_id = channel_id;
|
|
}
|
|
|
|
void host_queue_write(struct host *host, struct host_channel_id channel_id, struct string msg, u32 flags)
|
|
{
|
|
struct host_cmd *cmd = host_cmd_alloc_and_append(host);
|
|
cmd->kind = HOST_CMD_KIND_WRITE;
|
|
cmd->channel_id = channel_id;
|
|
cmd->write_msg = string_copy(host->cmd_arena, msg);
|
|
cmd->write_reliable = flags & HOST_WRITE_FLAG_RELIABLE;
|
|
}
|
|
|
|
/* ========================== *
|
|
* Info
|
|
* ========================== */
|
|
|
|
i64 host_get_channel_last_rtt_ns(struct host *host, struct host_channel_id channel_id)
|
|
{
|
|
struct host_channel *channel = host_single_channel_from_id(host, channel_id);
|
|
return channel->last_heartbeat_rtt_ns;
|
|
}
|
|
|
|
/* ========================== *
|
|
* Update
|
|
* ========================== */
|
|
|
|
INTERNAL struct host_event *push_event(struct arena *arena, struct host_event_list *list)
|
|
{
|
|
struct host_event *event = arena_push(arena, struct host_event);
|
|
if (list->last) {
|
|
list->last->next = event;
|
|
} else {
|
|
list->first = event;
|
|
}
|
|
list->last = event;
|
|
return event;
|
|
}
|
|
|
|
/* Read incoming packets, update channels, and return events */
|
|
struct host_event_list host_update_begin(struct arena *arena, struct host *host)
|
|
{
|
|
struct arena_temp scratch = scratch_begin(arena);
|
|
|
|
struct host_event_list events = ZI;
|
|
i64 now_ns = sys_time_ns();
|
|
|
|
{
|
|
__profn("Read host packets");
|
|
struct string read_buff = ZI;
|
|
read_buff.len = PACKET_DATA_MAX_LEN;
|
|
read_buff.text = arena_push_array_no_zero(scratch.arena, u8, read_buff.len);
|
|
|
|
/* Swap read & write rcv buffers */
|
|
{
|
|
struct snc_lock lock = snc_lock_e(&host->rcv_buffer_write_mutex);
|
|
struct host_rcv_buffer *swp = host->rcv_buffer_read;
|
|
host->rcv_buffer_read = host->rcv_buffer_write;
|
|
host->rcv_buffer_write = swp;
|
|
snc_unlock(&lock);
|
|
}
|
|
|
|
/* Read incoming packets */
|
|
struct host_rcv_buffer *rcv_buffer = host->rcv_buffer_read;
|
|
for (struct host_rcv_packet *packet = rcv_buffer->first_packet; packet; packet = packet->next) {
|
|
//struct sock *sock = packet->sock;
|
|
struct sock_address address = packet->address;
|
|
struct bitbuff bb = bitbuff_from_string(packet->data);
|
|
struct bitbuff_reader br = br_from_bitbuff(&bb);
|
|
u32 magic = br_read_ubits(&br, 32); /* TODO: implicitly encode magic into crc32 */
|
|
if (magic == PACKET_MAGIC) {
|
|
/* TODO: Combine kind byte with flags byte */
|
|
struct host_channel *channel = host_channel_from_address(host, address);
|
|
enum host_packet_kind host_packet_kind = br_read_ibits(&br, 8);
|
|
u8 packet_flags = br_read_ubits(&br, 8);
|
|
|
|
u64 their_acked_seq = br_read_uv(&br);
|
|
if (channel->valid) {
|
|
channel->last_packet_received_ns = now_ns;
|
|
if (their_acked_seq > channel->their_acked_seq) {
|
|
channel->their_acked_seq = their_acked_seq;
|
|
}
|
|
}
|
|
|
|
b32 skip_packet = 0;
|
|
b32 is_reliable = packet_flags & HOST_PACKET_FLAG_RELIABLE;
|
|
if (channel->valid) {
|
|
if (is_reliable) {
|
|
u64 packet_seq = br_read_uv(&br);
|
|
if (packet_seq == channel->our_acked_seq + 1) {
|
|
channel->our_acked_seq = packet_seq;
|
|
} else {
|
|
skip_packet = 1;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (!skip_packet) {
|
|
switch (host_packet_kind) {
|
|
case HOST_PACKET_KIND_TRY_CONNECT:
|
|
{
|
|
/* A foreign host is trying to connect to us */
|
|
if (!channel->valid) {
|
|
logf_info("Host received conection attempt from %F", FMT_STR(sock_string_from_address(scratch.arena, address)));
|
|
/* TODO: Verify that some per-host uuid isn't present in a rolling window to prevent reconnects right after a disconnect? */
|
|
channel = host_channel_alloc(host, address);
|
|
}
|
|
struct host_cmd *cmd = host_cmd_alloc_and_append(host);
|
|
cmd->kind = HOST_CMD_KIND_CONNECT_SUCCESS;
|
|
cmd->channel_id = channel->id;
|
|
} break;
|
|
|
|
case HOST_PACKET_KIND_CONNECT_SUCCESS:
|
|
{
|
|
/* We successfully connected to a foreign host and they are ready to receive messages */
|
|
if (channel->valid && !channel->connected) {
|
|
logf_info("Host received connection from %F", FMT_STR(sock_string_from_address(scratch.arena, address)));
|
|
struct host_event *event = push_event(arena, &events);
|
|
event->kind = HOST_EVENT_KIND_CHANNEL_OPENED;
|
|
event->channel_id = channel->id;
|
|
channel->connected = 1;
|
|
}
|
|
} break;
|
|
|
|
case HOST_PACKET_KIND_DISCONNECT:
|
|
{
|
|
/* A foreign host disconnected from us */
|
|
if (channel->valid) {
|
|
logf_info("Host received disconnection from %F", FMT_STR(sock_string_from_address(scratch.arena, address)));
|
|
struct host_event *event = push_event(arena, &events);
|
|
event->kind = HOST_EVENT_KIND_CHANNEL_CLOSED;
|
|
event->channel_id = channel->id;
|
|
host_channel_release(channel);
|
|
}
|
|
|
|
} break;
|
|
|
|
case HOST_PACKET_KIND_HEARTBEAT:
|
|
{
|
|
if (channel->valid) {
|
|
u16 heartbeat_id = br_read_ubits(&br, 16);
|
|
u16 acked_heartbeat_id = br_read_ubits(&br, 16);
|
|
if (heartbeat_id > channel->last_heartbeat_received_id) {
|
|
channel->last_heartbeat_received_id = heartbeat_id;
|
|
}
|
|
if (acked_heartbeat_id == channel->last_heartbeat_acked_id + 1) {
|
|
channel->last_heartbeat_acked_id = acked_heartbeat_id;
|
|
if (channel->last_heartbeat_acked_ns > 0) {
|
|
channel->last_heartbeat_rtt_ns = now_ns - channel->last_heartbeat_acked_ns;
|
|
}
|
|
channel->last_heartbeat_acked_ns = now_ns;
|
|
}
|
|
}
|
|
} break;
|
|
|
|
case HOST_PACKET_KIND_MSG_CHUNK:
|
|
{
|
|
if (channel->valid && channel->connected) {
|
|
/* Packet is chunk <chunk_id> out of <chunk_count> belonging to message <msg_id> */
|
|
u64 msg_id = br_read_uv(&br);
|
|
u64 chunk_id = br_read_uv(&br);
|
|
u64 chunk_count = br_read_uv(&br);
|
|
b32 is_last_chunk = (chunk_id + 1) == chunk_count;
|
|
u64 chunk_len = is_last_chunk ? br_read_uv(&br) : PACKET_MSG_CHUNK_MAX_LEN;
|
|
|
|
struct host_msg_assembler *ma = host_get_msg_assembler(host, channel->id, msg_id);
|
|
if (!ma) {
|
|
ma = host_msg_assembler_alloc(channel, msg_id, chunk_count, now_ns, is_reliable);
|
|
}
|
|
|
|
if (chunk_count == ma->num_chunks_total && chunk_id < chunk_count) {
|
|
if (!host_msg_assembler_is_chunk_filled(ma, chunk_id)) {
|
|
u8 *src = br_read_bytes_raw(&br, chunk_len);
|
|
if (src) {
|
|
u8 *dst = &ma->chunk_data[chunk_id * PACKET_MSG_CHUNK_MAX_LEN];
|
|
MEMCPY(dst, src, chunk_len);
|
|
if (is_last_chunk) {
|
|
ma->last_chunk_len = chunk_len;
|
|
}
|
|
host_msg_assembler_set_chunk_received(ma, chunk_id);
|
|
++ma->num_chunks_received;
|
|
host_msg_assembler_touch(ma, now_ns);
|
|
if (ma->num_chunks_received == chunk_count) {
|
|
/* All chunks filled, message has finished assembling */
|
|
/* TODO: Message ordering */
|
|
struct host_event *event = push_event(arena, &events);
|
|
struct string data = ZI;
|
|
data.len = ((chunk_count - 1) * PACKET_MSG_CHUNK_MAX_LEN) + ma->last_chunk_len;
|
|
data.text = arena_push_array_no_zero(arena, u8, data.len);
|
|
MEMCPY(data.text, ma->chunk_data, data.len);
|
|
event->kind = HOST_EVENT_KIND_MSG;
|
|
event->msg = data;
|
|
event->channel_id = channel->id;
|
|
if (is_reliable) {
|
|
/* Release assembler if reliable */
|
|
host_msg_assembler_release(ma);
|
|
}
|
|
}
|
|
} else {
|
|
/* Overflow reading chunk */
|
|
ASSERT(0);
|
|
}
|
|
}
|
|
} else {
|
|
/* Chunk id/count mismatch */
|
|
ASSERT(0);
|
|
}
|
|
}
|
|
} break;
|
|
|
|
default: break;
|
|
}
|
|
}
|
|
host->bytes_received += packet->data.len;
|
|
}
|
|
}
|
|
/* Reset read buffer */
|
|
rcv_buffer->first_packet = 0;
|
|
rcv_buffer->last_packet = 0;
|
|
arena_reset(rcv_buffer->arena);
|
|
}
|
|
|
|
/* Update channels */
|
|
{
|
|
__profn("Update host channels");
|
|
for (u64 i = 0; i < host->num_channels_reserved; ++i) {
|
|
struct host_channel *channel = &host->channels[i];
|
|
if (channel->valid) {
|
|
/* Send / resend handshake if not connected */
|
|
if (!channel->connected) {
|
|
struct host_cmd *cmd = host_cmd_alloc_and_append(host);
|
|
cmd->kind = HOST_CMD_KIND_TRY_CONNECT;
|
|
cmd->channel_id = channel->id;
|
|
}
|
|
/* Send heartbeat */
|
|
/* TODO: Send this less frequently (once per second or half of timeout or something) */
|
|
{
|
|
struct host_cmd *cmd = host_cmd_alloc_and_append(host);
|
|
cmd->kind = HOST_CMD_KIND_HEARTBEAT;
|
|
cmd->heartbeat_id = channel->last_heartbeat_acked_id + 1;
|
|
cmd->heartbeat_ack_id = channel->last_heartbeat_received_id;
|
|
cmd->channel_id = channel->id;
|
|
}
|
|
/* Release acked reliable packets */
|
|
{
|
|
u64 acked_seq = channel->their_acked_seq;
|
|
struct host_snd_packet *packet = channel->first_reliable_packet;
|
|
while (packet) {
|
|
struct host_snd_packet *next = packet->next;
|
|
u64 seq = packet->seq;
|
|
if (seq < acked_seq) {
|
|
packet->next = host->first_free_packet;
|
|
host->first_free_packet = packet;
|
|
channel->first_reliable_packet = next;
|
|
--channel->num_reliable_packets;
|
|
} else {
|
|
break;
|
|
}
|
|
packet = next;
|
|
}
|
|
if (channel->first_reliable_packet == 0) {
|
|
channel->last_reliable_packet = 0;
|
|
}
|
|
}
|
|
/* Release timed out unreliable msg buffers */
|
|
{
|
|
/* TODO: Configurable timeout */
|
|
i64 unreliable_msg_timeout_ns = NS_FROM_SECONDS(0.1);
|
|
struct host_msg_assembler *ma = channel->least_recent_msg_assembler;
|
|
while (ma) {
|
|
struct host_msg_assembler *next = ma->more_recent;
|
|
if ((now_ns - ma->touched_ns) > unreliable_msg_timeout_ns) {
|
|
if (!ma->is_reliable) {
|
|
host_msg_assembler_release(ma);
|
|
}
|
|
} else {
|
|
break;
|
|
}
|
|
ma = next;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
scratch_end(scratch);
|
|
return events;
|
|
}
|
|
|
|
/* Process host cmds & send outgoing packets */
|
|
void host_update_end(struct host *host)
|
|
{
|
|
__prof;
|
|
struct arena_temp scratch = scratch_begin_no_conflict();
|
|
|
|
/* Process cmds into sendable packets */
|
|
/* TODO: Unreliable packets don't need to be allocated into unreliable packet queue, should just send them and forget */
|
|
{
|
|
__profn("Process host cmds");
|
|
for (struct host_cmd *cmd = host->first_cmd; cmd; cmd = cmd->next) {
|
|
enum host_cmd_kind kind = cmd->kind;
|
|
struct host_channel_id channel_id = cmd->channel_id;
|
|
struct host_channel_list channels = host_channels_from_id(scratch.arena, host, channel_id);
|
|
for (struct host_channel_node *node = channels.first; node; node = node->next) {
|
|
struct host_channel *channel = node->channel;
|
|
switch (kind) {
|
|
case HOST_CMD_KIND_TRY_CONNECT:
|
|
{
|
|
u8 packet_flags = 0;
|
|
struct host_snd_packet *packet = host_channel_snd_packet_alloc(channel, 0);
|
|
struct bitbuff bb = bitbuff_from_string(STRING_FROM_ARRAY(packet->data));
|
|
struct bitbuff_writer bw = bw_from_bitbuff(&bb);
|
|
bw_write_ubits(&bw, PACKET_MAGIC, 32); /* TODO: implicitly encode magic into crc32 */
|
|
bw_write_ibits(&bw, HOST_PACKET_KIND_TRY_CONNECT, 8);
|
|
bw_write_ubits(&bw, packet_flags, 8);
|
|
bw_write_uv(&bw, channel->our_acked_seq);
|
|
packet->data_len = bw_num_bytes_written(&bw);
|
|
} break;
|
|
|
|
case HOST_CMD_KIND_CONNECT_SUCCESS:
|
|
{
|
|
u8 packet_flags = 0;
|
|
struct host_snd_packet *packet = host_channel_snd_packet_alloc(channel, 0);
|
|
struct bitbuff bb = bitbuff_from_string(STRING_FROM_ARRAY(packet->data));
|
|
struct bitbuff_writer bw = bw_from_bitbuff(&bb);
|
|
bw_write_ubits(&bw, PACKET_MAGIC, 32); /* TODO: implicitly encode magic into crc32 */
|
|
bw_write_ibits(&bw, HOST_PACKET_KIND_CONNECT_SUCCESS, 8);
|
|
bw_write_ubits(&bw, packet_flags, 8);
|
|
bw_write_uv(&bw, channel->our_acked_seq);
|
|
packet->data_len = bw_num_bytes_written(&bw);
|
|
} break;
|
|
|
|
case HOST_CMD_KIND_DISCONNECT:
|
|
{
|
|
u8 packet_flags = 0;
|
|
struct host_snd_packet *packet = host_channel_snd_packet_alloc(channel, 0);
|
|
struct bitbuff bb = bitbuff_from_string(STRING_FROM_ARRAY(packet->data));
|
|
struct bitbuff_writer bw = bw_from_bitbuff(&bb);
|
|
bw_write_ubits(&bw, PACKET_MAGIC, 32); /* TODO: implicitly encode magic into crc32 */
|
|
bw_write_ibits(&bw, HOST_PACKET_KIND_DISCONNECT, 8);
|
|
bw_write_ubits(&bw, packet_flags, 8);
|
|
bw_write_uv(&bw, channel->our_acked_seq);
|
|
packet->data_len = bw_num_bytes_written(&bw);
|
|
} break;
|
|
|
|
case HOST_CMD_KIND_HEARTBEAT:
|
|
{
|
|
u8 packet_flags = 0;
|
|
struct host_snd_packet *packet = host_channel_snd_packet_alloc(channel, 0);
|
|
struct bitbuff bb = bitbuff_from_string(STRING_FROM_ARRAY(packet->data));
|
|
struct bitbuff_writer bw = bw_from_bitbuff(&bb);
|
|
bw_write_ubits(&bw, PACKET_MAGIC, 32); /* TODO: implicitly encode magic into crc32 */
|
|
bw_write_ibits(&bw, HOST_PACKET_KIND_HEARTBEAT, 8);
|
|
bw_write_ubits(&bw, packet_flags, 8);
|
|
bw_write_uv(&bw, channel->our_acked_seq);
|
|
bw_write_ubits(&bw, cmd->heartbeat_id, 16);
|
|
bw_write_ubits(&bw, cmd->heartbeat_ack_id, 16);
|
|
packet->data_len = bw_num_bytes_written(&bw);
|
|
} break;
|
|
|
|
case HOST_CMD_KIND_WRITE:
|
|
{
|
|
b32 is_reliable = cmd->write_reliable;
|
|
u8 packet_flags = (is_reliable * HOST_PACKET_FLAG_RELIABLE);
|
|
struct string msg = cmd->write_msg;
|
|
|
|
u64 chunk_count = 0;
|
|
if (msg.len > 0) {
|
|
chunk_count = (msg.len - 1) / PACKET_MSG_CHUNK_MAX_LEN;
|
|
}
|
|
chunk_count += 1;
|
|
|
|
u64 msg_id = ++channel->last_sent_msg_id;
|
|
for (u64 i = 0; i < chunk_count; ++i) {
|
|
u64 chunk_len = PACKET_MSG_CHUNK_MAX_LEN;
|
|
b32 is_last_chunk = i + 1 == chunk_count;
|
|
if (is_last_chunk) {
|
|
chunk_len = msg.len % PACKET_MSG_CHUNK_MAX_LEN;
|
|
if (chunk_len == 0) {
|
|
chunk_len = PACKET_MSG_CHUNK_MAX_LEN;
|
|
}
|
|
}
|
|
struct host_snd_packet *packet = host_channel_snd_packet_alloc(channel, is_reliable);
|
|
struct bitbuff bb = bitbuff_from_string(STRING_FROM_ARRAY(packet->data));
|
|
struct bitbuff_writer bw = bw_from_bitbuff(&bb);
|
|
bw_write_ubits(&bw, PACKET_MAGIC, 32); /* TODO: implicitly encode magic into crc32 */
|
|
bw_write_ibits(&bw, HOST_PACKET_KIND_MSG_CHUNK, 8);
|
|
bw_write_ubits(&bw, packet_flags, 8);
|
|
bw_write_uv(&bw, channel->our_acked_seq);
|
|
if (is_reliable) {
|
|
bw_write_uv(&bw, packet->seq);
|
|
}
|
|
bw_write_uv(&bw, msg_id);
|
|
bw_write_uv(&bw, i);
|
|
bw_write_uv(&bw, chunk_count);
|
|
if (is_last_chunk) {
|
|
bw_write_uv(&bw, chunk_len);
|
|
}
|
|
u8 *chunk_data = msg.text + (i * PACKET_MSG_CHUNK_MAX_LEN);
|
|
bw_write_bytes(&bw, STRING(chunk_len, chunk_data));
|
|
packet->data_len = bw_num_bytes_written(&bw);
|
|
}
|
|
} break;
|
|
|
|
default: break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/* Send packets */
|
|
/* TODO: Aggregate small packets */
|
|
{
|
|
__profn("Send host packets");
|
|
for (u64 i = 0; i < host->num_channels_reserved; ++i) {
|
|
struct sock *sock = host->sock;
|
|
struct host_channel *channel = &host->channels[i];
|
|
u64 total_sent = 0;
|
|
if (channel->valid) {
|
|
struct sock_address address = channel->address;
|
|
/* Send reliable packets to channel */
|
|
for (struct host_snd_packet *packet = channel->first_reliable_packet; packet; packet = packet->next) {
|
|
sock_write(sock, address, STRING(packet->data_len, packet->data));
|
|
total_sent += packet->data_len;
|
|
}
|
|
/* Send unreliable packets to channel */
|
|
for (struct host_snd_packet *packet = channel->first_unreliable_packet; packet; packet = packet->next) {
|
|
sock_write(sock, address, STRING(packet->data_len, packet->data));
|
|
total_sent += packet->data_len;
|
|
}
|
|
/* Release unreliable packets */
|
|
if (channel->first_unreliable_packet) {
|
|
channel->last_unreliable_packet->next = host->first_free_packet;
|
|
host->first_free_packet = channel->first_unreliable_packet;
|
|
channel->first_unreliable_packet = 0;
|
|
channel->last_unreliable_packet = 0;
|
|
channel->num_unreliable_packets = 0;
|
|
}
|
|
host->bytes_sent += total_sent;
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
/* Reset cmds */
|
|
host->first_cmd = 0;
|
|
host->last_cmd = 0;
|
|
arena_reset(host->cmd_arena);
|
|
|
|
scratch_end(scratch);
|
|
}
|
|
|
|
/* ========================== *
|
|
* Receive thread
|
|
* ========================== */
|
|
|
|
INTERNAL SYS_THREAD_DEF(host_receiver_thread_entry_point, arg)
|
|
{
|
|
u64 read_buff_size = KIBI(64);
|
|
struct arena *read_buff_arena = arena_alloc(read_buff_size);
|
|
struct string read_buff = ZI;
|
|
read_buff.len = read_buff_size;
|
|
read_buff.text = arena_push_array_no_zero(read_buff_arena, u8, read_buff_size);
|
|
|
|
struct host *host = (struct host *)arg;
|
|
|
|
struct sock_array socks = ZI;
|
|
socks.socks = &host->sock;
|
|
socks.count = 1;
|
|
|
|
struct atomic32 *shutdown = &host->receiver_thread_shutdown_flag.v;
|
|
while (!atomic32_fetch(shutdown)) {
|
|
struct sock *sock = sock_wait_for_available_read(socks, F32_INFINITY);
|
|
struct sock_read_result res;
|
|
while (!atomic32_fetch(shutdown) && sock && (res = sock_read(sock, read_buff)).valid) {
|
|
struct sock_address address = res.address;
|
|
struct string data = res.data;
|
|
if (data.len > 0) {
|
|
struct snc_lock lock = snc_lock_e(&host->rcv_buffer_write_mutex);
|
|
{
|
|
struct host_rcv_buffer *rcv_buffer = host->rcv_buffer_write;
|
|
struct host_rcv_packet *packet = arena_push(rcv_buffer->arena, struct host_rcv_packet);
|
|
packet->address = address;
|
|
packet->data = string_copy(rcv_buffer->arena, data);
|
|
if (rcv_buffer->last_packet) {
|
|
rcv_buffer->last_packet->next = packet;
|
|
} else {
|
|
rcv_buffer->first_packet = packet;
|
|
}
|
|
rcv_buffer->last_packet = packet;
|
|
}
|
|
snc_unlock(&lock);
|
|
}
|
|
}
|
|
}
|
|
|
|
arena_release(read_buff_arena);
|
|
}
|