add padded atomic types

This commit is contained in:
jacob 2025-07-10 12:47:03 -05:00
parent 915a9272e4
commit 6838c7ac02
7 changed files with 145 additions and 587 deletions

View File

@ -417,6 +417,40 @@ struct atomic_ptr {
volatile void *_v;
};
/* ========================== *
* Cache-line isolated atomics
* ========================== */
/* 32-bit signed atomic padded out to a full 64-byte cache line so that
 * two padded atomics never share a line (prevents false sharing). The
 * actual atomic is `.v`; callers pass `&x.v` to the atomic_i32_* ops. */
struct alignas(64) atomic_i32_padded {
struct atomic_i32 v;
u8 _pad[60]; /* 64 - 4; assumes sizeof(struct atomic_i32) == 4 — enforced by the assert below */
};
STATIC_ASSERT(sizeof(struct atomic_i32_padded) == 64 && alignof(struct atomic_i32_padded) == 64);
/* 64-bit signed atomic padded to one cache line (see atomic_i32_padded
 * rationale: avoids false sharing between adjacent hot counters). */
struct alignas(64) atomic_i64_padded {
struct atomic_i64 v;
u8 _pad[56]; /* 64 - 8; assumes sizeof(struct atomic_i64) == 8 — enforced by the assert below */
};
STATIC_ASSERT(sizeof(struct atomic_i64_padded) == 64 && alignof(struct atomic_i64_padded) == 64);
/* 32-bit unsigned atomic padded to one cache line to prevent false
 * sharing; access the value through `.v`. */
struct alignas(64) atomic_u32_padded {
struct atomic_u32 v;
u8 _pad[60]; /* 64 - 4; assumes sizeof(struct atomic_u32) == 4 — enforced by the assert below */
};
STATIC_ASSERT(sizeof(struct atomic_u32_padded) == 64 && alignof(struct atomic_u32_padded) == 64);
/* 64-bit unsigned atomic padded to one cache line to prevent false
 * sharing; access the value through `.v`. */
struct alignas(64) atomic_u64_padded {
struct atomic_u64 v;
u8 _pad[56]; /* 64 - 8; assumes sizeof(struct atomic_u64) == 8 — enforced by the assert below */
};
STATIC_ASSERT(sizeof(struct atomic_u64_padded) == 64 && alignof(struct atomic_u64_padded) == 64);
/* Atomic pointer padded to one cache line to prevent false sharing;
 * access the value through `.v`. */
struct alignas(64) atomic_ptr_padded {
struct atomic_ptr v;
u8 _pad[56]; /* 64 - 8; assumes a 64-bit pointer in struct atomic_ptr — enforced by the assert below */
};
STATIC_ASSERT(sizeof(struct atomic_ptr_padded) == 64 && alignof(struct atomic_ptr_padded) == 64);
/* ========================== *
* Common structs
* ========================== */

View File

@ -8,25 +8,25 @@
#include "atomic.h"
struct _gstats {
struct atomic_u64 GSTAT_SOCK_BYTES_SENT;
struct atomic_u64 GSTAT_SOCK_BYTES_RECEIVED;
struct atomic_u64 GSTAT_MEMORY_COMMITTED;
struct atomic_u64 GSTAT_MEMORY_RESERVED;
struct atomic_u64 GSTAT_NUM_ARENAS;
struct atomic_u64_padded GSTAT_SOCK_BYTES_SENT;
struct atomic_u64_padded GSTAT_SOCK_BYTES_RECEIVED;
struct atomic_u64_padded GSTAT_MEMORY_COMMITTED;
struct atomic_u64_padded GSTAT_MEMORY_RESERVED;
struct atomic_u64_padded GSTAT_NUM_ARENAS;
};
extern struct _gstats _g_gstats;
#define gstat_set(name, v) atomic_u64_fetch_set(&_g_gstats.name, (v))
#define gstat_add(name, v) atomic_u64_fetch_add_u64(&_g_gstats.name, (v))
#define gstat_sub(name, v) atomic_u64_fetch_add_i64(&_g_gstats.name, -((i64)(v)))
#define gstat_get(name) atomic_u64_fetch(&_g_gstats.name)
#define gstat_set(name, value) atomic_u64_fetch_set(&_g_gstats.name.v, (value))
#define gstat_add(name, value) atomic_u64_fetch_add_u64(&_g_gstats.name.v, (value))
#define gstat_sub(name, value) atomic_u64_fetch_add_i64(&_g_gstats.name.v, -((i64)(value)))
#define gstat_get(name) atomic_u64_fetch(&_g_gstats.name.v)
#else
#define gstat_set(name, v)
#define gstat_add(name, v)
#define gstat_sub(name, v)
#define gstat_set(name, value)
#define gstat_add(name, value)
#define gstat_sub(name, value)
#define gstat_get(name) 0
#endif

View File

