host progress

This commit is contained in:
jacob 2025-02-07 15:18:36 -06:00
parent d13a7c70e7
commit 72728e6a98
6 changed files with 330 additions and 246 deletions

View File

@ -36,9 +36,9 @@ struct host_packet {
u64 seq; u64 seq;
u8 flags; u8 flags;
u64 data_len; u64 data_len;
u8 data[PACKET_DATA_MAX_LEN];
struct host_packet *next; struct host_packet *next;
u8 data[PACKET_DATA_MAX_LEN];
}; };
struct host_channel { struct host_channel {
@ -92,18 +92,27 @@ struct host_queued_event {
struct host_queued_event *next; struct host_queued_event *next;
}; };
struct host_recv_packet {
struct sock *sock;
struct sock_address address;
struct string data;
struct host_recv_packet *next;
};
struct host_recv_buffer {
struct arena arena;
struct host_recv_packet *first_packet;
struct host_recv_packet *last_packet;
};
struct host_msg_assembler { struct host_msg_assembler {
struct host *host; struct host *host;
struct host_channel *channel; struct host_channel *channel;
b32 is_reliable; b32 is_reliable;
/* TODO: Remove this (testing) */ /* TODO: Remove this (testing) */
struct arena testarena; struct arena testarena;
/* Free list */ /* Free list */
struct host_msg_assembler *next_free; struct host_msg_assembler *next_free;
@ -139,6 +148,7 @@ GLOBAL struct {
i32 _; i32 _;
} G = ZI, DEBUG_ALIAS(G, G_host); } G = ZI, DEBUG_ALIAS(G, G_host);
INTERNAL SYS_THREAD_ENTRY_POINT_FUNC_DEF(host_receiver_thread_entry_point, arg);
INTERNAL void host_msg_assembler_release(struct host_msg_assembler *ma); INTERNAL void host_msg_assembler_release(struct host_msg_assembler *ma);
/* ========================== * /* ========================== *
@ -174,14 +184,31 @@ struct host *host_alloc(u16 listen_port)
host->num_msg_assembler_lookup_buckets = NUM_MSG_ASSEMBLER_LOOKUP_BUCKETS; host->num_msg_assembler_lookup_buckets = NUM_MSG_ASSEMBLER_LOOKUP_BUCKETS;
host->msg_assembler_lookup_buckets = arena_push_array_zero(&host->arena, struct host_msg_assembler_lookup_bucket, host->num_msg_assembler_lookup_buckets); host->msg_assembler_lookup_buckets = arena_push_array_zero(&host->arena, struct host_msg_assembler_lookup_bucket, host->num_msg_assembler_lookup_buckets);
host->sock = sock_alloc(listen_port, SOCK_FLAG_NON_BLOCKING);
host->recv_buffer_read = arena_push_zero(&host->arena, struct host_recv_buffer);
host->recv_buffer_write = arena_push_zero(&host->arena, struct host_recv_buffer);
host->recv_buffer_read->arena = arena_alloc(GIGABYTE(64));
host->recv_buffer_write->arena = arena_alloc(GIGABYTE(64));
host->sock = sock_alloc(listen_port);
host->recv_buffer_write_mutex = sys_mutex_alloc();
host->receiver_thread = sys_thread_alloc(&host_receiver_thread_entry_point, host, LIT("[P6] Host receiver"));
return host; return host;
} }
void host_release(struct host *host) void host_release(struct host *host)
{ {
sock_release(&host->sock); /* FIXME: Signal thread shutdown */
sys_thread_wait_release(&host->receiver_thread);
sys_mutex_release(&host->recv_buffer_write_mutex);
sock_release(host->sock);
arena_release(&host->recv_buffer_write->arena);
arena_release(&host->recv_buffer_read->arena);
arena_release(&host->channel_arena); arena_release(&host->channel_arena);
arena_release(&host->queued_event_arena); arena_release(&host->queued_event_arena);
arena_release(&host->cmd_arena); arena_release(&host->cmd_arena);
@ -600,143 +627,150 @@ void host_update(struct host *host)
i64 now_ns = sys_time_ns(); i64 now_ns = sys_time_ns();
struct sock *sock = &host->sock;
struct string read_buff = ZI; struct string read_buff = ZI;
read_buff.len = PACKET_DATA_MAX_LEN; read_buff.len = PACKET_DATA_MAX_LEN;
read_buff.text = arena_push_array(scratch.arena, u8, read_buff.len); read_buff.text = arena_push_array(scratch.arena, u8, read_buff.len);
/* Read socket */ /* Swap read & write recv buffers */
while (true) { {
struct sock_read_result res = sock_read(sock, read_buff); struct sys_lock lock = sys_mutex_lock_e(&host->recv_buffer_write_mutex);
if (res.valid) { struct host_recv_buffer *swp = host->recv_buffer_read;
struct string sock_data = res.data; host->recv_buffer_read = host->recv_buffer_write;
struct sock_address address = res.address; host->recv_buffer_write = swp;
struct byte_reader br = br_from_buffer(sock_data); sys_mutex_unlock(&lock);
u32 magic = br_read_u32(&br); }
if (magic == PACKET_MAGIC) { /* Read incoming packets */
/* TODO: Combine kind byte with flags byte */ struct host_recv_buffer *recv_buffer = host->recv_buffer_read;
struct host_channel *channel = host_channel_from_address(host, address); for (struct host_recv_packet *packet = recv_buffer->first_packet; packet; packet = packet->next) {
enum host_packet_kind host_packet_kind = br_read_i8(&br); //struct sock *sock = packet->sock;
u8 packet_flags = br_read_u8(&br); struct sock_address address = packet->address;
struct byte_reader br = br_from_buffer(packet->data);
u32 magic = br_read_u32(&br);
if (magic == PACKET_MAGIC) {
/* TODO: Combine kind byte with flags byte */
struct host_channel *channel = host_channel_from_address(host, address);
enum host_packet_kind host_packet_kind = br_read_i8(&br);
u8 packet_flags = br_read_u8(&br);
u64 their_acked_seq = br_read_var_uint(&br); u64 their_acked_seq = br_read_var_uint(&br);
if (their_acked_seq > channel->their_acked_seq) { if (their_acked_seq > channel->their_acked_seq) {
channel->their_acked_seq = their_acked_seq; channel->their_acked_seq = their_acked_seq;
} }
b32 should_process_packet = false; b32 skip_packet = false;
b32 is_reliable = packet_flags & HOST_PACKET_FLAG_RELIABLE; b32 is_reliable = packet_flags & HOST_PACKET_FLAG_RELIABLE;
if (channel->valid) {
if (is_reliable) { if (is_reliable) {
u64 packet_seq = br_read_var_uint(&br); u64 packet_seq = br_read_var_uint(&br);
if (packet_seq == channel->our_acked_seq + 1) { if (packet_seq == channel->our_acked_seq + 1) {
channel->our_acked_seq = packet_seq; channel->our_acked_seq = packet_seq;
should_process_packet = true; } else {
} skip_packet = true;
} else {
should_process_packet = true;
}
if (should_process_packet) {
switch (host_packet_kind) {
case HOST_PACKET_KIND_TRY_CONNECT:
{
/* A foreign host is trying to connect to us */
if (!channel->valid) {
logf_info("Received conection attempt from %F", FMT_STR(sock_string_from_address(scratch.arena, address)));
/* TODO: Verify that some per-host uuid isn't
* present in a rolling window to prevent reconnects right after a disconnect? */
channel = host_channel_alloc(host, address);
}
} break;
case HOST_PACKET_KIND_CONNECT_SUCCESS:
{
/* We successfully connected to a foreign host and they are ready to receive messages */
if (channel->valid && !channel->connected) {
logf_info("Received connection from %F", FMT_STR(sock_string_from_address(scratch.arena, address)));
struct host_queued_event *queued_event = host_queued_event_alloc_and_append(host);
queued_event->event.kind = HOST_EVENT_KIND_CHANNEL_OPENED;
queued_event->event.channel_id = channel->id;
channel->connected = true;
}
} break;
case HOST_PACKET_KIND_DISCONNECT:
{
/* A foreign host disconnected from us */
if (channel->valid) {
logf_info("Received disconnection from %F", FMT_STR(sock_string_from_address(scratch.arena, address)));
struct host_queued_event *queued_event = host_queued_event_alloc_and_append(host);
queued_event->event.kind = HOST_EVENT_KIND_CHANNEL_CLOSED;
queued_event->event.channel_id = channel->id;
host_channel_release(channel);
}
} break;
case HOST_PACKET_KIND_MSG_CHUNK:
{
if (channel->valid) {
/* Packet is chunk <chunk_id> out of <chunk_count> belonging to message <msg_id> */
u64 msg_id = br_read_var_uint(&br);
u64 chunk_id = br_read_var_uint(&br);
u64 chunk_count = br_read_var_uint(&br);
b32 is_last_chunk = (chunk_id + 1) == chunk_count;
u64 data_len = is_last_chunk ? (br_read_u8(&br) + 1) : PACKET_CHUNK_MAX_LEN;
struct host_msg_assembler *ma = host_get_msg_assembler(host, channel->id, msg_id);
if (!ma) {
ma = host_msg_assembler_alloc(channel, msg_id, chunk_count, now_ns, is_reliable);
}
if (chunk_count == ma->num_chunks_total && chunk_id < chunk_count) {
if (!host_msg_assembler_is_chunk_filled(ma, chunk_id)) {
u8 *src = br_seek(&br, data_len);
if (src) {
u8 *dst = &ma->data[chunk_id * PACKET_CHUNK_MAX_LEN];
MEMCPY(dst, src, data_len);
if (is_last_chunk) {
ma->last_chunk_len = data_len;
}
host_msg_assembler_set_chunk_received(ma, chunk_id);
++ma->num_chunks_received;
host_msg_assembler_touch(ma, now_ns);
if (ma->num_chunks_received == chunk_count) {
/* All chunks filled, message has finished assembling */
/* TODO: Message ordering */
struct host_queued_event *queued_event = host_queued_event_alloc_and_append(host);
struct string data = ZI;
data.len = ((chunk_count - 1) * PACKET_CHUNK_MAX_LEN) + ma->last_chunk_len;
data.text = arena_push_array(&host->queued_event_arena, u8, data.len);
MEMCPY(data.text, ma->data, data.len);
queued_event->event.kind = HOST_EVENT_KIND_MSG;
queued_event->event.msg = data;
queued_event->event.channel_id = channel->id;
if (is_reliable) {
/* Release assembler if reliable */
host_msg_assembler_release(ma);
}
}
} else {
ASSERT(false);
}
}
} else {
ASSERT(false);
}
}
} break;
default: break;
} }
} }
} }
} else {
break; if (!skip_packet) {
switch (host_packet_kind) {
case HOST_PACKET_KIND_TRY_CONNECT:
{
/* A foreign host is trying to connect to us */
if (!channel->valid) {
logf_info("Received conection attempt from %F", FMT_STR(sock_string_from_address(scratch.arena, address)));
/* TODO: Verify that some per-host uuid isn't
* present in a rolling window to prevent reconnects right after a disconnect? */
channel = host_channel_alloc(host, address);
}
} break;
case HOST_PACKET_KIND_CONNECT_SUCCESS:
{
/* We successfully connected to a foreign host and they are ready to receive messages */
if (channel->valid && !channel->connected) {
logf_info("Received connection from %F", FMT_STR(sock_string_from_address(scratch.arena, address)));
struct host_queued_event *queued_event = host_queued_event_alloc_and_append(host);
queued_event->event.kind = HOST_EVENT_KIND_CHANNEL_OPENED;
queued_event->event.channel_id = channel->id;
channel->connected = true;
}
} break;
case HOST_PACKET_KIND_DISCONNECT:
{
/* A foreign host disconnected from us */
if (channel->valid) {
logf_info("Received disconnection from %F", FMT_STR(sock_string_from_address(scratch.arena, address)));
struct host_queued_event *queued_event = host_queued_event_alloc_and_append(host);
queued_event->event.kind = HOST_EVENT_KIND_CHANNEL_CLOSED;
queued_event->event.channel_id = channel->id;
host_channel_release(channel);
}
} break;
case HOST_PACKET_KIND_MSG_CHUNK:
{
if (channel->valid) {
/* Packet is chunk <chunk_id> out of <chunk_count> belonging to message <msg_id> */
u64 msg_id = br_read_var_uint(&br);
u64 chunk_id = br_read_var_uint(&br);
u64 chunk_count = br_read_var_uint(&br);
b32 is_last_chunk = (chunk_id + 1) == chunk_count;
u64 data_len = is_last_chunk ? (br_read_u8(&br) + 1) : PACKET_CHUNK_MAX_LEN;
struct host_msg_assembler *ma = host_get_msg_assembler(host, channel->id, msg_id);
if (!ma) {
ma = host_msg_assembler_alloc(channel, msg_id, chunk_count, now_ns, is_reliable);
}
if (chunk_count == ma->num_chunks_total && chunk_id < chunk_count) {
if (!host_msg_assembler_is_chunk_filled(ma, chunk_id)) {
u8 *src = br_seek(&br, data_len);
if (src) {
u8 *dst = &ma->data[chunk_id * PACKET_CHUNK_MAX_LEN];
MEMCPY(dst, src, data_len);
if (is_last_chunk) {
ma->last_chunk_len = data_len;
}
host_msg_assembler_set_chunk_received(ma, chunk_id);
++ma->num_chunks_received;
host_msg_assembler_touch(ma, now_ns);
if (ma->num_chunks_received == chunk_count) {
/* All chunks filled, message has finished assembling */
/* TODO: Message ordering */
struct host_queued_event *queued_event = host_queued_event_alloc_and_append(host);
struct string data = ZI;
data.len = ((chunk_count - 1) * PACKET_CHUNK_MAX_LEN) + ma->last_chunk_len;
data.text = arena_push_array(&host->queued_event_arena, u8, data.len);
MEMCPY(data.text, ma->data, data.len);
queued_event->event.kind = HOST_EVENT_KIND_MSG;
queued_event->event.msg = data;
queued_event->event.channel_id = channel->id;
if (is_reliable) {
/* Release assembler if reliable */
host_msg_assembler_release(ma);
}
}
} else {
ASSERT(false);
}
}
} else {
ASSERT(false);
}
}
} break;
default: break;
}
}
} }
} }
/* Reset read buffer */
recv_buffer->first_packet = NULL;
recv_buffer->last_packet = NULL;
arena_reset(&recv_buffer->arena);
/* Update channels */ /* Update channels */
for (u64 i = 0; i < host->num_channels_reserved; ++i) { for (u64 i = 0; i < host->num_channels_reserved; ++i) {
@ -779,9 +813,6 @@ void host_update(struct host *host)
if ((now_ns - ma->touched_ns) > timeout_ns) { if ((now_ns - ma->touched_ns) > timeout_ns) {
if (!ma->is_reliable) { if (!ma->is_reliable) {
host_msg_assembler_release(ma); host_msg_assembler_release(ma);
if (ma->num_chunks_received != ma->num_chunks_total) {
DEBUGBREAKABLE;
}
} }
} else { } else {
break; break;
@ -888,15 +919,16 @@ void host_update(struct host *host)
/* Process packets */ /* Process packets */
/* TODO: Aggregate small packets */ /* TODO: Aggregate small packets */
for (u64 i = 0; i < host->num_channels_reserved; ++i) { for (u64 i = 0; i < host->num_channels_reserved; ++i) {
struct sock *sock = host->sock;
struct host_channel *channel = &host->channels[i]; struct host_channel *channel = &host->channels[i];
if (channel->valid) { if (channel->valid) {
struct sock_address address = channel->address; struct sock_address address = channel->address;
/* Send unreliable packets to channel */ /* Send reliable packets to channel */
for (struct host_packet *host_packet = channel->first_unreliable_packet; host_packet; host_packet = host_packet->next) { for (struct host_packet *host_packet = channel->first_reliable_packet; host_packet; host_packet = host_packet->next) {
sock_write(sock, address, STRING(host_packet->data_len, host_packet->data)); sock_write(sock, address, STRING(host_packet->data_len, host_packet->data));
} }
/* Send un-acked reliable packets to channel */ /* Send unreliable packets to channel */
for (struct host_packet *host_packet = channel->first_reliable_packet; host_packet; host_packet = host_packet->next) { for (struct host_packet *host_packet = channel->first_unreliable_packet; host_packet; host_packet = host_packet->next) {
sock_write(sock, address, STRING(host_packet->data_len, host_packet->data)); sock_write(sock, address, STRING(host_packet->data_len, host_packet->data));
} }
/* Release unreliable packets */ /* Release unreliable packets */
@ -950,3 +982,48 @@ struct host_event_array host_pop_events(struct arena *arena, struct host *host)
return res; return res;
} }
/* ========================== *
* Receive thread
* ========================== */
INTERNAL SYS_THREAD_ENTRY_POINT_FUNC_DEF(host_receiver_thread_entry_point, arg)
{
u64 read_buff_size = KILOBYTE(64);
struct arena read_buff_arena = arena_alloc(read_buff_size);
struct string read_buff = ZI;
read_buff.len = read_buff_size;
read_buff.text = arena_push_array(&read_buff_arena, u8, KILOBYTE(64));
struct host *host = (struct host *)arg;
struct sock *sock = host->sock;
struct sock_array socks = ZI;
socks.socks = &sock;
socks.count = 1;
struct sock_read_result res;
volatile b32 run = true;
while (run) {
res = sock_read_poll(socks, NULL, read_buff, F32_INFINITY);
if (res.valid) {
struct sys_lock lock = sys_mutex_lock_e(&host->recv_buffer_write_mutex);
{
struct host_recv_buffer *recv_buffer = host->recv_buffer_write;
struct host_recv_packet *packet = arena_push_zero(&recv_buffer->arena, struct host_recv_packet);
packet->sock = res.sock;
packet->address = res.address;
packet->data = string_copy(&recv_buffer->arena, res.data);
if (recv_buffer->last_packet) {
recv_buffer->last_packet->next = packet;
} else {
recv_buffer->first_packet = packet;
}
recv_buffer->last_packet = packet;
}
sys_mutex_unlock(&lock);
}
}
arena_release(&read_buff_arena);
}

View File

@ -1,6 +1,7 @@
#ifndef HOST_H #ifndef HOST_H
#define HOST_H #define HOST_H
#include "sys.h"
#include "sock.h" #include "sock.h"
#define HOST_CHANNEL_ID_NIL (struct host_channel_id) { .gen = 0, .idx = 0 } #define HOST_CHANNEL_ID_NIL (struct host_channel_id) { .gen = 0, .idx = 0 }
@ -8,6 +9,7 @@
struct host_packet; struct host_packet;
struct host_channel_lookup_bucket; struct host_channel_lookup_bucket;
struct host_recv_buffer;
enum host_cmd_kind { enum host_cmd_kind {
HOST_CMD_KIND_NONE, HOST_CMD_KIND_NONE,
@ -49,7 +51,7 @@ struct host_event_array {
struct host { struct host {
struct arena arena; struct arena arena;
struct sock sock; struct sock *sock;
struct arena cmd_arena; struct arena cmd_arena;
struct host_cmd *first_cmd; struct host_cmd *first_cmd;
@ -74,6 +76,13 @@ struct host {
struct host_msg_assembler_lookup_bucket *msg_assembler_lookup_buckets; /* Allocated in `arena` */ struct host_msg_assembler_lookup_bucket *msg_assembler_lookup_buckets; /* Allocated in `arena` */
u64 num_msg_assembler_lookup_buckets; u64 num_msg_assembler_lookup_buckets;
/* Double buffer for incoming data */
struct sys_mutex recv_buffer_write_mutex;
struct host_recv_buffer *recv_buffer_read;
struct host_recv_buffer *recv_buffer_write;
struct sys_thread receiver_thread;
}; };
/* ========================== * /* ========================== *

View File

@ -3,24 +3,24 @@
#include "memory.h" #include "memory.h"
#if 1
enum sock_flag {
SOCK_FLAG_NONE = 0,
SOCK_FLAG_NON_BLOCKING = (1 << 0)
};
struct sock {
u64 handle;
};
enum sock_address_family { enum sock_address_family {
SOCK_ADDRESS_FAMILY_IPV4, SOCK_ADDRESS_FAMILY_IPV4,
SOCK_ADDRESS_FAMILY_IPV6 SOCK_ADDRESS_FAMILY_IPV6
}; };
struct sock {
i32 _;
};
struct sock_signal {
i32 _;
};
struct sock_array {
struct sock **socks;
u64 count;
};
struct sock_address { struct sock_address {
b32 valid; b32 valid;
enum sock_address_family family; enum sock_address_family family;
@ -31,6 +31,7 @@ struct sock_address {
struct sock_read_result { struct sock_read_result {
b32 valid; b32 valid;
struct sock *sock; /* In case of read from multiple sockets */
struct sock_address address; struct sock_address address;
struct string data; struct string data;
}; };
@ -41,89 +42,16 @@ struct sock_startup_receipt sock_startup(void);
struct sock_address sock_address_from_string(struct string str); struct sock_address sock_address_from_string(struct string str);
struct sock_address sock_address_from_port(u16 port); struct sock_address sock_address_from_port(u16 port);
struct string sock_string_from_address(struct arena *arena, struct sock_address address); struct string sock_string_from_address(struct arena *arena, struct sock_address address);
struct sock sock_alloc(u16 listen_port, u32 flags);
void sock_release(struct sock *sock);
struct sock_read_result sock_read(struct sock *sock, struct string read_buff);
void sock_write(struct sock *sock, struct sock_address address, struct string data);
INLINE b32 sock_address_eq(struct sock_address a, struct sock_address b) INLINE b32 sock_address_eq(struct sock_address a, struct sock_address b)
{ {
return MEMEQ_STRUCT(&a, &b); return MEMEQ_STRUCT(&a, &b);
} }
struct sock *sock_alloc(u16 listen_port);
#else
#define SOCK_IP_ANY_INTERFACE CPPCOMPAT_INITLIST_TYPE(struct sock_ip) { 0 }
#define SOCK_PORT_DYNAMIC (0)
struct sock_ip {
u8 b[16];
};
struct sock_address {
struct sock_ip ip;
u32 port;
};
struct sock {
u64 handle;
};
enum sock_flag {
SOCK_FLAG_NONE = (1 << 0),
SOCK_FLAG_NON_BLOCKING_RECV = (1 << 1)
};
/* ========================== *
* Startup
* ========================== */
struct sock_startup_receipt { i32 _; };
struct sock_startup_receipt sock_startup(void);
/* ========================== *
* Address
* ========================== */
b32 sock_ip_is_ipv4(struct sock_ip ip);
b32 sock_string_is_ipv4(struct string s);
struct sock_ip sock_ip_from_string(struct string s);
struct string sock_ip_to_string(struct arena *arena, struct sock_ip addr);
/* ========================== *
* Sock
* ========================== */
struct sock sock_alloc(struct sock_address addr, sock_receive_callback_func *receive_callback, u64 flags);
void sock_release(struct sock *sock); void sock_release(struct sock *sock);
void sock_write(struct sock *sock, struct sock_address address, struct string msg);
struct sock_read_result sock_read(struct sock *sock, struct string read_buff);
struct sock_read_result sock_read_poll(struct sock_array socks, struct sock_signal *signal, struct string read_buff, f32 timeout);
void sock_write(struct sock *sock, struct sock_address address, struct string data);
void sock_testsend(void);
void sock_testrecv(void);
#endif
#endif #endif

View File

@ -27,6 +27,10 @@ struct winsock_address {
}; };
}; };
/* ========================== *
* Startup
* ========================== */
struct sock_startup_receipt sock_startup(void) struct sock_startup_receipt sock_startup(void)
{ {
WSADATA wsa_data; WSADATA wsa_data;
@ -35,6 +39,10 @@ struct sock_startup_receipt sock_startup(void)
return (struct sock_startup_receipt) { 0 }; return (struct sock_startup_receipt) { 0 };
} }
/* ========================== *
* Address
* ========================== */
INTERNAL struct sock_address sock_address_from_ip_port_cstr(char *ip_cstr, char *port_cstr) INTERNAL struct sock_address sock_address_from_ip_port_cstr(char *ip_cstr, char *port_cstr)
{ {
struct sock_address res = ZI; struct sock_address res = ZI;
@ -230,21 +238,16 @@ INTERNAL struct sock_address sock_address_from_winsock_address(struct winsock_ad
return res; return res;
} }
struct sock sock_alloc(u16 listen_port, u32 flags) /* ========================== *
* Alloc
* ========================== */
struct sock *sock_alloc(u16 listen_port)
{ {
struct sock_address addr = sock_address_from_port(listen_port); struct sock_address addr = sock_address_from_port(listen_port);
struct winsock_address ws_addr = winsock_address_from_sock_address(addr); struct winsock_address ws_addr = winsock_address_from_sock_address(addr);
SOCKET ws = socket(ws_addr.family, SOCK_DGRAM, IPPROTO_UDP); SOCKET ws = socket(ws_addr.family, SOCK_DGRAM, IPPROTO_UDP);
#if 0
if (flags & SOCK_FLAG_NON_BLOCKING) {
u_long mode = 1;
ioctlsocket(ws, FIONBIO, &mode);
}
#else
(UNUSED)flags;
#endif
//setsockopt(ws, SOL_SOCKET, SO_REUSEADDR
#if 0 #if 0
if (listen_port != 0) { if (listen_port != 0) {
@ -254,21 +257,24 @@ struct sock sock_alloc(u16 listen_port, u32 flags)
bind(ws, &ws_addr.sa, ws_addr.size); bind(ws, &ws_addr.sa, ws_addr.size);
#endif #endif
struct sock res = ZI; return (struct sock *)ws;
res.handle = *(u64 *)&ws;
return res;
} }
void sock_release(struct sock *sock) void sock_release(struct sock *sock)
{ {
SOCKET ws = *(SOCKET *)&sock->handle; SOCKET ws = (SOCKET)sock;
closesocket(ws); closesocket(ws);
} }
/* ========================== *
* Read
* ========================== */
struct sock_read_result sock_read(struct sock *sock, struct string read_buff) struct sock_read_result sock_read(struct sock *sock, struct string read_buff)
{ {
SOCKET ws = *(SOCKET *)&sock->handle; SOCKET ws = (SOCKET)sock;
struct sock_read_result res = ZI; struct sock_read_result res = ZI;
res.sock = sock;
struct winsock_address ws_addr = ZI; struct winsock_address ws_addr = ZI;
ws_addr.size = sizeof(ws_addr.sas); ws_addr.size = sizeof(ws_addr.sas);
@ -293,11 +299,76 @@ struct sock_read_result sock_read(struct sock *sock, struct string read_buff)
return res; return res;
} }
struct sock_read_result sock_read_poll(struct sock_array socks, struct sock_signal *signal, struct string read_buff, f32 timeout)
{
struct temp_arena scratch = scratch_begin_no_conflict();
struct sock_read_result res = ZI;
u64 num_fds = socks.count + (signal != NULL);
WSAPOLLFD *fds = arena_push_array(scratch.arena, WSAPOLLFD, num_fds);
for (u64 i = 0; i < num_fds; ++i) {
if (i + 1 == num_fds && signal != NULL) {
fds[i].fd = (SOCKET)signal;
} else {
SOCKET ws = (SOCKET)socks.socks[i];
fds[i].fd = ws;
}
fds[i].events = POLLRDNORM;
}
i32 timeout_ms;
if (timeout == F32_INFINITY) {
timeout_ms = -1;
} else {
timeout_ms = (i32)(timeout * 1000);
}
WSAPoll(fds, num_fds, timeout_ms);
for (u64 i = 0; i < num_fds; ++i) {
if (fds[i].revents & POLLRDNORM) {
if (i < socks.count) {
SOCKET ws = fds[i].fd;
res.sock = socks.socks[i];
struct winsock_address ws_addr = ZI;
ws_addr.size = sizeof(ws_addr.sas);
i32 size = recvfrom(ws, (char *)read_buff.text, read_buff.len, 0, &ws_addr.sa, &ws_addr.size);
ws_addr.family = ws_addr.sin.sin_family;
res.address = sock_address_from_winsock_address(ws_addr);
if (size > 0) {
res.data.text = read_buff.text;
res.data.len = size;
res.valid = true;
break;
} else {
#if RTC
i32 err = WSAGetLastError();
if (err != WSAEWOULDBLOCK && err != WSAETIMEDOUT) {
ASSERT(false);
}
#endif
break;
}
}
}
}
scratch_end(scratch);
return res;
}
/* ========================== *
* Write
* ========================== */
void sock_write(struct sock *sock, struct sock_address address, struct string data) void sock_write(struct sock *sock, struct sock_address address, struct string data)
{ {
SOCKET ws = *(SOCKET *)&sock->handle; SOCKET ws = (SOCKET)sock;
struct winsock_address ws_addr = winsock_address_from_sock_address(address); struct winsock_address ws_addr = winsock_address_from_sock_address(address);
i32 size = sendto(ws, (char *)data.text, data.len, 0, &ws_addr.sa, ws_addr.size); i32 size = sendto(ws, (char *)data.text, data.len, 0, &ws_addr.sa, ws_addr.size);
(UNUSED)size;
#if RTC #if RTC
if (size != (i32)data.len) { if (size != (i32)data.len) {
i32 err = WSAGetLastError(); i32 err = WSAGetLastError();

View File

@ -259,7 +259,7 @@ struct sprite_startup_receipt sprite_startup(struct renderer_startup_receipt *re
G.evictor_mutex = sys_mutex_alloc(); G.evictor_mutex = sys_mutex_alloc();
G.evictor_cv = sys_condition_variable_alloc(); G.evictor_cv = sys_condition_variable_alloc();
G.evictor_thread = sys_thread_alloc(sprite_evictor_thread_entry_point, NULL, LIT("[P0] Sprite evictor")); G.evictor_thread = sys_thread_alloc(sprite_evictor_thread_entry_point, NULL, LIT("[P7] Sprite evictor"));
app_register_exit_callback(&sprite_shutdown); app_register_exit_callback(&sprite_shutdown);

View File

@ -886,7 +886,6 @@ INTERNAL void user_update(void)
G.debug_camera_panning = false; G.debug_camera_panning = false;
} }
/* Zoom view */ /* Zoom view */
i32 input_zooms = G.bind_states[USER_BIND_KIND_ZOOM_IN].num_presses - G.bind_states[USER_BIND_KIND_ZOOM_OUT].num_presses; i32 input_zooms = G.bind_states[USER_BIND_KIND_ZOOM_IN].num_presses - G.bind_states[USER_BIND_KIND_ZOOM_OUT].num_presses;
if (input_zooms != 0) { if (input_zooms != 0) {