host progress

This commit is contained in:
jacob 2025-02-06 16:13:28 -06:00
parent 627f736345
commit 84e64b5cce
6 changed files with 418 additions and 164 deletions

View File

@ -26,7 +26,7 @@ GLOBAL struct {
struct sprite_scope *sprite_frame_scope;
struct host host;
struct host *host;
/* For debugging */
struct v2 user_cursor;
@ -531,7 +531,7 @@ INTERNAL void game_update(void)
struct game_cmd_list game_cmds = ZI;
{
struct host_event_array host_events = host_pop_events(scratch.arena, &G.host);
struct host_event_array host_events = host_pop_events(scratch.arena, G.host);
game_cmds_from_host_events(scratch.arena, host_events, &game_cmds);
}
@ -573,7 +573,7 @@ INTERNAL void game_update(void)
if (client_ent->valid) {
entity_disable_prop(client_ent, ENTITY_PROP_PLAYER_CONTROLLED);
entity_enable_prop(client_ent, ENTITY_PROP_RELEASE_NEXT_TICK);
host_queue_disconnect(&G.host, channel_id);
host_queue_disconnect(G.host, channel_id);
}
client_release(client);
}
@ -1327,12 +1327,12 @@ INTERNAL void game_update(void)
l.last = &snapshot_event;
struct string msg = game_string_from_events(temp.arena, l);
host_queue_write(&G.host, HOST_CHANNEL_ID_ALL, msg);
host_queue_write(G.host, HOST_CHANNEL_ID_ALL, msg);
arena_temp_end(temp);
}
host_update(&G.host);
host_update(G.host);
__profframe("Game");
/* ========================== *

View File

@ -1,8 +1,9 @@
#include "host.h"
#include "arena.h"
#include "host.h"
#include "scratch.h"
#include "byteio.h"
#include "sys.h"
#include "util.h"
//#define HOST_NETWORK_ADDRESS_STRING(str)
//#define HOST_NETWORK_ADDRESS_ALL_LOCAL_INTERFACES(port)
@ -11,43 +12,56 @@
#define PACKET_MAGIC 0xd9e3b8b6
#define PACKET_CHUNK_MAX_LEN 256
#define NUM_CHANNEL_LOOKUP_BUCKETS 512
#define NUM_MSG_ASSEMBLER_LOOKUP_BUCKETS 16384
/* Give enough space for msg chunk + header */
#define PACKET_DATA_MAX_LEN (PACKET_CHUNK_MAX_LEN * 2)
enum packet_kind {
PACKET_KIND_NONE,
PACKET_KIND_TRY_CONNECT,
PACKET_KIND_CONNECT_SUCCESS,
PACKET_KIND_DISCONNECT,
PACKET_KIND_MSG_CHUNK
enum host_packet_kind {
HOST_PACKET_KIND_NONE,
HOST_PACKET_KIND_TRY_CONNECT,
HOST_PACKET_KIND_CONNECT_SUCCESS,
HOST_PACKET_KIND_DISCONNECT,
HOST_PACKET_KIND_MSG_CHUNK
};
enum packet_flag {
PACKET_FLAG_NONE = 0,
PACKET_FLAG_RELIABLE = (1 << 0)
enum host_packet_flag {
HOST_PACKET_FLAG_NONE = 0,
HOST_PACKET_FLAG_RELIABLE = (1 << 0)
};
struct packet {
struct host_packet {
u64 seq;
u8 flags;
u64 data_len;
u8 data[PACKET_DATA_MAX_LEN];
struct packet *next;
struct host_packet *next;
};
struct host_channel {
struct host_channel_id id;
b32 valid;
b32 connected;
struct host *host;
struct host_channel_id id;
struct sock_address address;
struct host_channel *next_free;
struct packet *first_reliable_packet;
struct packet *last_reliable_packet;
struct packet *first_unreliable_packet;
struct packet *last_unreliable_packet;
struct sock_address address;
u64 address_hash;
struct host_channel *next_address_hash;
struct host_channel *prev_address_hash;
/* NOTE: Packets are allocated in host's `arena` */
struct host_packet *first_reliable_packet;
struct host_packet *last_reliable_packet;
struct host_packet *first_unreliable_packet;
struct host_packet *last_unreliable_packet;
/* NOTE: Msg assemblers are allocated in host's `arena` */
struct host_msg_assembler *least_recent_msg_assembler;
struct host_msg_assembler *most_recent_msg_assembler;
u64 their_acked_seq;
u64 our_acked_seq;
@ -63,13 +77,41 @@ struct host_channel_list {
struct host_channel_node *last;
};
struct host_channel_lookup_bucket {
struct host_channel *first;
struct host_channel *last;
};
struct host_queued_event {
struct host_event event;
struct host_queued_event *next;
};
struct recv_buffer {
struct host_msg_assembler {
struct host *host;
struct host_channel *channel;
/* TODO: Remove this (testing) */
struct arena testarena;
/* Free list */
struct host_msg_assembler *next_free;
/* Bucket list */
struct host_msg_assembler *next_hash;
struct host_msg_assembler *prev_hash;
/* Channel list */
struct host_msg_assembler *less_recent;
struct host_msg_assembler *more_recent;
u64 msg_id;
u64 hash;
u64 last_chunk_len;
u64 num_chunks_total;
@ -81,12 +123,19 @@ struct recv_buffer {
u8 *data;
};
struct host_msg_assembler_lookup_bucket {
struct host_msg_assembler *first;
struct host_msg_assembler *last;
};
READONLY GLOBAL struct host_channel _g_host_channel_nil = { .valid = false };
GLOBAL struct {
i32 _;
} G = ZI, DEBUG_ALIAS(G, G_host);
INTERNAL void host_msg_assembler_release(struct host_msg_assembler *ma);
/* ========================== *
* Startup
* ========================== */
@ -102,136 +151,315 @@ struct host_startup_receipt host_startup(struct sock_startup_receipt *sock_sr)
* Host
* ========================== */
struct host host_alloc(struct sock_address bind_address)
struct host *host_alloc(struct sock_address bind_address)
{
struct host host = ZI;
host.cmd_arena = arena_alloc(GIGABYTE(64));
host.queued_event_arena = arena_alloc(GIGABYTE(64));
host.packet_arena = arena_alloc(GIGABYTE(64));
host.channels_arena = arena_alloc(GIGABYTE(64));
host.sock = sock_alloc(bind_address, SOCK_FLAG_NON_BLOCKING);
struct arena arena = arena_alloc(GIGABYTE(64));
struct host *host = arena_push_zero(&arena, struct host);
host->arena = arena;
host->cmd_arena = arena_alloc(GIGABYTE(64));
host->queued_event_arena = arena_alloc(GIGABYTE(64));
host->channel_arena = arena_alloc(GIGABYTE(64));
host->channels = arena_dry_push(&host->channel_arena, struct host_channel);
host->channel_lookup_buckets = arena_push_array_zero(&host->arena, struct host_channel_lookup_bucket, NUM_CHANNEL_LOOKUP_BUCKETS);
host->msg_assembler_lookup_buckets = arena_push_array_zero(&host->arena, struct host_msg_assembler_lookup_bucket, NUM_MSG_ASSEMBLER_LOOKUP_BUCKETS);
host->sock = sock_alloc(bind_address, SOCK_FLAG_NON_BLOCKING);
return host;
}
void host_release(struct host *host)
{
sock_release(&host->sock);
arena_release(&host->channels_arena);
arena_release(&host->packet_arena);
arena_release(&host->channel_arena);
arena_release(&host->queued_event_arena);
arena_release(&host->cmd_arena);
arena_release(&host->arena);
}
/* ========================== *
* Channel
* ========================== */
INTERNAL u64 hash_from_address(struct sock_address address)
{
return hash_fnv64(HASH_FNV64_BASIS, STRING_FROM_STRUCT(&address));
}
INTERNAL struct host_channel *host_channel_from_address(struct host *host, struct sock_address address)
{
(UNUSED)host;
(UNUSED)address;
u64 hash = hash_from_address(address);
struct host_channel_lookup_bucket *bucket = &host->channel_lookup_buckets[hash % host->num_channel_lookup_buckets];
for (struct host_channel *channel = bucket->first; channel; channel = channel->next_address_hash) {
if (channel->address_hash == hash && sock_address_eq(channel->address, address)) {
return channel;
}
}
return &_g_host_channel_nil;
}
struct host_channel *res = &_g_host_channel_nil;
return res;
/* Returns nil channel if id = HOST_CHANNEL_ID_ALL */
INTERNAL struct host_channel *host_single_channel_from_id(struct host *host, struct host_channel_id channel_id)
{
if (channel_id.gen > 0 && channel_id.idx < host->num_channels_reserved) {
struct host_channel *channel = &host->channels[channel_id.idx];
if (channel->id.gen == channel_id.gen) {
return channel;
}
}
return &_g_host_channel_nil;
}
INTERNAL struct host_channel_list host_channels_from_id(struct arena *arena, struct host *host, struct host_channel_id channel_id)
{
(UNUSED)arena;
(UNUSED)host;
(UNUSED)channel_id;
struct host_channel_list res = ZI;
if (host_channel_id_eq(channel_id, HOST_CHANNEL_ID_ALL)) {
for (u64 i = 0; i < host->num_channels_reserved; ++i) {
struct host_channel *channel = &host->channels[i];
if (channel->valid && channel->connected) {
struct host_channel_node *n = arena_push_zero(arena, struct host_channel_node);
n->channel = channel;
if (res.last) {
res.last->next = n;
} else {
res.first = n;
}
res.last = n;
}
}
} else {
struct host_channel *channel = host_single_channel_from_id(host, channel_id);
if (channel->valid) {
struct host_channel_node *n = arena_push_zero(arena, struct host_channel_node);
n->channel = channel;
res.first = n;
res.last = n;
}
}
return res;
}
INTERNAL struct host_channel *host_channel_alloc(struct host *host, struct sock_address address)
{
(UNUSED)host;
(UNUSED)address;
struct host_channel_id id = ZI;
struct host_channel *channel;
if (host->first_free_channel) {
channel = host->first_free_channel;
host->first_free_channel = channel->next_free;
id = channel->id;
++id.gen;
} else {
channel = arena_push(&host->channel_arena, struct host_channel);
id.gen = 1;
id.idx = host->num_channels_reserved;
++host->num_channels_reserved;
}
MEMZERO_STRUCT(channel);
channel->id = id;
channel->host = host;
channel->address = address;
u64 address_hash = hash_from_address(address);
channel->address_hash = address_hash;
struct host_channel *res = NULL;
return res;
u64 bucket_index = address_hash % host->num_channel_lookup_buckets;
struct host_channel_lookup_bucket *bucket = &host->channel_lookup_buckets[bucket_index];
if (bucket->last) {
channel->prev_address_hash = bucket->last;
bucket->last->next_address_hash = channel;
} else {
bucket->first = channel;
}
bucket->last = channel;
return channel;
}
INTERNAL void host_channel_release(struct host_channel *channel)
{
(UNUSED)channel;
struct host *host = channel->host;
/* Release from lookup table */
{
struct host_channel_lookup_bucket *bucket = &host->channel_lookup_buckets[channel->address_hash % host->num_channel_lookup_buckets];
struct host_channel *prev = channel->prev_address_hash;
struct host_channel *next = channel->next_address_hash;
if (prev) {
prev->next_address_hash = next;
} else {
bucket->first = next;
}
if (next) {
next->prev_address_hash = prev;
} else {
bucket->last = prev;
}
}
/* Release packets */
{
if (channel->first_unreliable_packet) {
host->first_free_packet = channel->first_unreliable_packet;
channel->last_unreliable_packet->next = host->first_free_packet;
}
if (channel->first_reliable_packet) {
host->first_free_packet = channel->first_reliable_packet;
channel->last_reliable_packet->next = host->first_free_packet;
}
}
/* Release msg assemblers */
for (struct host_msg_assembler *ma = channel->least_recent_msg_assembler; ma; ma = ma->more_recent) {
host_msg_assembler_release(ma);
}
++channel->id.gen;
channel->valid = false;
channel->next_free = host->first_free_channel;
host->first_free_channel = channel;
}
INTERNAL struct recv_buffer *host_channel_get_recv_buffer(struct host_channel *channel, u64 msg_id)
{
(UNUSED)channel;
(UNUSED)msg_id;
/* ========================== *
* Msg assembler
* ========================== */
struct recv_buffer *res = NULL;
INTERNAL u64 hash_from_channel_msg(struct host_channel_id channel_id, u64 msg_id)
{
u64 res = HASH_FNV64_BASIS;
res = hash_fnv64(res, STRING_FROM_STRUCT(&channel_id));
res = hash_fnv64(res, STRING_FROM_STRUCT(&msg_id));
return res;
}
INTERNAL struct host_msg_assembler *host_get_msg_assembler(struct host *host, struct host_channel_id channel_id, u64 msg_id)
{
u64 hash = hash_from_channel_msg(channel_id, msg_id);
struct host_msg_assembler_lookup_bucket *bucket = &host->msg_assembler_lookup_buckets[hash % host->num_msg_assembler_lookup_buckets];
for (struct host_msg_assembler *ma = bucket->first; ma; ma = ma->next_hash) {
if (ma->hash == hash && host_channel_id_eq(ma->channel->id, channel_id) && ma->msg_id == msg_id) {
return ma;
}
}
return NULL;
}
INTERNAL struct host_msg_assembler *host_msg_assembler_alloc(struct host_channel *channel, u64 msg_id, u64 chunk_count, u64 now_ns)
{
struct host *host = channel->host;
struct host_msg_assembler *ma;
if (host->first_free_msg_assembler) {
ma = host->first_free_msg_assembler;
host->first_free_msg_assembler = ma->next_free;
} else {
ma = arena_push(&host->arena, struct host_msg_assembler);
}
MEMZERO_STRUCT(ma);
ma->channel = channel;
ma->msg_id = msg_id;
ma->num_chunks_total = chunk_count;
/* FIXME: Use buddy allocator or something */
u64 buff_size = chunk_count * PACKET_CHUNK_MAX_LEN;
ma->testarena = arena_alloc(buff_size);
/* FIXME: Ensure chunk_count > 0 */
ma->chunks_received_bitmap = arena_push_array_zero(&ma->testarena, u8, (((chunk_count - 1) / 8) + 1));
ma->data = arena_push_array(&ma->testarena, u8, buff_size);
/* Insert into channel list */
ma->touched_ns = now_ns;
if (channel->most_recent_msg_assembler) {
channel->most_recent_msg_assembler->more_recent = ma;
ma->less_recent = channel->most_recent_msg_assembler;
} else {
channel->least_recent_msg_assembler = ma;
}
channel->most_recent_msg_assembler = ma;
return ma;
}
INTERNAL void host_msg_assembler_release(struct host_msg_assembler *ma)
{
struct host *host = ma->host;
struct host_channel *channel = ma->channel;
/* FIXME: Data should be in a buddy allocator or something */
arena_release(&ma->testarena);
/* Release from channel list */
{
struct host_msg_assembler *prev = ma->less_recent;
struct host_msg_assembler *next = ma->more_recent;
if (prev) {
prev->more_recent = next;
} else {
channel->least_recent_msg_assembler = next;
}
if (next) {
next->less_recent = prev;
} else {
channel->most_recent_msg_assembler = prev;
}
}
ma->next_free = host->first_free_msg_assembler;
host->first_free_msg_assembler = ma;
}
INTERNAL b32 host_msg_assembler_is_chunk_filled(struct host_msg_assembler *ma, u64 chunk_id)
{
if (chunk_id < ma->num_chunks_total) {
return (ma->chunks_received_bitmap[chunk_id / 8] & (1 << (chunk_id % 8))) != 0;
}
return false;
}
INTERNAL void host_msg_assembler_set_chunk_received(struct host_msg_assembler *ma, u64 chunk_id)
{
if (chunk_id < ma->num_chunks_total) {
ma->chunks_received_bitmap[chunk_id / 8] |= (1 << (chunk_id % 8));
}
}
/* ========================== *
* Packet
* ========================== */
INTERNAL struct packet *host_channel_packet_alloc(struct host_channel *channel, b32 is_reliable)
INTERNAL struct host_packet *host_channel_packet_alloc(struct host_channel *channel, b32 is_reliable)
{
struct host *host = channel->host;
struct packet *packet = NULL;
struct host_packet *host_packet = NULL;
if (host->first_free_packet) {
packet = host->first_free_packet;
host->first_free_packet = packet->next;
host_packet = host->first_free_packet;
host->first_free_packet = host_packet->next;
} else {
packet = arena_push(&host->packet_arena, struct packet);
host_packet = arena_push(&host->channel_arena, struct host_packet);
}
MEMZERO_STRUCT(packet);
MEMZERO_STRUCT(host_packet);
if (is_reliable) {
if (channel->last_reliable_packet) {
channel->last_reliable_packet->next = packet;
channel->last_reliable_packet->next = host_packet;
} else {
channel->first_reliable_packet = packet;
channel->first_reliable_packet = host_packet;
}
channel->last_reliable_packet = packet;
channel->last_reliable_packet = host_packet;
} else {
if (channel->last_unreliable_packet) {
channel->last_unreliable_packet->next = packet;
channel->last_unreliable_packet->next = host_packet;
} else {
channel->first_unreliable_packet = packet;
channel->first_unreliable_packet = host_packet;
}
channel->last_unreliable_packet = packet;
channel->last_unreliable_packet = host_packet;
}
return packet;
return host_packet;
}
/* ========================== *
* Recv buffer
* ========================== */
INTERNAL struct recv_buffer *recv_buffer_alloc(struct host_channel *channel, u64 msg_id, u64 chunk_count)
{
(UNUSED)channel;
(UNUSED)msg_id;
(UNUSED)chunk_count;
struct recv_buffer *res = NULL;
return res;
}
INTERNAL b32 recv_buffer_is_chunk_filled(struct recv_buffer *rb, u64 chunk_id)
{
(UNUSED)rb;
(UNUSED)chunk_id;
return false;
}
INTERNAL void recv_buffer_set_chunk_received(struct recv_buffer *rb, u64 chunk_id)
{
(UNUSED)rb;
(UNUSED)chunk_id;
}
/* ========================== *
* Queue
* Cmd interface
* ========================== */
INTERNAL struct host_cmd *host_cmd_alloc_and_append(struct host *host)
@ -312,7 +540,7 @@ void host_update(struct host *host)
if (magic == PACKET_MAGIC) {
/* TODO: Combine kind byte with flags byte */
struct host_channel *channel = host_channel_from_address(host, address);
enum packet_kind packet_kind = br_read_i8(&br);
enum host_packet_kind host_packet_kind = br_read_i8(&br);
u8 packet_flags = br_read_u8(&br);
u64 their_acked_seq = br_read_var_uint(&br);
@ -321,7 +549,7 @@ void host_update(struct host *host)
}
b32 should_process_packet = false;
if (packet_flags & PACKET_FLAG_RELIABLE) {
if (packet_flags & HOST_PACKET_FLAG_RELIABLE) {
u64 packet_seq = br_read_var_uint(&br);
if (packet_seq == channel->our_acked_seq + 1) {
channel->our_acked_seq = packet_seq;
@ -332,8 +560,8 @@ void host_update(struct host *host)
}
if (should_process_packet) {
switch (packet_kind) {
case PACKET_KIND_TRY_CONNECT:
switch (host_packet_kind) {
case HOST_PACKET_KIND_TRY_CONNECT:
{
/* A foreign host is trying to connect to us */
if (!channel->valid) {
@ -343,7 +571,7 @@ void host_update(struct host *host)
}
} break;
case PACKET_KIND_CONNECT_SUCCESS:
case HOST_PACKET_KIND_CONNECT_SUCCESS:
{
/* We successfully connected to a foreign host and they are ready to receive messages */
if (channel->valid && !channel->connected) {
@ -354,7 +582,7 @@ void host_update(struct host *host)
}
} break;
case PACKET_KIND_DISCONNECT:
case HOST_PACKET_KIND_DISCONNECT:
{
/* A foreign host disconnected from us */
if (channel->valid) {
@ -366,7 +594,7 @@ void host_update(struct host *host)
} break;
case PACKET_KIND_MSG_CHUNK:
case HOST_PACKET_KIND_MSG_CHUNK:
{
/* Packet is chunk <chunk_id> out of <chunk_count> belonging to message <msg_id> */
u64 msg_id = br_read_var_uint(&br);
@ -376,31 +604,31 @@ void host_update(struct host *host)
b32 is_last_chunk = (chunk_id + 1) == chunk_count;
u64 data_len = is_last_chunk ? (br_read_u8(&br) + 1) : PACKET_CHUNK_MAX_LEN;
struct recv_buffer *recv_buff = host_channel_get_recv_buffer(channel, msg_id);
if (!recv_buff) {
recv_buff = recv_buffer_alloc(channel, msg_id, chunk_count);
struct host_msg_assembler *ma = host_get_msg_assembler(host, channel->id, msg_id);
if (!ma) {
ma = host_msg_assembler_alloc(channel, msg_id, chunk_count, now_ns);
}
if (chunk_count == recv_buff->num_chunks_total && chunk_id < chunk_count) {
if (!recv_buffer_is_chunk_filled(recv_buff, chunk_id)) {
if (chunk_count == ma->num_chunks_total && chunk_id < chunk_count) {
if (!host_msg_assembler_is_chunk_filled(ma, chunk_id)) {
u8 *src = br_seek(&br, data_len);
if (src) {
u8 *dst = &recv_buff->data[chunk_id * PACKET_CHUNK_MAX_LEN];
u8 *dst = &ma->data[chunk_id * PACKET_CHUNK_MAX_LEN];
MEMCPY(dst, src, data_len);
if (is_last_chunk) {
recv_buff->last_chunk_len = data_len;
ma->last_chunk_len = data_len;
}
recv_buffer_set_chunk_received(recv_buff, chunk_id);
++recv_buff->num_chunks_received;
recv_buff->touched_ns = now_ns;
if (recv_buff->num_chunks_received == chunk_count) {
host_msg_assembler_set_chunk_received(ma, chunk_id);
++ma->num_chunks_received;
ma->touched_ns = now_ns;
if (ma->num_chunks_received == chunk_count) {
/* All chunks filled, message has finished assembling */
/* TODO: Message ordering */
struct host_queued_event *queued_event = host_queued_event_alloc_and_append(host);
struct string data = ZI;
data.len = ((chunk_count - 1) * PACKET_CHUNK_MAX_LEN) + recv_buff->last_chunk_len;
data.len = ((chunk_count - 1) * PACKET_CHUNK_MAX_LEN) + ma->last_chunk_len;
data.text = arena_push_array(&host->queued_event_arena, u8, data.len);
MEMCPY(data.text, recv_buff->data, data.len);
MEMCPY(data.text, ma->data, data.len);
queued_event->event.kind = HOST_EVENT_KIND_MSG;
queued_event->event.msg = data;
queued_event->event.channel_id = channel->id;
@ -420,10 +648,8 @@ void host_update(struct host *host)
}
}
/* Release expired msg buffers */
/* Update channels */
for (u64 i = 0; i < host->channels_reserved; ++i) {
for (u64 i = 0; i < host->num_channels_reserved; ++i) {
struct host_channel *channel = &host->channels[i];
if (channel->valid) {
/* Send / resend handshake if not connected */
@ -435,73 +661,73 @@ void host_update(struct host *host)
/* Release acked reliable packets */
{
u64 acked_seq = channel->their_acked_seq;
struct packet *packet = channel->first_reliable_packet;
while (packet) {
struct packet *next = packet->next;
u64 seq = packet->seq;
struct host_packet *host_packet = channel->first_reliable_packet;
while (host_packet) {
struct host_packet *next = host_packet->next;
u64 seq = host_packet->seq;
if (seq < acked_seq) {
packet->next = host->first_free_packet;
host->first_free_packet = packet;
host_packet->next = host->first_free_packet;
host->first_free_packet = host_packet;
channel->first_reliable_packet = next;
} else {
break;
}
packet = next;
host_packet = next;
}
if (channel->first_reliable_packet == NULL) {
channel->last_reliable_packet = NULL;
}
}
/* TODO: Release timed out unreliable msg buffers */
}
}
/* Process cmds */
/* TODO: Unreliable packets don't need to be allocated into unreliable packet queue, should just send them and forget */
for (struct host_cmd *cmd = host->first_cmd; cmd; cmd = cmd->next) {
struct temp_arena temp = arena_temp_begin(scratch.arena);
enum host_cmd_kind kind = cmd->kind;
struct host_channel_id channel_id = cmd->channel_id;
struct host_channel_list channels = host_channels_from_id(temp.arena, host, channel_id);
struct host_channel_list channels = host_channels_from_id(scratch.arena, host, channel_id);
for (struct host_channel_node *node = channels.first; node; node = node->next) {
struct host_channel *channel = node->channel;
switch (kind) {
case HOST_CMD_KIND_TRY_CONNECT:
{
u8 packet_flags = 0;
struct packet *packet = host_channel_packet_alloc(channel, false);
struct byte_writer bw = bw_from_buffer(STRING_FROM_ARRAY(packet->data));
bw_write_i8(&bw, PACKET_KIND_TRY_CONNECT);
struct host_packet *host_packet = host_channel_packet_alloc(channel, false);
struct byte_writer bw = bw_from_buffer(STRING_FROM_ARRAY(host_packet->data));
bw_write_i8(&bw, HOST_PACKET_KIND_TRY_CONNECT);
bw_write_u8(&bw, packet_flags);
bw_write_var_uint(&bw, channel->our_acked_seq);
packet->data_len = bw_pos(&bw);
host_packet->data_len = bw_pos(&bw);
} break;
case HOST_CMD_KIND_CONNECT_SUCCESS:
{
u8 packet_flags = 0;
struct packet *packet = host_channel_packet_alloc(channel, false);
struct byte_writer bw = bw_from_buffer(STRING_FROM_ARRAY(packet->data));
bw_write_i8(&bw, PACKET_KIND_CONNECT_SUCCESS);
struct host_packet *host_packet = host_channel_packet_alloc(channel, false);
struct byte_writer bw = bw_from_buffer(STRING_FROM_ARRAY(host_packet->data));
bw_write_i8(&bw, HOST_PACKET_KIND_CONNECT_SUCCESS);
bw_write_u8(&bw, packet_flags);
bw_write_var_uint(&bw, channel->our_acked_seq);
packet->data_len = bw_pos(&bw);
host_packet->data_len = bw_pos(&bw);
} break;
case HOST_CMD_KIND_DISCONNECT:
{
u8 packet_flags = 0;
struct packet *packet = host_channel_packet_alloc(channel, false);
struct byte_writer bw = bw_from_buffer(STRING_FROM_ARRAY(packet->data));
bw_write_i8(&bw, PACKET_KIND_DISCONNECT);
struct host_packet *host_packet = host_channel_packet_alloc(channel, false);
struct byte_writer bw = bw_from_buffer(STRING_FROM_ARRAY(host_packet->data));
bw_write_i8(&bw, HOST_PACKET_KIND_DISCONNECT);
bw_write_u8(&bw, packet_flags);
bw_write_var_uint(&bw, channel->our_acked_seq);
packet->data_len = bw_pos(&bw);
host_packet->data_len = bw_pos(&bw);
} break;
case HOST_CMD_KIND_WRITE:
{
b32 is_reliable = cmd->write_reliable;
u8 packet_flags = (is_reliable * PACKET_FLAG_RELIABLE);
u8 packet_flags = (is_reliable * HOST_PACKET_FLAG_RELIABLE);
struct string msg = cmd->write_msg;
u64 chunk_count = 0;
@ -517,41 +743,40 @@ void host_update(struct host *host)
data_len = msg.len % PACKET_CHUNK_MAX_LEN;
}
u8 *data = msg.text + (i * PACKET_CHUNK_MAX_LEN);
struct packet *packet = host_channel_packet_alloc(channel, is_reliable);
struct byte_writer bw = bw_from_buffer(STRING_FROM_ARRAY(packet->data));
bw_write_i8(&bw, PACKET_KIND_MSG_CHUNK);
struct host_packet *host_packet = host_channel_packet_alloc(channel, is_reliable);
struct byte_writer bw = bw_from_buffer(STRING_FROM_ARRAY(host_packet->data));
bw_write_i8(&bw, HOST_PACKET_KIND_MSG_CHUNK);
bw_write_u8(&bw, packet_flags);
bw_write_var_uint(&bw, channel->our_acked_seq);
if (is_reliable) {
bw_write_var_uint(&bw, packet->seq);
bw_write_var_uint(&bw, host_packet->seq);
}
if (is_last_chunk) {
/* FIXME: Ensure data_len can never be 0 */
bw_write_u8(&bw, data_len - 1);
}
bw_write_buffer(&bw, STRING(data_len, data));
packet->data_len = bw_pos(&bw);
host_packet->data_len = bw_pos(&bw);
}
} break;
default: break;
}
}
arena_temp_end(temp);
}
/* Process packets */
/* TODO: Aggregate small packets */
for (u64 i = 0; i < host->channels_reserved; ++i) {
for (u64 i = 0; i < host->num_channels_reserved; ++i) {
struct host_channel *channel = &host->channels[i];
struct sock_address address = channel->address;
/* Send unreliable packets to channel */
for (struct packet *packet = channel->first_unreliable_packet; packet; packet = packet->next) {
sock_write(sock, address, STRING(packet->data_len, packet->data));
for (struct host_packet *host_packet = channel->first_unreliable_packet; host_packet; host_packet = host_packet->next) {
sock_write(sock, address, STRING(host_packet->data_len, host_packet->data));
}
/* Send un-acked reliable packets to channel */
for (struct packet *packet = channel->first_reliable_packet; packet; packet = packet->next) {
sock_write(sock, address, STRING(packet->data_len, packet->data));
for (struct host_packet *host_packet = channel->first_reliable_packet; host_packet; host_packet = host_packet->next) {
sock_write(sock, address, STRING(host_packet->data_len, host_packet->data));
}
/* Release unreliable packets */
if (channel->first_unreliable_packet) {

View File

@ -6,7 +6,8 @@
#define HOST_CHANNEL_ID_NIL (struct host_channel_id) { .gen = 0, .idx = 0 }
#define HOST_CHANNEL_ID_ALL (struct host_channel_id) { .gen = U32_MAX, .idx = U32_MAX }
struct packet;
struct host_packet;
struct host_channel_lookup_bucket;
enum host_cmd_kind {
HOST_CMD_KIND_NONE,
@ -47,6 +48,7 @@ struct host_event_array {
};
struct host {
struct arena arena;
struct sock sock;
struct arena cmd_arena;
@ -59,12 +61,19 @@ struct host {
struct host_queued_event *last_queued_event;
u64 num_queued_events;
struct arena packet_arena;
struct packet *first_free_packet;
struct arena channels_arena;
struct arena channel_arena;
struct host_channel *channels;
u64 channels_reserved;
struct host_channel *first_free_channel;
u64 num_channels_reserved;
struct host_packet *first_free_packet; /* Allocated in `arena` */
struct host_msg_assembler *first_free_msg_assembler; /* Allocated in `arena` */
struct host_channel_lookup_bucket *channel_lookup_buckets; /* Allocated in `arena` */
u64 num_channel_lookup_buckets;
struct host_msg_assembler_lookup_bucket *msg_assembler_lookup_buckets; /* Allocated in `arena` */
u64 num_msg_assembler_lookup_buckets;
};
/* ========================== *
@ -78,7 +87,7 @@ struct host_startup_receipt host_startup(struct sock_startup_receipt *sock_sr);
* Host
* ========================== */
struct host host_alloc(struct sock_address bind_address);
struct host *host_alloc(struct sock_address bind_address);
void host_release(struct host *host);
@ -114,6 +123,11 @@ struct host_event_array host_pop_events(struct arena *arena, struct host *host);
INLINE b32 host_channel_id_eq(struct host_channel_id a, struct host_channel_id b)
{
return a.idx == b.idx && a.gen == b.gen;
}
INLINE b32 host_channel_id_is_nil(struct host_channel_id id)
{
return id.gen == 0 && id.idx == 0;

View File

@ -39,11 +39,11 @@ INTERNAL void gen_random_file(struct string path, u32 count)
struct rng_startup_receipt rng_startup(struct resource_startup_receipt *resource_sr)
{
(UNUSED)resource_sr;
G.arena = arena_alloc(GIGABYTE(64));
struct string noise_path = LIT("res/noise.dat");
if (resource_exists(noise_path)) {
struct resource r = resource_open(noise_path);
G.noise_count = r.data.len / sizeof(*G.noise);
G.arena = arena_alloc(sizeof(u64) * G.noise_count);
G.noise = arena_push_array(&G.arena, u64, G.noise_count);
MEMCPY(G.noise, r.data.text, r.data.len);
resource_close(r);

View File

@ -1,8 +1,7 @@
#ifndef SOCK_H
#define SOCK_H
#include "memory.h"
#if 1
@ -53,6 +52,22 @@ void sock_write(struct sock *sock, struct sock_address address, struct string da
INLINE b32 sock_address_eq(struct sock_address a, struct sock_address b)
{
return MEMEQ_STRUCT(&a, &b);
}
#else
#define SOCK_IP_ANY_INTERFACE CPPCOMPAT_INITLIST_TYPE(struct sock_ip) { 0 }

View File

@ -43,7 +43,7 @@ GLOBAL struct {
struct sys_window *window;
struct host host;
struct host *host;
/* Render targets */
struct renderer_texture final_texture;
@ -603,7 +603,7 @@ INTERNAL void user_update(void)
struct game_event_list game_events = ZI;
{
struct host_event_array host_events = host_pop_events(scratch.arena, &G.host);
struct host_event_array host_events = host_pop_events(scratch.arena, G.host);
game_events_from_host_events(scratch.arena, host_events, &game_events);
}
@ -619,7 +619,7 @@ INTERNAL void user_update(void)
f64 now = SECONDS_FROM_NS(sys_time_ns());
if (last_try_connect == 0 || (now - last_try_connect) > 5) {
struct sock_address connect_addr = sock_address_from_string(LIT("127.0.0.1:12345"));
host_queue_connect_to_address(&G.host, connect_addr);
host_queue_connect_to_address(G.host, connect_addr);
last_try_connect = now;
}
@ -1671,12 +1671,12 @@ INTERNAL void user_update(void)
struct temp_arena temp = arena_temp_begin(scratch.arena);
struct string cmds_str = game_string_from_cmds(temp.arena, cmd_list);
host_queue_write(&G.host, HOST_CHANNEL_ID_ALL, cmds_str);
host_queue_write(G.host, HOST_CHANNEL_ID_ALL, cmds_str);
arena_temp_end(temp);
}
host_update(&G.host);
host_update(G.host);
/* ========================== *
* Render