@ -205,7 +205,7 @@ struct host *host_alloc(u16 listen_port)
void host_release(struct host *host)
{
atomic_i32_fetch_set(&host->receiver_thread_shutdown_flag, 1);
atomic_i32_fetch_set(&host->receiver_thread_shutdown_flag.v, 1);
sock_wake(host->sock);
while (!sys_thread_try_release(host->receiver_thread, 0.001f)) {
sock_wake(host->sock);
@ -1072,7 +1072,7 @@ INTERNAL SYS_THREAD_DEF(host_receiver_thread_entry_point, arg)
socks.socks = &host->sock;
socks.count = 1;
struct atomic_i32 *shutdown = &host->receiver_thread_shutdown_flag;
struct atomic_i32 *shutdown = &host->receiver_thread_shutdown_flag.v;
while (!atomic_i32_fetch(shutdown)) {
struct sock *sock = sock_wait_for_available_read(socks, F32_INFINITY);
struct sock_read_result res;

View File

@ -98,7 +98,7 @@ struct host {
u64 bytes_received;
u64 bytes_sent;
struct atomic_i32 receiver_thread_shutdown_flag;
struct atomic_i32_padded receiver_thread_shutdown_flag;
struct sys_thread *receiver_thread;
};

View File

@ -56,10 +56,6 @@ void snc_cv_wait(struct snc_cv *cv, struct snc_lock *lock);
void snc_cv_wait_time(struct snc_cv *cv, struct snc_lock *l, i64 timeout_ns);
void snc_cv_broadcast(struct snc_cv *cv);
/* ========================== *
* Fence
* ========================== */
/* ========================== *
* Counter
* ========================== */
@ -71,7 +67,6 @@ struct alignas(64) snc_counter {
STATIC_ASSERT(sizeof(struct snc_counter) == 64); /* Padding validation */
STATIC_ASSERT(alignof(struct snc_counter) == 64); /* Prevent false sharing */
void snc_counter_add(struct snc_counter *counter, i64 x);
void snc_counter_wait(struct snc_counter *counter);

View File

@ -67,7 +67,7 @@ struct cache_entry {
enum cache_entry_kind kind;
struct cache_entry_hash hash;
struct atomic_i32 state;
struct atomic_u64 refcount_struct; /* Cast fetched result to `cache_refcount` */
struct atomic_u64_padded refcount_struct; /* Cast fetched result to `cache_refcount` */
/* Allocated data */
/* NOTE: This data is finalized once entry state = loaded */
@ -96,7 +96,7 @@ struct cache_bin {
};
struct cache {
struct atomic_u64 memory_usage;
struct atomic_u64_padded memory_usage;
struct arena *arena;
struct cache_bin *bins;
struct snc_mutex entry_pool_mutex;
@ -146,12 +146,12 @@ GLOBAL struct {
struct load_cmd *first_free_load_cmd;
/* Scopes */
struct atomic_i32 scopes_lock;
struct snc_mutex scopes_mutex;
struct arena *scopes_arena;
struct sprite_scope *first_free_scope;
/* Evictor */
struct atomic_i32 evictor_cycle;
struct atomic_i32_padded evictor_cycle;
struct snc_counter shutdown_counter;
b32 evictor_scheduler_shutdown;
struct snc_mutex evictor_scheduler_mutex;
@ -380,7 +380,7 @@ INTERNAL void cache_entry_load_texture(struct cache_ref ref, struct sprite_tag t
}
arena_set_readonly(e->arena);
e->memory_usage = e->arena->committed + memory_size;
atomic_u64_fetch_add_u64(&G.cache.memory_usage, e->memory_usage);
atomic_u64_fetch_add_u64(&G.cache.memory_usage.v, e->memory_usage);
if (success) {
logf_success("Loaded sprite texture [%F] \"%F\" in %F seconds (cache size: %F bytes).",
@ -701,7 +701,7 @@ INTERNAL void cache_entry_load_sheet(struct cache_ref ref, struct sprite_tag tag
}
arena_set_readonly(e->arena);
e->memory_usage = e->arena->committed;
atomic_u64_fetch_add_u64(&G.cache.memory_usage, e->memory_usage);
atomic_u64_fetch_add_u64(&G.cache.memory_usage.v, e->memory_usage);
if (success) {
logf_success("Loaded sprite sheet [%F] \"%F\" in %F seconds (cache size: %F bytes).",
@ -736,8 +736,8 @@ INTERNAL void cache_entry_load_sheet(struct cache_ref ref, struct sprite_tag tag
INTERNAL void refcount_add(struct cache_entry *e, i32 amount)
{
i32 evictor_cycle = atomic_i32_fetch(&G.evictor_cycle);
struct atomic_u64 *refcount_atomic = &e->refcount_struct;
i32 evictor_cycle = atomic_i32_fetch(&G.evictor_cycle.v);
struct atomic_u64 *refcount_atomic = &e->refcount_struct.v;
u64 old_refcount_uncast = atomic_u64_fetch(refcount_atomic);
do {
struct cache_refcount new_refcount = *(struct cache_refcount *)&old_refcount_uncast;
@ -806,7 +806,7 @@ struct sprite_scope *sprite_scope_begin(void)
struct sprite_scope_cache_ref **bins = NULL;
struct sprite_scope_cache_ref *pool = NULL;
{
while (atomic_i32_fetch_test_set(&G.scopes_lock, 0, 1) != 0) ix_pause();
struct snc_lock lock = snc_lock_e(&G.scopes_mutex);
{
if (G.first_free_scope) {
res = G.first_free_scope;
@ -819,7 +819,7 @@ struct sprite_scope *sprite_scope_begin(void)
pool = arena_push_array_no_zero(G.scopes_arena, struct sprite_scope_cache_ref, MAX_SCOPE_REFERENCES);
}
}
atomic_i32_fetch_set(&G.scopes_lock, 0);
snc_unlock(&lock);
}
MEMZERO_STRUCT(res);
MEMZERO(bins, sizeof(*bins) * CACHE_BINS_COUNT);
@ -838,12 +838,12 @@ void sprite_scope_end(struct sprite_scope *scope)
}
/* Release scope */
while (atomic_i32_fetch_test_set(&G.scopes_lock, 0, 1) != 0) ix_pause();
struct snc_lock lock = snc_lock_e(&G.scopes_mutex);
{
scope->next_free = G.first_free_scope;
G.first_free_scope = scope;
}
atomic_i32_fetch_set(&G.scopes_lock, 0);
snc_unlock(&lock);
}
/* ========================== *
@ -1243,10 +1243,10 @@ INTERNAL SYS_JOB_DEF(sprite_evictor_job, _)
u64 evict_array_count = 0;
struct evict_node *evict_array = arena_push_dry(scratch.arena, struct evict_node);
{
i32 cur_cycle = atomic_i32_fetch(&G.evictor_cycle);
i32 cur_cycle = atomic_i32_fetch(&G.evictor_cycle.v);
/* Scan for evictable nodes */
b32 cache_over_budget_threshold = atomic_u64_fetch(&G.cache.memory_usage) > CACHE_MEMORY_BUDGET_THRESHOLD;
b32 cache_over_budget_threshold = atomic_u64_fetch(&G.cache.memory_usage.v) > CACHE_MEMORY_BUDGET_THRESHOLD;
if (cache_over_budget_threshold || RESOURCE_RELOADING) {
__profn("Evictor scan");
for (u64 i = 0; i < CACHE_BINS_COUNT; ++i) {
@ -1255,7 +1255,7 @@ INTERNAL SYS_JOB_DEF(sprite_evictor_job, _)
{
struct cache_entry *n = bin->first;
while (n) {
u64 refcount_uncast = atomic_u64_fetch(&n->refcount_struct);
u64 refcount_uncast = atomic_u64_fetch(&n->refcount_struct.v);
struct cache_refcount refcount = *(struct cache_refcount *)&refcount_uncast;
if (refcount.count <= 0) {
/* Add node to evict list */
@ -1303,10 +1303,10 @@ INTERNAL SYS_JOB_DEF(sprite_evictor_job, _)
struct cache_bin *bin = en->cache_bin;
struct cache_entry *entry = en->cache_entry;
i32 last_ref_cycle = en->last_ref_cycle;
b32 cache_over_budget_target = atomic_u64_fetch(&G.cache.memory_usage) > CACHE_MEMORY_BUDGET_TARGET;
b32 cache_over_budget_target = atomic_u64_fetch(&G.cache.memory_usage.v) > CACHE_MEMORY_BUDGET_TARGET;
struct snc_lock bin_lock = snc_lock_e(&bin->mutex);
{
u64 refcount_uncast = atomic_u64_fetch(&entry->refcount_struct);
u64 refcount_uncast = atomic_u64_fetch(&entry->refcount_struct.v);
struct cache_refcount refcount = *(struct cache_refcount *)&refcount_uncast;
if (refcount.count > 0 || (last_ref_cycle >= 0 && refcount.last_ref_cycle != en->last_ref_cycle)) {
/* Cache node has been referenced since scan, skip node. */
@ -1325,7 +1325,7 @@ INTERNAL SYS_JOB_DEF(sprite_evictor_job, _)
bin->last = prev;
}
atomic_u64_fetch_add_i64(&G.cache.memory_usage, -((i64)entry->memory_usage));
atomic_u64_fetch_add_i64(&G.cache.memory_usage.v, -((i64)entry->memory_usage));
/* Add to evicted list */
en->next_evicted = first_evicted;
@ -1365,7 +1365,7 @@ INTERNAL SYS_JOB_DEF(sprite_evictor_job, _)
}
}
}
atomic_i32_fetch_add(&G.evictor_cycle, 1);
atomic_i32_fetch_add(&G.evictor_cycle.v, 1);
scratch_end(scratch);
}

View File

@ -34,15 +34,13 @@
#define THREAD_STACK_SIZE KIBI(64)
#define FIBER_STACK_SIZE MEBI(4)
struct win32_mutex {
struct atomic_i32 v; /* Bits 0-30 = shared lock count, bit 30 = is exclusive lock pending, bit 31 = is exclusive locked */
struct win32_mutex *next_free;
};
/* Assume scheduler cycle is 20hz at start to be conservative */
#define DEFAULT_SCHEDULER_CYCLE_PERIOD_NS 50000000
#define NUM_ROLLING_SCHEDULER_PERIODS 1000
struct win32_condition_variable {
struct atomic_u64 wake_gen;
struct win32_condition_variable *next_free;
};
#define FIBER_NAME_PREFIX_CSTR "Fiber ["
#define FIBER_NAME_SUFFIX_CSTR "]"
#define FIBER_NAME_MAX_SIZE 64
struct win32_thread {
sys_thread_func *entry_point;
@ -138,16 +136,6 @@ struct alignas(64) wait_bin {
STATIC_ASSERT(sizeof(struct wait_bin) == 64); /* Padding validation (increase if necessary) */
STATIC_ASSERT(alignof(struct wait_bin) == 64); /* Avoid false sharing */
/* Assume scheduler cycle is 20hz at start to be conservative */
#define DEFAULT_SCHEDULER_CYCLE_PERIOD_NS 50000000
#define NUM_ROLLING_SCHEDULER_PERIODS 1000
#define FIBER_NAME_PREFIX_CSTR "Fiber ["
#define FIBER_NAME_SUFFIX_CSTR "]"
#define FIBER_NAME_MAX_SIZE 64
enum yield_kind {
YIELD_KIND_NONE,
YIELD_KIND_DONE,
@ -243,10 +231,38 @@ enum job_queue_kind {
NUM_JOB_QUEUE_KINDS
};
/* ========================== *
* Ticket mutex
* ========================== */
/* FIFO ticket spinlock. Lockers atomically take the next `ticket` and
 * spin until `serving` reaches it; unlock advances `serving`. Both
 * counters are cache-line padded so contending lockers (hammering
 * `ticket`) do not false-share with the holder's release (`serving`).
 * NOTE(review): the tag `tm` collides with <time.h>'s `struct tm` —
 * presumably fine because this TU never includes libc time headers;
 * confirm, or consider a project-prefixed tag. */
struct tm {
struct atomic_i64_padded ticket; /* next ticket to hand out */
struct atomic_i64_padded serving; /* ticket currently allowed to hold the lock */
};
/* Acquire the ticket lock: claim the next ticket number, then spin —
 * with a CPU pause hint to be polite to the sibling hyperthread —
 * until the serving counter reaches our ticket. FIFO: waiters are
 * admitted strictly in the order they called tm_lock. */
INTERNAL void tm_lock(struct tm *tm)
{
	i64 my_ticket = atomic_i64_fetch_add(&tm->ticket.v, 1);
	for (;;) {
		if (atomic_i64_fetch(&tm->serving.v) == my_ticket) {
			break;
		}
		ix_pause();
	}
}
/* Release the ticket lock by advancing the serving counter, which
 * admits the next queued waiter (FIFO order). Must only be called by
 * the thread that currently holds the lock via tm_lock. */
INTERNAL void tm_unlock(struct tm *tm)
{
atomic_i64_fetch_add(&tm->serving.v, 1);
}
struct alignas(64) job_queue {
enum job_queue_kind kind;
struct atomic_i32 lock;
struct tm lock;
struct arena *arena;
struct job_info *first;
@ -298,12 +314,12 @@ GLOBAL struct {
/* Scheduler */
struct atomic_i64 current_scheduler_cycle; /* TODO: Prevent false sharing */
struct atomic_i64 current_scheduler_cycle_period_ns; /* TODO: Prevent false sharing */
struct atomic_i64_padded current_scheduler_cycle;
struct atomic_i64_padded current_scheduler_cycle_period_ns;
/* Wait lists */
struct atomic_u64 waiter_wake_gen; /* TODO: Prevent false sharing */
struct atomic_i32 wait_lists_arena_lock; /* TODO: Prevent false sharing */
struct atomic_u64_padded waiter_wake_gen;
struct tm wait_lists_arena_lock;
struct arena *wait_lists_arena;
/* Wait tables */
@ -314,15 +330,15 @@ GLOBAL struct {
i16 num_fibers;
i16 first_free_fiber_id;
struct arena *fiber_names_arena;
struct atomic_i32 fibers_lock; /* TODO: Prevent false sharing */
struct tm fibers_lock;
struct fiber fibers[SYS_MAX_FIBERS];
/* Jobs */
struct job_queue job_queues[NUM_JOB_QUEUE_KINDS];
/* Workers */
struct atomic_i32 workers_shutdown;
struct atomic_i64 num_jobs_in_queue; /* TODO: Prevent false sharing */
struct atomic_i32_padded workers_shutdown;
struct atomic_i64_padded num_jobs_in_queue;
struct snc_mutex workers_wake_mutex;
struct snc_cv workers_wake_cv;
@ -352,7 +368,7 @@ INTERNAL enum sys_priority job_priority_from_queue_kind(enum job_queue_kind queu
i64 sys_current_scheduler_period_ns(void)
{
return atomic_i64_fetch(&G.current_scheduler_cycle_period_ns);
return atomic_i64_fetch(&G.current_scheduler_cycle_period_ns.v);
}
@ -424,7 +440,7 @@ void sys_wake_all(void *addr)
{
struct arena_temp scratch = scratch_begin_no_conflict();
u64 wake_gen = atomic_u64_fetch_add_u64(&G.waiter_wake_gen, 1);
u64 wake_gen = atomic_u64_fetch_add_u64(&G.waiter_wake_gen.v, 1);
u64 wait_addr_bin_index = (u64)addr % NUM_WAIT_ADDR_BINS;
struct wait_bin *wait_addr_bin = &G.wait_addr_bins[wait_addr_bin_index];
@ -556,7 +572,7 @@ void sys_wake_all(void *addr)
struct fiber *waiter = waiters[i];
enum job_queue_kind queue_kind = job_queue_kind_from_priority(waiter->job_priority);
struct job_queue *queue = &G.job_queues[queue_kind];
while (atomic_i32_fetch_test_set(&queue->lock, 0, 1) != 0) ix_pause();
tm_lock(&queue->lock);
{
struct job_info *info = NULL;
if (queue->first_free) {
@ -580,7 +596,7 @@ void sys_wake_all(void *addr)
queue->last = info;
atomic_u64_fetch_set(&waiter->wake_gen, 0);
}
atomic_i32_fetch_set(&queue->lock, 0);
tm_unlock(&queue->lock);
}
/* Wake blocking waiters */
@ -591,7 +607,7 @@ void sys_wake_all(void *addr)
if (num_waiters > 0) {
struct snc_lock lock = snc_lock_e(&G.workers_wake_mutex);
{
atomic_i64_fetch_add(&G.num_jobs_in_queue, num_waiters);
atomic_i64_fetch_add(&G.num_jobs_in_queue.v, num_waiters);
snc_cv_broadcast(&G.workers_wake_cv);
}
snc_unlock(&lock);
@ -617,7 +633,7 @@ INTERNAL struct fiber *fiber_alloc(enum fiber_kind kind)
struct fiber *fiber = NULL;
char *new_name_cstr = NULL;
{
while (atomic_i32_fetch_test_set(&G.fibers_lock, 0, 1) != 0) ix_pause();
tm_lock(&G.fibers_lock);
{
fiber_id = G.first_free_fiber_id;
if (fiber_id && kind == FIBER_KIND_JOB_WORKER) {
@ -632,7 +648,7 @@ INTERNAL struct fiber *fiber_alloc(enum fiber_kind kind)
new_name_cstr = arena_push_array(G.fiber_names_arena, char, FIBER_NAME_MAX_SIZE);
}
}
atomic_i32_fetch_set(&G.fibers_lock, 0);
tm_unlock(&G.fibers_lock);
}
if (new_name_cstr != NULL) {
fiber->id = fiber_id;
@ -694,12 +710,12 @@ INTERNAL struct fiber *fiber_alloc(enum fiber_kind kind)
INTERNAL void fiber_release(struct fiber *fiber, i16 fiber_id)
{
while (atomic_i32_fetch_test_set(&G.fibers_lock, 0, 1) != 0) ix_pause();
tm_lock(&G.fibers_lock);
{
fiber->parent_id = G.first_free_fiber_id;
G.first_free_fiber_id = fiber_id;
}
atomic_i32_fetch_set(&G.fibers_lock, 0);
tm_unlock(&G.fibers_lock);
}
FORCE_INLINE struct fiber *fiber_from_id(i16 id)
@ -730,7 +746,7 @@ void sys_run(i32 count, sys_job_func *func, void *sig, enum sys_priority priorit
priority = clamp_i32(priority, fiber->job_priority, SYS_PRIORITY_BACKGROUND); /* A job cannot create a job with a higher priority than itself */
enum job_queue_kind queue_kind = job_queue_kind_from_priority(priority);
struct job_queue *queue = &G.job_queues[queue_kind];
while (atomic_i32_fetch_test_set(&queue->lock, 0, 1) != 0) ix_pause();
tm_lock(&queue->lock);
{
struct job_info *info = NULL;
if (queue->first_free) {
@ -751,13 +767,13 @@ void sys_run(i32 count, sys_job_func *func, void *sig, enum sys_priority priorit
}
queue->last = info;
}
atomic_i32_fetch_set(&queue->lock, 0);
tm_unlock(&queue->lock);
}
/* Wake workers */
/* TODO: Only wake necessary amount of workers */
struct snc_lock lock = snc_lock_e(&G.workers_wake_mutex);
{
atomic_i64_fetch_add(&G.num_jobs_in_queue, count);
atomic_i64_fetch_add(&G.num_jobs_in_queue.v, count);
snc_cv_broadcast(&G.workers_wake_cv);
}
snc_unlock(&lock);
@ -881,7 +897,7 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
for (u32 queue_index = 0; queue_index < countof(queues) && !job_func; ++queue_index) {
struct job_queue *queue = queues[queue_index];
if (queue) {
while (atomic_i32_fetch_test_set(&queue->lock, 0, 1) != 0) ix_pause();
tm_lock(&queue->lock);
{
struct job_info *info = queue->first;
job_priority = (enum sys_priority)queue->kind;
@ -893,7 +909,7 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
job_id = info->num_dispatched++;
if (job_id < info->count) {
/* Pick job */
atomic_i64_fetch_add(&G.num_jobs_in_queue, -1);
atomic_i64_fetch_add(&G.num_jobs_in_queue.v, -1);
job_func = info->func;
job_sig = info->sig;
job_counter = info->counter;
@ -904,7 +920,7 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
}
} else {
/* This job is to be resumed from a yield */
atomic_i64_fetch_add(&G.num_jobs_in_queue, -1);
atomic_i64_fetch_add(&G.num_jobs_in_queue.v, -1);
job_fiber_id = info->fiber_id;
job_id = info->num_dispatched;
job_func = info->func;
@ -923,7 +939,7 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
info = next;
}
}
atomic_i32_fetch_set(&queue->lock, 0);
tm_unlock(&queue->lock);
}
}
}
@ -973,8 +989,8 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
i64 wait_timeout_ns = yield.wait.timeout_ns;
i64 wait_time = 0;
if (wait_timeout_ns > 0 && wait_timeout_ns < I64_MAX) {
u64 current_scheduler_cycle = atomic_i64_fetch(&G.current_scheduler_cycle);
i64 current_scheduler_cycle_period_ns = atomic_i64_fetch(&G.current_scheduler_cycle_period_ns);
u64 current_scheduler_cycle = atomic_i64_fetch(&G.current_scheduler_cycle.v);
i64 current_scheduler_cycle_period_ns = atomic_i64_fetch(&G.current_scheduler_cycle_period_ns.v);
wait_time = current_scheduler_cycle + max_i64((i64)((f64)wait_timeout_ns / (f64)current_scheduler_cycle_period_ns), 1);
}
@ -998,7 +1014,7 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
}
}
if (wait_time != 0 && !cancel_wait) {
cancel_wait = wait_time <= atomic_i64_fetch(&G.current_scheduler_cycle);
cancel_wait = wait_time <= atomic_i64_fetch(&G.current_scheduler_cycle.v);
}
if (!cancel_wait) {
if (wait_addr != 0) {
@ -1015,11 +1031,11 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
wait_addr_list = wait_addr_bin->first_free_wait_list;
wait_addr_bin->first_free_wait_list = wait_addr_list->next_in_bin;
} else {
while (atomic_i32_fetch_test_set(&G.wait_lists_arena_lock, 0, 1) != 0) ix_pause();
tm_lock(&G.wait_lists_arena_lock);
{
wait_addr_list = arena_push_no_zero(G.wait_lists_arena, struct wait_list);
}
atomic_i32_fetch_set(&G.wait_lists_arena_lock, 0);
tm_unlock(&G.wait_lists_arena_lock);
}
MEMZERO_STRUCT(wait_addr_list);
wait_addr_list->value = wait_addr;
@ -1056,11 +1072,11 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
wait_time_list = wait_time_bin->first_free_wait_list;
wait_time_bin->first_free_wait_list = wait_time_list->next_in_bin;
} else {
while (atomic_i32_fetch_test_set(&G.wait_lists_arena_lock, 0, 1) != 0) ix_pause();
tm_lock(&G.wait_lists_arena_lock);
{
wait_time_list = arena_push_no_zero(G.wait_lists_arena, struct wait_list);
}
atomic_i32_fetch_set(&G.wait_lists_arena_lock, 0);
tm_unlock(&G.wait_lists_arena_lock);
}
MEMZERO_STRUCT(wait_time_list);
wait_time_list->value = wait_time;
@ -1109,11 +1125,11 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
/* Wait */
struct snc_lock wake_lock = snc_lock_s(&G.workers_wake_mutex);
{
shutdown = atomic_i32_fetch(&G.workers_shutdown);
while (atomic_i64_fetch(&G.num_jobs_in_queue) <= 0 && !shutdown) {
shutdown = atomic_i32_fetch(&G.workers_shutdown.v);
while (atomic_i64_fetch(&G.num_jobs_in_queue.v) <= 0 && !shutdown) {
__profnc("Wait for job", RGB32_F(0.75, 0.75, 0));
snc_cv_wait(&G.workers_wake_cv, &wake_lock);
shutdown = atomic_i32_fetch(&G.workers_shutdown);
shutdown = atomic_i32_fetch(&G.workers_shutdown.v);
}
}
snc_unlock(&wake_lock);
@ -1126,492 +1142,6 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
INTERNAL SYS_THREAD_DEF(job_scheduler_entry, _)
{
#if 0
timeBeginPeriod(1);
(UNUSED)_;
{
i32 priority = THREAD_PRIORITY_TIME_CRITICAL;
b32 success = SetThreadPriority(GetCurrentThread(), priority);
(UNUSED)success;
ASSERT(success);
}
struct arena_temp scratch = scratch_begin_no_conflict();
/* Create ring buffer of scheduler cycles initialized to default value */
i32 periods_index = 0;
i64 periods[NUM_ROLLING_SCHEDULER_PERIODS] = ZI;
for (i32 i = 0; i < (i32)countof(periods); ++i) {
periods[i] = DEFAULT_SCHEDULER_CYCLE_PERIOD_NS;
}
i64 last_cycle_ns = 0;
while (atomic_i64_fetch(&G.workers_wake_gen) >= 0) {
__profn("Job scheduler cycle");
{
__profn("Job scheduler sleep");
Sleep(1);
}
i64 now_ns = sys_time_ns();
if (last_cycle_ns != 0) {
i64 new_period_ns = now_ns - last_cycle_ns;
periods[periods_index++] = new_period_ns;
if (periods_index == countof(periods)) {
periods_index = 0;
}
/* Calculate mean period */
f64 periods_sum_ns = 0;
for (i32 i = 0; i < (i32)countof(periods); ++i) {
periods_sum_ns += (f64)periods[i];
}
f64 mean_ns = periods_sum_ns / (f64)countof(periods);
atomic_i64_fetch_set(&G.current_scheduler_cycle_period_ns, math_round_to_int64(mean_ns));
}
last_cycle_ns = now_ns;
u64 wake_gen = atomic_u64_fetch_add_u64(&G.waiter_wake_gen, 1);
i64 current_cycle = atomic_i64_fetch_add(&G.current_scheduler_cycle, 1) + 1;
{
__profn("Job scheduler run");
struct arena_temp temp = arena_temp_begin(scratch.arena);
u64 wait_time_bin_index = (u64)current_cycle % NUM_WAIT_TIME_BINS;
struct wait_bin *wait_time_bin = &G.wait_time_bins[wait_time_bin_index];
struct wait_list *wait_time_list = NULL;
/* Build list of waiters to resume */
i32 num_waiters = 0;
struct fiber **waiters = NULL;
{
while (atomic_i32_fetch_test_set(&wait_time_bin->lock, 0, 1) != 0) ix_pause();
{
/* Search for wait time list */
for (struct wait_list *tmp = wait_time_bin->first_wait_list; tmp && !wait_time_list; tmp = tmp->next_in_bin) {
if (tmp->value == (u64)current_cycle) {
wait_time_list = tmp;
}
}
if (wait_time_list) {
/* Set waiter wake status & build waiters list */
waiters = arena_push_array_no_zero(temp.arena, struct fiber *, wait_time_list->num_waiters);
for (struct fiber *waiter = fiber_from_id(wait_time_list->first_waiter); waiter; waiter = fiber_from_id(waiter->next_time_waiter)) {
if (atomic_u64_fetch_test_set(&waiter->wake_gen, 0, wake_gen) == 0) {
waiters[num_waiters] = waiter;
++num_waiters;
}
}
}
}
atomic_i32_fetch_set(&wait_time_bin->lock, 0);
}
/* Update wait lists */
for (i32 i = 0; i < num_waiters; ++i) {
struct fiber *waiter = waiters[i];
u64 wait_addr = waiter->wait_addr;
u64 wait_addr_bin_index = wait_addr % NUM_WAIT_ADDR_BINS;
struct wait_bin *wait_addr_bin = &G.wait_addr_bins[wait_addr_bin_index];
if (wait_addr != 0) while (atomic_i32_fetch_test_set(&wait_addr_bin->lock, 0, 1) != 0) ix_pause();
{
/* Search for wait addr list */
struct wait_list *wait_addr_list = NULL;
if (wait_addr != 0) {
for (struct wait_list *tmp = wait_addr_bin->first_wait_list; tmp && !wait_addr_list; tmp = tmp->next_in_bin) {
if (tmp->value == (u64)wait_addr) {
wait_addr_list = tmp;
}
}
}
while (atomic_i32_fetch_test_set(&wait_time_bin->lock, 0, 1) != 0) ix_pause();
{
/* Remove from addr list */
if (wait_addr_list) {
if (--wait_addr_list->num_waiters == 0) {
/* Free addr list */
struct wait_list *prev = wait_addr_list->prev_in_bin;
struct wait_list *next = wait_addr_list->next_in_bin;
if (prev) {
prev->next_in_bin = next;
} else {
wait_addr_bin->first_wait_list = next;
}
if (next) {
next->next_in_bin = prev;
} else {
wait_addr_bin->last_wait_list = prev;
}
wait_addr_list->next_in_bin = wait_addr_bin->first_free_wait_list;
wait_addr_bin->first_free_wait_list = wait_addr_list;
} else {
i16 prev_id = waiter->prev_addr_waiter;
i16 next_id = waiter->next_addr_waiter;
if (prev_id) {
fiber_from_id(prev_id)->next_addr_waiter = next_id;
} else {
wait_addr_list->first_waiter = next_id;
}
if (next_id) {
fiber_from_id(next_id)->prev_addr_waiter = prev_id;
} else {
wait_addr_list->last_waiter = prev_id;
}
}
waiter->wait_addr = 0;
waiter->prev_addr_waiter = 0;
waiter->next_addr_waiter = 0;
}
/* Remove from time list */
{
if (--wait_time_list->num_waiters == 0) {
/* Free time list */
struct wait_list *prev = wait_time_list->prev_in_bin;
struct wait_list *next = wait_time_list->next_in_bin;
if (prev) {
prev->next_in_bin = next;
} else {
wait_time_bin->first_wait_list = next;
}
if (next) {
next->next_in_bin = prev;
} else {
wait_time_bin->last_wait_list = prev;
}
wait_time_list->next_in_bin = wait_time_bin->first_free_wait_list;
wait_time_bin->first_free_wait_list = wait_time_list;
} else {
i16 prev_id = waiter->prev_time_waiter;
i16 next_id = waiter->next_time_waiter;
if (prev_id) {
fiber_from_id(prev_id)->next_time_waiter = next_id;
} else {
wait_time_list->first_waiter = next_id;
}
if (next_id) {
fiber_from_id(next_id)->prev_time_waiter = prev_id;
} else {
wait_time_list->last_waiter = prev_id;
}
}
waiter->wait_time = 0;
waiter->prev_time_waiter = 0;
waiter->next_time_waiter = 0;
}
}
atomic_i32_fetch_set(&wait_time_bin->lock, 0);
}
if (wait_addr != 0) atomic_i32_fetch_set(&wait_addr_bin->lock, 0);
}
/* Resume waiters */
/* TODO: Batch submit waiters based on queue kind rather than one at a time */
for (i32 i = 0; i < num_waiters; ++i) {
struct fiber *waiter = waiters[i];
enum job_queue_kind queue_kind = job_queue_kind_from_priority(waiter->job_priority);
struct job_queue *queue = &G.job_queues[queue_kind];
while (atomic_i32_fetch_test_set(&queue->lock, 0, 1) != 0) ix_pause();
{
struct job_info *info = NULL;
if (queue->first_free) {
info = queue->first_free;
queue->first_free = info->next;
} else {
info = arena_push_no_zero(queue->arena, struct job_info);
}
MEMZERO_STRUCT(info);
info->count = 1;
info->num_dispatched = waiter->job_id;
info->func = waiter->job_func;
info->sig = waiter->job_sig;
info->counter = waiter->job_counter;
info->fiber_id = waiter->id;
if (queue->last) {
queue->last->next = info;
} else {
queue->first = info;
}
queue->last = info;
atomic_u64_fetch_set(&waiter->wake_gen, 0);
}
atomic_i32_fetch_set(&queue->lock, 0);
}
/* Wake workers */
/* TODO: Only wake necessary amount of workers */
if (num_waiters > 0) {
struct snc_lock lock = snc_lock_e(&G.workers_wake_mutex);
{
atomic_i64_fetch_add(&G.num_jobs_in_queue, num_waiters);
if (atomic_i64_fetch(&G.workers_wake_gen) >= 0) {
atomic_i64_fetch_add(&G.workers_wake_gen, 1);
snc_cv_broadcast(&G.workers_wake_cv);
}
}
snc_unlock(&lock);
}
arena_temp_end(temp);
}
}
scratch_end(scratch);
#elif 0
(UNUSED)_;
{
i32 priority = THREAD_PRIORITY_TIME_CRITICAL;
b32 success = SetThreadPriority(GetCurrentThread(), priority);
(UNUSED)success;
ASSERT(success);
}
struct arena_temp scratch = scratch_begin_no_conflict();
/* Create high resolution timer */
HANDLE timer = CreateWaitableTimerExW(NULL, NULL, CREATE_WAITABLE_TIMER_HIGH_RESOLUTION, TIMER_ALL_ACCESS);
if (!timer) {
sys_panic(LIT("Failed to create high resolution timer"));
}
/* Set high resolution timer period */
{
LARGE_INTEGER due = ZI;
due.QuadPart = -1;
//i32 interval = 5;
//i32 interval = 1;
i32 interval = 1;
b32 success = false;
while (!success && interval <= 50) {
success = SetWaitableTimerEx(timer, &due, interval, NULL, NULL, NULL, 0);
++interval;
}
if (!success) {
sys_panic(LIT("Failed to set scheduler timing period"));
}
}
/* Create ring buffer of scheduler cycles initialized to default value */
i32 periods_index = 0;
i64 periods[NUM_ROLLING_SCHEDULER_PERIODS] = ZI;
for (i32 i = 0; i < (i32)countof(periods); ++i) {
periods[i] = DEFAULT_SCHEDULER_CYCLE_PERIOD_NS;
}
i64 last_cycle_ns = 0;
while (atomic_i64_fetch(&G.workers_wake_gen) >= 0) {
__profn("Job scheduler cycle");
{
__profn("Job scheduler wait");
WaitForSingleObject(timer, INFINITE);
}
i64 now_ns = sys_time_ns();
if (last_cycle_ns != 0) {
i64 new_period_ns = now_ns - last_cycle_ns;
periods[periods_index++] = new_period_ns;
if (periods_index == countof(periods)) {
periods_index = 0;
}
/* Calculate mean period */
f64 periods_sum_ns = 0;
for (i32 i = 0; i < (i32)countof(periods); ++i) {
periods_sum_ns += (f64)periods[i];
}
f64 mean_ns = periods_sum_ns / (f64)countof(periods);
atomic_i64_fetch_set(&G.current_scheduler_cycle_period_ns, math_round_to_int64(mean_ns));
}
last_cycle_ns = now_ns;
u64 wake_gen = atomic_u64_fetch_add_u64(&G.waiter_wake_gen, 1);
i64 current_cycle = atomic_i64_fetch_add(&G.current_scheduler_cycle, 1) + 1;
{
__profn("Job scheduler run");
struct arena_temp temp = arena_temp_begin(scratch.arena);
u64 wait_time_bin_index = (u64)current_cycle % NUM_WAIT_TIME_BINS;
struct wait_bin *wait_time_bin = &G.wait_time_bins[wait_time_bin_index];
struct wait_list *wait_time_list = NULL;
/* Build list of waiters to resume */
i32 num_waiters = 0;
struct fiber **waiters = NULL;
{
while (atomic_i32_fetch_test_set(&wait_time_bin->lock, 0, 1) != 0) ix_pause();
{
/* Search for wait time list */
for (struct wait_list *tmp = wait_time_bin->first_wait_list; tmp && !wait_time_list; tmp = tmp->next_in_bin) {
if (tmp->value == (u64)current_cycle) {
wait_time_list = tmp;
}
}
if (wait_time_list) {
/* Set waiter wake status & build waiters list */
waiters = arena_push_array_no_zero(temp.arena, struct fiber *, wait_time_list->num_waiters);
/* Tail of the scheduler wake pass (enclosing function begins above this view).
 * Phase 1 (here, inside the time-bin scan): collect every fiber on the expired
 * time list whose wake_gen can be claimed. Phase 2: unlink each collected
 * fiber from its addr wait list and its time wait list. Phase 3: re-enqueue
 * each fiber as a job and wake the workers.
 * Locking convention in this file: spin with atomic_i32_fetch_test_set(lock, 0, 1)
 * + ix_pause() to acquire, atomic_i32_fetch_set(lock, 0) to release; the bare
 * { ... } scope that follows each acquire delimits the critical section. */
for (struct fiber *waiter = fiber_from_id(wait_time_list->first_waiter); waiter; waiter = fiber_from_id(waiter->next_time_waiter)) {
/* CAS wake_gen 0 -> wake_gen claims the fiber exactly once; losers were
 * already claimed (e.g. by an address wake) and are skipped. */
if (atomic_u64_fetch_test_set(&waiter->wake_gen, 0, wake_gen) == 0) {
waiters[num_waiters] = waiter;
++num_waiters;
}
}
}
}
atomic_i32_fetch_set(&wait_time_bin->lock, 0);
}
/* Update wait lists */
for (i32 i = 0; i < num_waiters; ++i) {
struct fiber *waiter = waiters[i];
u64 wait_addr = waiter->wait_addr;
u64 wait_addr_bin_index = wait_addr % NUM_WAIT_ADDR_BINS;
struct wait_bin *wait_addr_bin = &G.wait_addr_bins[wait_addr_bin_index];
/* wait_addr == 0 means the fiber was sleeping on time only; skip the addr
 * bin lock entirely in that case (bin[0] is computed but never touched). */
if (wait_addr != 0) while (atomic_i32_fetch_test_set(&wait_addr_bin->lock, 0, 1) != 0) ix_pause();
{
/* Search for wait addr list */
struct wait_list *wait_addr_list = NULL;
if (wait_addr != 0) {
/* Linear scan of the bin's chain; `!wait_addr_list` stops at first match. */
for (struct wait_list *tmp = wait_addr_bin->first_wait_list; tmp && !wait_addr_list; tmp = tmp->next_in_bin) {
if (tmp->value == (u64)wait_addr) {
wait_addr_list = tmp;
}
}
}
/* Lock order here is addr bin (above) then time bin. NOTE(review):
 * `wait_time_bin` is the variable left over from the scan loop above —
 * confirm every fiber in waiters[] actually belongs to that same time
 * bin/list, and that no other path takes these two locks in the
 * opposite order (deadlock risk). TODO confirm. */
while (atomic_i32_fetch_test_set(&wait_time_bin->lock, 0, 1) != 0) ix_pause();
{
/* Remove from addr list */
if (wait_addr_list) {
if (--wait_addr_list->num_waiters == 0) {
/* Free addr list: unlink from the bin's doubly linked chain and
 * push onto the bin's free list for reuse. */
struct wait_list *prev = wait_addr_list->prev_in_bin;
struct wait_list *next = wait_addr_list->next_in_bin;
if (prev) {
prev->next_in_bin = next;
} else {
wait_addr_bin->first_wait_list = next;
}
if (next) {
/* NOTE(review): suspected bug — for a doubly linked unlink this
 * should presumably be `next->prev_in_bin = prev;`. As written it
 * points `next` forward at `prev`, cycling the chain and dropping
 * the tail. Same pattern repeats in the time-list unlink below.
 * TODO confirm against wait_list's link invariants. */
next->next_in_bin = prev;
} else {
wait_addr_bin->last_wait_list = prev;
}
wait_addr_list->next_in_bin = wait_addr_bin->first_free_wait_list;
wait_addr_bin->first_free_wait_list = wait_addr_list;
} else {
/* List still has other waiters: splice this fiber out of the
 * intrusive waiter chain (fiber ids, 0 = none). */
i16 prev_id = waiter->prev_addr_waiter;
i16 next_id = waiter->next_addr_waiter;
if (prev_id) {
fiber_from_id(prev_id)->next_addr_waiter = next_id;
} else {
wait_addr_list->first_waiter = next_id;
}
if (next_id) {
fiber_from_id(next_id)->prev_addr_waiter = prev_id;
} else {
wait_addr_list->last_waiter = prev_id;
}
}
/* Fiber no longer waits on an address. */
waiter->wait_addr = 0;
waiter->prev_addr_waiter = 0;
waiter->next_addr_waiter = 0;
}
/* Remove from time list */
{
/* NOTE(review): `wait_time_list` is also captured from the scan loop —
 * verify it is the list this particular waiter sits on. TODO confirm. */
if (--wait_time_list->num_waiters == 0) {
/* Free time list: same unlink + free-list push as the addr list above. */
struct wait_list *prev = wait_time_list->prev_in_bin;
struct wait_list *next = wait_time_list->next_in_bin;
if (prev) {
prev->next_in_bin = next;
} else {
wait_time_bin->first_wait_list = next;
}
if (next) {
/* NOTE(review): same suspected `prev_in_bin` typo as the addr-list
 * unlink above — expected `next->prev_in_bin = prev;`. TODO confirm. */
next->next_in_bin = prev;
} else {
wait_time_bin->last_wait_list = prev;
}
wait_time_list->next_in_bin = wait_time_bin->first_free_wait_list;
wait_time_bin->first_free_wait_list = wait_time_list;
} else {
i16 prev_id = waiter->prev_time_waiter;
i16 next_id = waiter->next_time_waiter;
if (prev_id) {
fiber_from_id(prev_id)->next_time_waiter = next_id;
} else {
wait_time_list->first_waiter = next_id;
}
if (next_id) {
fiber_from_id(next_id)->prev_time_waiter = prev_id;
} else {
wait_time_list->last_waiter = prev_id;
}
}
/* Fiber no longer waits on a time. */
waiter->wait_time = 0;
waiter->prev_time_waiter = 0;
waiter->next_time_waiter = 0;
}
}
atomic_i32_fetch_set(&wait_time_bin->lock, 0);
}
if (wait_addr != 0) atomic_i32_fetch_set(&wait_addr_bin->lock, 0);
}
/* Resume waiters */
/* TODO: Batch submit waiters based on queue kind rather than one at a time */
for (i32 i = 0; i < num_waiters; ++i) {
struct fiber *waiter = waiters[i];
enum job_queue_kind queue_kind = job_queue_kind_from_priority(waiter->job_priority);
struct job_queue *queue = &G.job_queues[queue_kind];
while (atomic_i32_fetch_test_set(&queue->lock, 0, 1) != 0) ix_pause();
{
/* Reuse a job_info node from the queue's free list, or push a fresh one
 * from the queue's arena. */
struct job_info *info = NULL;
if (queue->first_free) {
info = queue->first_free;
queue->first_free = info->next;
} else {
info = arena_push_no_zero(queue->arena, struct job_info);
}
MEMZERO_STRUCT(info);
info->count = 1;
/* NOTE(review): job_id stored into num_dispatched — field names do not
 * match; presumably intentional (resume marker?) but verify. */
info->num_dispatched = waiter->job_id;
info->func = waiter->job_func;
info->sig = waiter->job_sig;
info->counter = waiter->job_counter;
info->fiber_id = waiter->id;
/* Append to the queue's singly linked first/last chain. */
if (queue->last) {
queue->last->next = info;
} else {
queue->first = info;
}
queue->last = info;
/* Clear the claim taken in phase 1 so the fiber can be woken again later. */
atomic_u64_fetch_set(&waiter->wake_gen, 0);
}
atomic_i32_fetch_set(&queue->lock, 0);
}
/* Wake workers */
/* TODO: Only wake necessary amount of workers */
if (num_waiters > 0) {
struct snc_lock lock = snc_lock_e(&G.workers_wake_mutex);
{
atomic_i64_fetch_add(&G.num_jobs_in_queue, num_waiters);
/* workers_wake_gen < 0 appears to gate wakeups off (shutdown/pause?);
 * only bump the generation and broadcast while it is non-negative.
 * TODO confirm the sentinel's meaning against where it is set. */
if (atomic_i64_fetch(&G.workers_wake_gen) >= 0) {
atomic_i64_fetch_add(&G.workers_wake_gen, 1);
snc_cv_broadcast(&G.workers_wake_cv);
}
}
snc_unlock(&lock);
}
arena_temp_end(temp);
}
}
scratch_end(scratch);
#else
(UNUSED)_;
{
i32 priority = THREAD_PRIORITY_TIME_CRITICAL;
@ -1639,7 +1169,7 @@ INTERNAL SYS_THREAD_DEF(job_scheduler_entry, _)
}
i64 last_cycle_ns = 0;
while (!atomic_i32_fetch(&G.workers_shutdown)) {
while (!atomic_i32_fetch(&G.workers_shutdown.v)) {
__profn("Job scheduler cycle");
{
__profn("Job scheduler wait");
@ -1666,12 +1196,12 @@ INTERNAL SYS_THREAD_DEF(job_scheduler_entry, _)
periods_sum_ns += (f64)periods[i];
}
f64 mean_ns = periods_sum_ns / (f64)countof(periods);
atomic_i64_fetch_set(&G.current_scheduler_cycle_period_ns, math_round_to_int64(mean_ns));
atomic_i64_fetch_set(&G.current_scheduler_cycle_period_ns.v, math_round_to_int64(mean_ns));
}
last_cycle_ns = now_ns;
u64 wake_gen = atomic_u64_fetch_add_u64(&G.waiter_wake_gen, 1);
i64 current_cycle = atomic_i64_fetch_add(&G.current_scheduler_cycle, 1) + 1;
u64 wake_gen = atomic_u64_fetch_add_u64(&G.waiter_wake_gen.v, 1);
i64 current_cycle = atomic_i64_fetch_add(&G.current_scheduler_cycle.v, 1) + 1;
{
__profn("Job scheduler run");
struct arena_temp temp = arena_temp_begin(scratch.arena);
@ -1812,7 +1342,7 @@ INTERNAL SYS_THREAD_DEF(job_scheduler_entry, _)
struct fiber *waiter = waiters[i];
enum job_queue_kind queue_kind = job_queue_kind_from_priority(waiter->job_priority);
struct job_queue *queue = &G.job_queues[queue_kind];
while (atomic_i32_fetch_test_set(&queue->lock, 0, 1) != 0) ix_pause();
tm_lock(&queue->lock);
{
struct job_info *info = NULL;
if (queue->first_free) {
@ -1836,7 +1366,7 @@ INTERNAL SYS_THREAD_DEF(job_scheduler_entry, _)
queue->last = info;
atomic_u64_fetch_set(&waiter->wake_gen, 0);
}
atomic_i32_fetch_set(&queue->lock, 0);
tm_unlock(&queue->lock);
}
/* Wake workers */
@ -1844,7 +1374,7 @@ INTERNAL SYS_THREAD_DEF(job_scheduler_entry, _)
if (num_waiters > 0) {
struct snc_lock lock = snc_lock_e(&G.workers_wake_mutex);
{
atomic_i64_fetch_add(&G.num_jobs_in_queue, num_waiters);
atomic_i64_fetch_add(&G.num_jobs_in_queue.v, num_waiters);
snc_cv_broadcast(&G.workers_wake_cv);
}
snc_unlock(&lock);
@ -1854,7 +1384,6 @@ INTERNAL SYS_THREAD_DEF(job_scheduler_entry, _)
}
scratch_end(scratch);
#endif
}
/* ========================== *
@ -1867,7 +1396,7 @@ INTERNAL SYS_THREAD_DEF(test_entry, _)
(UNUSED)_;
/* Start scheduler */
atomic_i64_fetch_set(&G.current_scheduler_cycle_period_ns, DEFAULT_SCHEDULER_CYCLE_PERIOD_NS);
atomic_i64_fetch_set(&G.current_scheduler_cycle_period_ns.v, DEFAULT_SCHEDULER_CYCLE_PERIOD_NS);
struct sys_thread *scheduler_thread = sys_thread_alloc(job_scheduler_entry, NULL, LIT("Scheduler thread"), PROF_THREAD_GROUP_SCHEDULER);
/* Start workers */
@ -3871,7 +3400,7 @@ int CALLBACK wWinMain(_In_ HINSTANCE instance, _In_opt_ HINSTANCE prev_instance,
/* Shutdown test thread */
{
struct snc_lock lock = snc_lock_e(&G.workers_wake_mutex);
atomic_i32_fetch_set(&G.workers_shutdown, 1);
atomic_i32_fetch_set(&G.workers_shutdown.v, 1);
snc_cv_broadcast(&G.workers_wake_cv);
snc_unlock(&lock);
}