host function profiling
This commit is contained in:
parent
72728e6a98
commit
6f7c19e053
596
src/host.c
596
src/host.c
@ -11,14 +11,12 @@
|
||||
//#define HOST_NETWORK_ADDRESS_NONE
|
||||
|
||||
#define PACKET_MAGIC 0xd9e3b8b6
|
||||
#define PACKET_CHUNK_MAX_LEN 256
|
||||
#define PACKET_MSG_CHUNK_MAX_LEN 1024
|
||||
#define PACKET_DATA_MAX_LEN (1280 * 2) /* Give enough space for msg chunk + header */
|
||||
|
||||
#define NUM_CHANNEL_LOOKUP_BUCKETS 512
|
||||
#define NUM_MSG_ASSEMBLER_LOOKUP_BUCKETS 16384
|
||||
|
||||
/* Give enough space for msg chunk + header */
|
||||
#define PACKET_DATA_MAX_LEN (PACKET_CHUNK_MAX_LEN * 2)
|
||||
|
||||
enum host_packet_kind {
|
||||
HOST_PACKET_KIND_NONE,
|
||||
HOST_PACKET_KIND_TRY_CONNECT,
|
||||
@ -399,7 +397,7 @@ INTERNAL struct host_msg_assembler *host_msg_assembler_alloc(struct host_channel
|
||||
ma->num_chunks_total = chunk_count;
|
||||
|
||||
/* FIXME: Use buddy allocator or something */
|
||||
u64 data_size = chunk_count * PACKET_CHUNK_MAX_LEN;
|
||||
u64 data_size = chunk_count * PACKET_MSG_CHUNK_MAX_LEN;
|
||||
u64 bitmap_size = ((chunk_count - 1) / 8) + 1;
|
||||
ma->testarena = arena_alloc(data_size + bitmap_size);
|
||||
/* FIXME: Ensure chunk_count > 0 */
|
||||
@ -627,138 +625,293 @@ void host_update(struct host *host)
|
||||
|
||||
i64 now_ns = sys_time_ns();
|
||||
|
||||
struct string read_buff = ZI;
|
||||
read_buff.len = PACKET_DATA_MAX_LEN;
|
||||
read_buff.text = arena_push_array(scratch.arena, u8, read_buff.len);
|
||||
|
||||
/* Swap read & write recv buffers */
|
||||
{
|
||||
struct sys_lock lock = sys_mutex_lock_e(&host->recv_buffer_write_mutex);
|
||||
struct host_recv_buffer *swp = host->recv_buffer_read;
|
||||
host->recv_buffer_read = host->recv_buffer_write;
|
||||
host->recv_buffer_write = swp;
|
||||
sys_mutex_unlock(&lock);
|
||||
}
|
||||
/* Read incoming packets */
|
||||
struct host_recv_buffer *recv_buffer = host->recv_buffer_read;
|
||||
for (struct host_recv_packet *packet = recv_buffer->first_packet; packet; packet = packet->next) {
|
||||
//struct sock *sock = packet->sock;
|
||||
struct sock_address address = packet->address;
|
||||
struct byte_reader br = br_from_buffer(packet->data);
|
||||
u32 magic = br_read_u32(&br);
|
||||
if (magic == PACKET_MAGIC) {
|
||||
/* TODO: Combine kind byte with flags byte */
|
||||
struct host_channel *channel = host_channel_from_address(host, address);
|
||||
enum host_packet_kind host_packet_kind = br_read_i8(&br);
|
||||
u8 packet_flags = br_read_u8(&br);
|
||||
__profscope(host_update_read_packets);
|
||||
struct string read_buff = ZI;
|
||||
read_buff.len = PACKET_DATA_MAX_LEN;
|
||||
read_buff.text = arena_push_array(scratch.arena, u8, read_buff.len);
|
||||
|
||||
u64 their_acked_seq = br_read_var_uint(&br);
|
||||
if (their_acked_seq > channel->their_acked_seq) {
|
||||
channel->their_acked_seq = their_acked_seq;
|
||||
}
|
||||
/* Swap read & write recv buffers */
|
||||
{
|
||||
struct sys_lock lock = sys_mutex_lock_e(&host->recv_buffer_write_mutex);
|
||||
struct host_recv_buffer *swp = host->recv_buffer_read;
|
||||
host->recv_buffer_read = host->recv_buffer_write;
|
||||
host->recv_buffer_write = swp;
|
||||
sys_mutex_unlock(&lock);
|
||||
}
|
||||
/* Read incoming packets */
|
||||
struct host_recv_buffer *recv_buffer = host->recv_buffer_read;
|
||||
for (struct host_recv_packet *packet = recv_buffer->first_packet; packet; packet = packet->next) {
|
||||
//struct sock *sock = packet->sock;
|
||||
struct sock_address address = packet->address;
|
||||
struct byte_reader br = br_from_buffer(packet->data);
|
||||
u32 magic = br_read_u32(&br);
|
||||
if (magic == PACKET_MAGIC) {
|
||||
/* TODO: Combine kind byte with flags byte */
|
||||
struct host_channel *channel = host_channel_from_address(host, address);
|
||||
enum host_packet_kind host_packet_kind = br_read_i8(&br);
|
||||
u8 packet_flags = br_read_u8(&br);
|
||||
|
||||
b32 skip_packet = false;
|
||||
b32 is_reliable = packet_flags & HOST_PACKET_FLAG_RELIABLE;
|
||||
if (channel->valid) {
|
||||
if (is_reliable) {
|
||||
u64 packet_seq = br_read_var_uint(&br);
|
||||
if (packet_seq == channel->our_acked_seq + 1) {
|
||||
channel->our_acked_seq = packet_seq;
|
||||
} else {
|
||||
skip_packet = true;
|
||||
u64 their_acked_seq = br_read_var_uint(&br);
|
||||
if (their_acked_seq > channel->their_acked_seq) {
|
||||
channel->their_acked_seq = their_acked_seq;
|
||||
}
|
||||
|
||||
b32 skip_packet = false;
|
||||
b32 is_reliable = packet_flags & HOST_PACKET_FLAG_RELIABLE;
|
||||
if (channel->valid) {
|
||||
if (is_reliable) {
|
||||
u64 packet_seq = br_read_var_uint(&br);
|
||||
if (packet_seq == channel->our_acked_seq + 1) {
|
||||
channel->our_acked_seq = packet_seq;
|
||||
} else {
|
||||
skip_packet = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!skip_packet) {
|
||||
switch (host_packet_kind) {
|
||||
case HOST_PACKET_KIND_TRY_CONNECT:
|
||||
{
|
||||
/* A foreign host is trying to connect to us */
|
||||
if (!channel->valid) {
|
||||
logf_info("Received conection attempt from %F", FMT_STR(sock_string_from_address(scratch.arena, address)));
|
||||
/* TODO: Verify that some per-host uuid isn't
|
||||
* present in a rolling window to prevent reconnects right after a disconnect? */
|
||||
channel = host_channel_alloc(host, address);
|
||||
}
|
||||
} break;
|
||||
|
||||
case HOST_PACKET_KIND_CONNECT_SUCCESS:
|
||||
{
|
||||
/* We successfully connected to a foreign host and they are ready to receive messages */
|
||||
if (channel->valid && !channel->connected) {
|
||||
logf_info("Received connection from %F", FMT_STR(sock_string_from_address(scratch.arena, address)));
|
||||
struct host_queued_event *queued_event = host_queued_event_alloc_and_append(host);
|
||||
queued_event->event.kind = HOST_EVENT_KIND_CHANNEL_OPENED;
|
||||
queued_event->event.channel_id = channel->id;
|
||||
channel->connected = true;
|
||||
}
|
||||
} break;
|
||||
|
||||
case HOST_PACKET_KIND_DISCONNECT:
|
||||
{
|
||||
/* A foreign host disconnected from us */
|
||||
if (channel->valid) {
|
||||
logf_info("Received disconnection from %F", FMT_STR(sock_string_from_address(scratch.arena, address)));
|
||||
struct host_queued_event *queued_event = host_queued_event_alloc_and_append(host);
|
||||
queued_event->event.kind = HOST_EVENT_KIND_CHANNEL_CLOSED;
|
||||
queued_event->event.channel_id = channel->id;
|
||||
host_channel_release(channel);
|
||||
}
|
||||
|
||||
} break;
|
||||
|
||||
case HOST_PACKET_KIND_MSG_CHUNK:
|
||||
{
|
||||
if (channel->valid) {
|
||||
/* Packet is chunk <chunk_id> out of <chunk_count> belonging to message <msg_id> */
|
||||
u64 msg_id = br_read_var_uint(&br);
|
||||
u64 chunk_id = br_read_var_uint(&br);
|
||||
u64 chunk_count = br_read_var_uint(&br);
|
||||
|
||||
b32 is_last_chunk = (chunk_id + 1) == chunk_count;
|
||||
u64 data_len = is_last_chunk ? (br_read_var_uint(&br) + 1) : PACKET_MSG_CHUNK_MAX_LEN;
|
||||
|
||||
struct host_msg_assembler *ma = host_get_msg_assembler(host, channel->id, msg_id);
|
||||
if (!ma) {
|
||||
ma = host_msg_assembler_alloc(channel, msg_id, chunk_count, now_ns, is_reliable);
|
||||
}
|
||||
|
||||
if (chunk_count == ma->num_chunks_total && chunk_id < chunk_count) {
|
||||
if (!host_msg_assembler_is_chunk_filled(ma, chunk_id)) {
|
||||
u8 *src = br_seek(&br, data_len);
|
||||
if (src) {
|
||||
u8 *dst = &ma->data[chunk_id * PACKET_MSG_CHUNK_MAX_LEN];
|
||||
MEMCPY(dst, src, data_len);
|
||||
if (is_last_chunk) {
|
||||
ma->last_chunk_len = data_len;
|
||||
}
|
||||
host_msg_assembler_set_chunk_received(ma, chunk_id);
|
||||
++ma->num_chunks_received;
|
||||
host_msg_assembler_touch(ma, now_ns);
|
||||
if (ma->num_chunks_received == chunk_count) {
|
||||
/* All chunks filled, message has finished assembling */
|
||||
/* TODO: Message ordering */
|
||||
struct host_queued_event *queued_event = host_queued_event_alloc_and_append(host);
|
||||
struct string data = ZI;
|
||||
data.len = ((chunk_count - 1) * PACKET_MSG_CHUNK_MAX_LEN) + ma->last_chunk_len;
|
||||
data.text = arena_push_array(&host->queued_event_arena, u8, data.len);
|
||||
MEMCPY(data.text, ma->data, data.len);
|
||||
queued_event->event.kind = HOST_EVENT_KIND_MSG;
|
||||
queued_event->event.msg = data;
|
||||
queued_event->event.channel_id = channel->id;
|
||||
if (is_reliable) {
|
||||
/* Release assembler if reliable */
|
||||
host_msg_assembler_release(ma);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
ASSERT(false);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
ASSERT(false);
|
||||
}
|
||||
}
|
||||
} break;
|
||||
|
||||
default: break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
/* Reset read buffer */
|
||||
recv_buffer->first_packet = NULL;
|
||||
recv_buffer->last_packet = NULL;
|
||||
arena_reset(&recv_buffer->arena);
|
||||
}
|
||||
|
||||
if (!skip_packet) {
|
||||
switch (host_packet_kind) {
|
||||
case HOST_PACKET_KIND_TRY_CONNECT:
|
||||
{
|
||||
/* A foreign host is trying to connect to us */
|
||||
if (!channel->valid) {
|
||||
logf_info("Received conection attempt from %F", FMT_STR(sock_string_from_address(scratch.arena, address)));
|
||||
/* TODO: Verify that some per-host uuid isn't
|
||||
* present in a rolling window to prevent reconnects right after a disconnect? */
|
||||
channel = host_channel_alloc(host, address);
|
||||
/* Update channels */
|
||||
{
|
||||
__profscope(host_update_channels);
|
||||
for (u64 i = 0; i < host->num_channels_reserved; ++i) {
|
||||
struct host_channel *channel = &host->channels[i];
|
||||
if (channel->valid) {
|
||||
/* Send / resend handshake if not connected */
|
||||
if (!channel->connected) {
|
||||
struct host_cmd *cmd = host_cmd_alloc_and_append(host);
|
||||
cmd->kind = HOST_CMD_KIND_TRY_CONNECT;
|
||||
cmd->channel_id = channel->id;
|
||||
}
|
||||
/* Release acked reliable packets */
|
||||
{
|
||||
u64 acked_seq = channel->their_acked_seq;
|
||||
struct host_packet *host_packet = channel->first_reliable_packet;
|
||||
while (host_packet) {
|
||||
struct host_packet *next = host_packet->next;
|
||||
u64 seq = host_packet->seq;
|
||||
if (seq < acked_seq) {
|
||||
host_packet->next = host->first_free_packet;
|
||||
host->first_free_packet = host_packet;
|
||||
channel->first_reliable_packet = next;
|
||||
--channel->num_reliable_packets;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
} break;
|
||||
|
||||
case HOST_PACKET_KIND_CONNECT_SUCCESS:
|
||||
{
|
||||
/* We successfully connected to a foreign host and they are ready to receive messages */
|
||||
if (channel->valid && !channel->connected) {
|
||||
logf_info("Received connection from %F", FMT_STR(sock_string_from_address(scratch.arena, address)));
|
||||
struct host_queued_event *queued_event = host_queued_event_alloc_and_append(host);
|
||||
queued_event->event.kind = HOST_EVENT_KIND_CHANNEL_OPENED;
|
||||
queued_event->event.channel_id = channel->id;
|
||||
channel->connected = true;
|
||||
}
|
||||
} break;
|
||||
|
||||
case HOST_PACKET_KIND_DISCONNECT:
|
||||
{
|
||||
/* A foreign host disconnected from us */
|
||||
if (channel->valid) {
|
||||
logf_info("Received disconnection from %F", FMT_STR(sock_string_from_address(scratch.arena, address)));
|
||||
struct host_queued_event *queued_event = host_queued_event_alloc_and_append(host);
|
||||
queued_event->event.kind = HOST_EVENT_KIND_CHANNEL_CLOSED;
|
||||
queued_event->event.channel_id = channel->id;
|
||||
host_channel_release(channel);
|
||||
}
|
||||
|
||||
} break;
|
||||
|
||||
case HOST_PACKET_KIND_MSG_CHUNK:
|
||||
{
|
||||
if (channel->valid) {
|
||||
/* Packet is chunk <chunk_id> out of <chunk_count> belonging to message <msg_id> */
|
||||
u64 msg_id = br_read_var_uint(&br);
|
||||
u64 chunk_id = br_read_var_uint(&br);
|
||||
u64 chunk_count = br_read_var_uint(&br);
|
||||
|
||||
b32 is_last_chunk = (chunk_id + 1) == chunk_count;
|
||||
u64 data_len = is_last_chunk ? (br_read_u8(&br) + 1) : PACKET_CHUNK_MAX_LEN;
|
||||
|
||||
struct host_msg_assembler *ma = host_get_msg_assembler(host, channel->id, msg_id);
|
||||
if (!ma) {
|
||||
ma = host_msg_assembler_alloc(channel, msg_id, chunk_count, now_ns, is_reliable);
|
||||
host_packet = next;
|
||||
}
|
||||
if (channel->first_reliable_packet == NULL) {
|
||||
channel->last_reliable_packet = NULL;
|
||||
}
|
||||
}
|
||||
/* TODO: Release timed out unreliable msg buffers */
|
||||
{
|
||||
/* TODO: Configurable timeout */
|
||||
i64 timeout_ns = NS_FROM_SECONDS(4);
|
||||
struct host_msg_assembler *ma = channel->least_recent_msg_assembler;
|
||||
while (ma) {
|
||||
struct host_msg_assembler *next = ma->more_recent;
|
||||
if ((now_ns - ma->touched_ns) > timeout_ns) {
|
||||
if (!ma->is_reliable) {
|
||||
host_msg_assembler_release(ma);
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
ma = next;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (chunk_count == ma->num_chunks_total && chunk_id < chunk_count) {
|
||||
if (!host_msg_assembler_is_chunk_filled(ma, chunk_id)) {
|
||||
u8 *src = br_seek(&br, data_len);
|
||||
if (src) {
|
||||
u8 *dst = &ma->data[chunk_id * PACKET_CHUNK_MAX_LEN];
|
||||
MEMCPY(dst, src, data_len);
|
||||
if (is_last_chunk) {
|
||||
ma->last_chunk_len = data_len;
|
||||
}
|
||||
host_msg_assembler_set_chunk_received(ma, chunk_id);
|
||||
++ma->num_chunks_received;
|
||||
host_msg_assembler_touch(ma, now_ns);
|
||||
if (ma->num_chunks_received == chunk_count) {
|
||||
/* All chunks filled, message has finished assembling */
|
||||
/* TODO: Message ordering */
|
||||
struct host_queued_event *queued_event = host_queued_event_alloc_and_append(host);
|
||||
struct string data = ZI;
|
||||
data.len = ((chunk_count - 1) * PACKET_CHUNK_MAX_LEN) + ma->last_chunk_len;
|
||||
data.text = arena_push_array(&host->queued_event_arena, u8, data.len);
|
||||
MEMCPY(data.text, ma->data, data.len);
|
||||
queued_event->event.kind = HOST_EVENT_KIND_MSG;
|
||||
queued_event->event.msg = data;
|
||||
queued_event->event.channel_id = channel->id;
|
||||
if (is_reliable) {
|
||||
/* Release assembler if reliable */
|
||||
host_msg_assembler_release(ma);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
ASSERT(false);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
ASSERT(false);
|
||||
/* Process cmds */
|
||||
/* TODO: Unreliable packets don't need to be allocated into unreliable packet queue, should just send them and forget */
|
||||
{
|
||||
__profscope(host_update_process_cmds);
|
||||
for (struct host_cmd *cmd = host->first_cmd; cmd; cmd = cmd->next) {
|
||||
enum host_cmd_kind kind = cmd->kind;
|
||||
struct host_channel_id channel_id = cmd->channel_id;
|
||||
struct host_channel_list channels = host_channels_from_id(scratch.arena, host, channel_id);
|
||||
for (struct host_channel_node *node = channels.first; node; node = node->next) {
|
||||
struct host_channel *channel = node->channel;
|
||||
switch (kind) {
|
||||
case HOST_CMD_KIND_TRY_CONNECT:
|
||||
{
|
||||
u8 packet_flags = 0;
|
||||
struct host_packet *host_packet = host_channel_packet_alloc(channel, false);
|
||||
struct byte_writer bw = bw_from_buffer(STRING_FROM_ARRAY(host_packet->data));
|
||||
bw_write_u32(&bw, PACKET_MAGIC);
|
||||
bw_write_i8(&bw, HOST_PACKET_KIND_TRY_CONNECT);
|
||||
bw_write_u8(&bw, packet_flags);
|
||||
bw_write_var_uint(&bw, channel->our_acked_seq);
|
||||
host_packet->data_len = bw_pos(&bw);
|
||||
} break;
|
||||
|
||||
case HOST_CMD_KIND_CONNECT_SUCCESS:
|
||||
{
|
||||
u8 packet_flags = 0;
|
||||
struct host_packet *host_packet = host_channel_packet_alloc(channel, false);
|
||||
struct byte_writer bw = bw_from_buffer(STRING_FROM_ARRAY(host_packet->data));
|
||||
bw_write_u32(&bw, PACKET_MAGIC);
|
||||
bw_write_i8(&bw, HOST_PACKET_KIND_CONNECT_SUCCESS);
|
||||
bw_write_u8(&bw, packet_flags);
|
||||
bw_write_var_uint(&bw, channel->our_acked_seq);
|
||||
host_packet->data_len = bw_pos(&bw);
|
||||
} break;
|
||||
|
||||
case HOST_CMD_KIND_DISCONNECT:
|
||||
{
|
||||
u8 packet_flags = 0;
|
||||
struct host_packet *host_packet = host_channel_packet_alloc(channel, false);
|
||||
struct byte_writer bw = bw_from_buffer(STRING_FROM_ARRAY(host_packet->data));
|
||||
bw_write_u32(&bw, PACKET_MAGIC);
|
||||
bw_write_i8(&bw, HOST_PACKET_KIND_DISCONNECT);
|
||||
bw_write_u8(&bw, packet_flags);
|
||||
bw_write_var_uint(&bw, channel->our_acked_seq);
|
||||
host_packet->data_len = bw_pos(&bw);
|
||||
} break;
|
||||
|
||||
case HOST_CMD_KIND_WRITE:
|
||||
{
|
||||
b32 is_reliable = cmd->write_reliable;
|
||||
u8 packet_flags = (is_reliable * HOST_PACKET_FLAG_RELIABLE);
|
||||
struct string msg = cmd->write_msg;
|
||||
|
||||
u64 chunk_count = 0;
|
||||
if (msg.len > 0) {
|
||||
chunk_count = (msg.len - 1) / PACKET_MSG_CHUNK_MAX_LEN;
|
||||
}
|
||||
chunk_count += 1;
|
||||
|
||||
u64 msg_id = ++channel->last_sent_msg_id;
|
||||
for (u64 i = 0; i < chunk_count; ++i) {
|
||||
u64 data_len = PACKET_MSG_CHUNK_MAX_LEN;
|
||||
b32 is_last_chunk = i + 1 == chunk_count;
|
||||
if (is_last_chunk) {
|
||||
data_len = msg.len % PACKET_MSG_CHUNK_MAX_LEN;
|
||||
}
|
||||
u8 *data = msg.text + (i * PACKET_MSG_CHUNK_MAX_LEN);
|
||||
struct host_packet *host_packet = host_channel_packet_alloc(channel, is_reliable);
|
||||
struct byte_writer bw = bw_from_buffer(STRING_FROM_ARRAY(host_packet->data));
|
||||
bw_write_u32(&bw, PACKET_MAGIC);
|
||||
bw_write_i8(&bw, HOST_PACKET_KIND_MSG_CHUNK);
|
||||
bw_write_u8(&bw, packet_flags);
|
||||
bw_write_var_uint(&bw, channel->our_acked_seq);
|
||||
if (is_reliable) {
|
||||
bw_write_var_uint(&bw, host_packet->seq);
|
||||
}
|
||||
bw_write_var_uint(&bw, msg_id);
|
||||
bw_write_var_uint(&bw, i);
|
||||
bw_write_var_uint(&bw, chunk_count);
|
||||
if (is_last_chunk) {
|
||||
/* FIXME: Ensure data_len can never be 0 */
|
||||
bw_write_var_uint(&bw, data_len - 1);
|
||||
}
|
||||
bw_write_buffer(&bw, STRING(data_len, data));
|
||||
host_packet->data_len = bw_pos(&bw);
|
||||
}
|
||||
} break;
|
||||
|
||||
@ -767,177 +920,32 @@ void host_update(struct host *host)
|
||||
}
|
||||
}
|
||||
}
|
||||
/* Reset read buffer */
|
||||
recv_buffer->first_packet = NULL;
|
||||
recv_buffer->last_packet = NULL;
|
||||
arena_reset(&recv_buffer->arena);
|
||||
|
||||
/* Update channels */
|
||||
for (u64 i = 0; i < host->num_channels_reserved; ++i) {
|
||||
struct host_channel *channel = &host->channels[i];
|
||||
if (channel->valid) {
|
||||
/* Send / resend handshake if not connected */
|
||||
if (!channel->connected) {
|
||||
struct host_cmd *cmd = host_cmd_alloc_and_append(host);
|
||||
cmd->kind = HOST_CMD_KIND_TRY_CONNECT;
|
||||
cmd->channel_id = channel->id;
|
||||
}
|
||||
/* Release acked reliable packets */
|
||||
{
|
||||
u64 acked_seq = channel->their_acked_seq;
|
||||
struct host_packet *host_packet = channel->first_reliable_packet;
|
||||
while (host_packet) {
|
||||
struct host_packet *next = host_packet->next;
|
||||
u64 seq = host_packet->seq;
|
||||
if (seq < acked_seq) {
|
||||
host_packet->next = host->first_free_packet;
|
||||
host->first_free_packet = host_packet;
|
||||
channel->first_reliable_packet = next;
|
||||
--channel->num_reliable_packets;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
host_packet = next;
|
||||
}
|
||||
if (channel->first_reliable_packet == NULL) {
|
||||
channel->last_reliable_packet = NULL;
|
||||
}
|
||||
}
|
||||
/* TODO: Release timed out unreliable msg buffers */
|
||||
{
|
||||
/* TODO: Configurable timeout */
|
||||
i64 timeout_ns = NS_FROM_SECONDS(4);
|
||||
struct host_msg_assembler *ma = channel->least_recent_msg_assembler;
|
||||
while (ma) {
|
||||
struct host_msg_assembler *next = ma->more_recent;
|
||||
if ((now_ns - ma->touched_ns) > timeout_ns) {
|
||||
if (!ma->is_reliable) {
|
||||
host_msg_assembler_release(ma);
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
ma = next;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Process cmds */
|
||||
/* TODO: Unreliable packets don't need to be allocated into unreliable packet queue, should just send them and forget */
|
||||
for (struct host_cmd *cmd = host->first_cmd; cmd; cmd = cmd->next) {
|
||||
enum host_cmd_kind kind = cmd->kind;
|
||||
struct host_channel_id channel_id = cmd->channel_id;
|
||||
struct host_channel_list channels = host_channels_from_id(scratch.arena, host, channel_id);
|
||||
for (struct host_channel_node *node = channels.first; node; node = node->next) {
|
||||
struct host_channel *channel = node->channel;
|
||||
switch (kind) {
|
||||
case HOST_CMD_KIND_TRY_CONNECT:
|
||||
{
|
||||
logf_info("TRY CONNECT TO %F family: %F, valid: %F", FMT_STR(sock_string_from_address(scratch.arena, channel->address)), FMT_SINT(channel->address.family), FMT_SINT(channel->address.valid));
|
||||
|
||||
u8 packet_flags = 0;
|
||||
struct host_packet *host_packet = host_channel_packet_alloc(channel, false);
|
||||
struct byte_writer bw = bw_from_buffer(STRING_FROM_ARRAY(host_packet->data));
|
||||
bw_write_u32(&bw, PACKET_MAGIC);
|
||||
bw_write_i8(&bw, HOST_PACKET_KIND_TRY_CONNECT);
|
||||
bw_write_u8(&bw, packet_flags);
|
||||
bw_write_var_uint(&bw, channel->our_acked_seq);
|
||||
host_packet->data_len = bw_pos(&bw);
|
||||
} break;
|
||||
|
||||
case HOST_CMD_KIND_CONNECT_SUCCESS:
|
||||
{
|
||||
u8 packet_flags = 0;
|
||||
struct host_packet *host_packet = host_channel_packet_alloc(channel, false);
|
||||
struct byte_writer bw = bw_from_buffer(STRING_FROM_ARRAY(host_packet->data));
|
||||
bw_write_u32(&bw, PACKET_MAGIC);
|
||||
bw_write_i8(&bw, HOST_PACKET_KIND_CONNECT_SUCCESS);
|
||||
bw_write_u8(&bw, packet_flags);
|
||||
bw_write_var_uint(&bw, channel->our_acked_seq);
|
||||
host_packet->data_len = bw_pos(&bw);
|
||||
} break;
|
||||
|
||||
case HOST_CMD_KIND_DISCONNECT:
|
||||
{
|
||||
u8 packet_flags = 0;
|
||||
struct host_packet *host_packet = host_channel_packet_alloc(channel, false);
|
||||
struct byte_writer bw = bw_from_buffer(STRING_FROM_ARRAY(host_packet->data));
|
||||
bw_write_u32(&bw, PACKET_MAGIC);
|
||||
bw_write_i8(&bw, HOST_PACKET_KIND_DISCONNECT);
|
||||
bw_write_u8(&bw, packet_flags);
|
||||
bw_write_var_uint(&bw, channel->our_acked_seq);
|
||||
host_packet->data_len = bw_pos(&bw);
|
||||
} break;
|
||||
|
||||
case HOST_CMD_KIND_WRITE:
|
||||
{
|
||||
b32 is_reliable = cmd->write_reliable;
|
||||
u8 packet_flags = (is_reliable * HOST_PACKET_FLAG_RELIABLE);
|
||||
struct string msg = cmd->write_msg;
|
||||
|
||||
u64 chunk_count = 0;
|
||||
if (msg.len > 0) {
|
||||
chunk_count = (msg.len - 1) / PACKET_CHUNK_MAX_LEN;
|
||||
}
|
||||
chunk_count += 1;
|
||||
|
||||
u64 msg_id = ++channel->last_sent_msg_id;
|
||||
for (u64 i = 0; i < chunk_count; ++i) {
|
||||
u64 data_len = PACKET_CHUNK_MAX_LEN;
|
||||
b32 is_last_chunk = i + 1 == chunk_count;
|
||||
if (is_last_chunk) {
|
||||
data_len = msg.len % PACKET_CHUNK_MAX_LEN;
|
||||
}
|
||||
u8 *data = msg.text + (i * PACKET_CHUNK_MAX_LEN);
|
||||
struct host_packet *host_packet = host_channel_packet_alloc(channel, is_reliable);
|
||||
struct byte_writer bw = bw_from_buffer(STRING_FROM_ARRAY(host_packet->data));
|
||||
bw_write_u32(&bw, PACKET_MAGIC);
|
||||
bw_write_i8(&bw, HOST_PACKET_KIND_MSG_CHUNK);
|
||||
bw_write_u8(&bw, packet_flags);
|
||||
bw_write_var_uint(&bw, channel->our_acked_seq);
|
||||
if (is_reliable) {
|
||||
bw_write_var_uint(&bw, host_packet->seq);
|
||||
}
|
||||
bw_write_var_uint(&bw, msg_id);
|
||||
bw_write_var_uint(&bw, i);
|
||||
bw_write_var_uint(&bw, chunk_count);
|
||||
if (is_last_chunk) {
|
||||
/* FIXME: Ensure data_len can never be 0 */
|
||||
bw_write_u8(&bw, data_len - 1);
|
||||
}
|
||||
bw_write_buffer(&bw, STRING(data_len, data));
|
||||
host_packet->data_len = bw_pos(&bw);
|
||||
}
|
||||
} break;
|
||||
|
||||
default: break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Process packets */
|
||||
/* TODO: Aggregate small packets */
|
||||
for (u64 i = 0; i < host->num_channels_reserved; ++i) {
|
||||
struct sock *sock = host->sock;
|
||||
struct host_channel *channel = &host->channels[i];
|
||||
if (channel->valid) {
|
||||
struct sock_address address = channel->address;
|
||||
/* Send reliable packets to channel */
|
||||
for (struct host_packet *host_packet = channel->first_reliable_packet; host_packet; host_packet = host_packet->next) {
|
||||
sock_write(sock, address, STRING(host_packet->data_len, host_packet->data));
|
||||
}
|
||||
/* Send unreliable packets to channel */
|
||||
for (struct host_packet *host_packet = channel->first_unreliable_packet; host_packet; host_packet = host_packet->next) {
|
||||
sock_write(sock, address, STRING(host_packet->data_len, host_packet->data));
|
||||
}
|
||||
/* Release unreliable packets */
|
||||
if (channel->first_unreliable_packet) {
|
||||
channel->last_unreliable_packet->next = host->first_free_packet;
|
||||
host->first_free_packet = channel->first_unreliable_packet;
|
||||
channel->first_unreliable_packet = NULL;
|
||||
channel->last_unreliable_packet = NULL;
|
||||
channel->num_unreliable_packets = 0;
|
||||
{
|
||||
__profscope(host_update_send_packets);
|
||||
for (u64 i = 0; i < host->num_channels_reserved; ++i) {
|
||||
struct sock *sock = host->sock;
|
||||
struct host_channel *channel = &host->channels[i];
|
||||
if (channel->valid) {
|
||||
struct sock_address address = channel->address;
|
||||
/* Send reliable packets to channel */
|
||||
for (struct host_packet *host_packet = channel->first_reliable_packet; host_packet; host_packet = host_packet->next) {
|
||||
sock_write(sock, address, STRING(host_packet->data_len, host_packet->data));
|
||||
}
|
||||
/* Send unreliable packets to channel */
|
||||
for (struct host_packet *host_packet = channel->first_unreliable_packet; host_packet; host_packet = host_packet->next) {
|
||||
sock_write(sock, address, STRING(host_packet->data_len, host_packet->data));
|
||||
}
|
||||
/* Release unreliable packets */
|
||||
if (channel->first_unreliable_packet) {
|
||||
channel->last_unreliable_packet->next = host->first_free_packet;
|
||||
host->first_free_packet = channel->first_unreliable_packet;
|
||||
channel->first_unreliable_packet = NULL;
|
||||
channel->last_unreliable_packet = NULL;
|
||||
channel->num_unreliable_packets = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -65,7 +65,6 @@ INTERNAL struct sock_address sock_address_from_ip_port_cstr(char *ip_cstr, char
|
||||
MEMCPY(res.ipnb, (void *)&sockaddr->sin_addr, 4);
|
||||
break;
|
||||
} else if (ai_res->ai_family == AF_INET6) {
|
||||
/* FIXME: Why must this be disabled to work? */
|
||||
#if 0
|
||||
struct sockaddr_in6 *sockaddr = (struct sockaddr_in6 *)ai_res->ai_addr;
|
||||
res.valid = true;
|
||||
|
||||
Loading…
Reference in New Issue
Block a user