wait / wake wip

This commit is contained in:
jacob 2025-07-05 21:01:30 -05:00
parent ba3a2454db
commit d47b951b82
6 changed files with 373 additions and 276 deletions

View File

@ -192,7 +192,7 @@ void asset_cache_wait(struct asset *asset)
if (asset->status != ASSET_STATUS_READY) { if (asset->status != ASSET_STATUS_READY) {
/* Wait on job */ /* Wait on job */
if (asset->counter) { if (asset->counter) {
sys_wait(asset->counter); sys_counter_wait(asset->counter);
} }
/* Wait for asset to be ready */ /* Wait for asset to be ready */
sync_flag_wait(&asset->asset_ready_sf); sync_flag_wait(&asset->asset_ready_sf);

View File

@ -930,7 +930,7 @@ INTERNAL SYS_JOB_DEF(pipeline_init_job, job)
struct sys_counter *counter = sys_counter_alloc(); struct sys_counter *counter = sys_counter_alloc();
{ {
sys_run(countof(params), shader_compile_job, &comp_sig, SYS_PRIORITY_HIGH, counter); sys_run(countof(params), shader_compile_job, &comp_sig, SYS_PRIORITY_HIGH, counter);
sys_wait(counter); sys_counter_wait(counter);
} }
sys_counter_release(counter); sys_counter_release(counter);
success = vs.success && ps.success; success = vs.success && ps.success;
@ -1127,7 +1127,7 @@ INTERNAL void pipeline_alloc(u64 num_pipelines, struct pipeline_desc *descs_in,
struct sys_counter *counter = sys_counter_alloc(); struct sys_counter *counter = sys_counter_alloc();
{ {
sys_run(num_pipelines, pipeline_init_job, &sig, SYS_PRIORITY_HIGH, counter); sys_run(num_pipelines, pipeline_init_job, &sig, SYS_PRIORITY_HIGH, counter);
sys_wait(counter); sys_counter_wait(counter);
} }
sys_counter_release(counter); sys_counter_release(counter);
} }

View File

@ -291,7 +291,7 @@ INTERNAL SYS_THREAD_DEF(resource_watch_dispatcher_thread_entry_point, _)
struct sys_counter *counter = sys_counter_alloc(); struct sys_counter *counter = sys_counter_alloc();
{ {
sys_run(num_callbacks, resource_watch_callback_job, &sig, SYS_PRIORITY_BACKGROUND, counter); sys_run(num_callbacks, resource_watch_callback_job, &sig, SYS_PRIORITY_BACKGROUND, counter);
sys_wait(counter); sys_counter_wait(counter);
} }
sys_counter_release(counter); sys_counter_release(counter);
} }

View File

@ -1381,7 +1381,7 @@ INTERNAL SYS_THREAD_DEF(sprite_evictor_scheduler_thread_entry_point, arg)
struct sys_lock evictor_lock = sys_mutex_lock_e(G.evictor_scheduler_mutex); struct sys_lock evictor_lock = sys_mutex_lock_e(G.evictor_scheduler_mutex);
while (!G.evictor_scheduler_shutdown) { while (!G.evictor_scheduler_shutdown) {
sys_run(1, sprite_evictor_job, NULL, SYS_PRIORITY_BACKGROUND, job_counter); sys_run(1, sprite_evictor_job, NULL, SYS_PRIORITY_BACKGROUND, job_counter);
sys_wait(job_counter); sys_counter_wait(job_counter);
sys_condition_variable_wait_time(G.evictor_scheduler_shutdown_cv, &evictor_lock, SECONDS_FROM_NS(EVICTOR_CYCLE_INTERVAL_NS)); sys_condition_variable_wait_time(G.evictor_scheduler_shutdown_cv, &evictor_lock, SECONDS_FROM_NS(EVICTOR_CYCLE_INTERVAL_NS));
} }
sys_mutex_unlock(&evictor_lock); sys_mutex_unlock(&evictor_lock);

View File

@ -477,17 +477,14 @@ b32 sys_run_command(struct string cmd);
/* ========================== * /* ========================== *
* Counter * Wait
* ========================== */ * ========================== */
struct sys_counter *sys_counter_alloc(void); /* Futex-like wait & wake */
void sys_counter_release(struct sys_counter *counter);
void sys_counter_add(struct sys_counter *counter, i64 amount); void sys_wait(void *addr, void *cmp, u32 size);
void sys_wake_all(void *addr);
void sys_wait(struct sys_counter *counter);
/* ========================== * /* ========================== *
* Fiber * Fiber
@ -497,8 +494,18 @@ void sys_wait(struct sys_counter *counter);
i32 sys_current_fiber_id(void); i32 sys_current_fiber_id(void);
/* Cooperative yield to give other jobs of the same priority level a chance to run */
void sys_yield(void); void sys_yield(void);
/* ========================== *
* Counter
* ========================== */
struct sys_counter *sys_counter_alloc(void);
void sys_counter_release(struct sys_counter *counter);
void sys_counter_add(struct sys_counter *counter, i64 amount);
void sys_counter_wait(struct sys_counter *counter);
/* ========================== * /* ========================== *
* Job * Job
* ========================== */ * ========================== */

View File

@ -52,7 +52,7 @@ struct win32_condition_variable {
CONDITION_VARIABLE condition_variable; CONDITION_VARIABLE condition_variable;
struct win32_condition_variable *next_free; struct win32_condition_variable *next_free;
#if RTC #if RTC
struct atomic_i64 num_yielding_waiters; struct atomic_i64 num_yielders;
#endif #endif
}; };
@ -112,7 +112,48 @@ struct win32_window {
struct win32_window *next_free; struct win32_window *next_free;
}; };
struct alignas(64) waiter {
/* Futex-style wait table: one wait_list holds all fibers currently yielding
 * on a single address; lists hash into fixed-size bins (see wait_bin below). */
#define NUM_WAIT_BINS 4096
struct alignas(64) wait_list {
/* =================================================== */
void *addr; /* 8 bytes - the watched address this list is keyed on */
/* =================================================== */
struct yielder *first_yielder; /* 8 bytes - head of parked-fiber FIFO */
/* =================================================== */
struct yielder *last_yielder; /* 8 bytes - tail of parked-fiber FIFO */
/* =================================================== */
struct yielder *first_free_yielder; /* 8 bytes - recycled yielder nodes */
/* =================================================== */
i32 num_yielders; /* 4 bytes - count of parked fibers in this list */
u8 _pad0[4]; /* 4 bytes (padding) */
/* =================================================== */
struct wait_list *next_in_bin; /* 8 bytes - doubly-linked chain inside the bin */
/* =================================================== */
struct wait_list *prev_in_bin; /* 8 bytes */
/* =================================================== */
u8 _pad1[8]; /* 8 bytes (padding) */
};
STATIC_ASSERT(sizeof(struct wait_list) == 64); /* Assume wait_list fits in one cache line (increase if necessary) */
STATIC_ASSERT(alignof(struct wait_list) == 64); /* Avoid false sharing */
/* One bucket of the wait table. Each bin carries its own spin lock so that
 * waits/wakes hashing to different bins do not contend with each other. */
struct alignas(64) wait_bin {
/* =================================================== */
struct wait_list *first_wait_list; /* 8 bytes - head of lists hashed to this bin */
/* =================================================== */
struct wait_list *last_wait_list; /* 8 bytes - tail of that chain */
/* =================================================== */
struct wait_list *first_free_wait_list; /* 8 bytes - recycled wait_list nodes */
/* =================================================== */
struct atomic_i32 lock; /* 4 bytes - spin lock (0 = free, 1 = held) */
u8 _pad0[4]; /* 4 bytes (padding) */
/* =================================================== */
u8 _pad1[32]; /* 32 bytes (padding) */
};
STATIC_ASSERT(sizeof(struct wait_bin) == 64); /* Assume wait_bin fits in one cache line (increase if necessary) */
STATIC_ASSERT(alignof(struct wait_bin) == 64); /* Avoid false sharing */
struct alignas(64) yielder {
/* =================================================== */ /* =================================================== */
i32 fiber_id; /* 4 bytes */ i32 fiber_id; /* 4 bytes */
i32 job_queue_kind; /* 4 bytes */ i32 job_queue_kind; /* 4 bytes */
@ -123,17 +164,17 @@ struct alignas(64) waiter {
/* =================================================== */ /* =================================================== */
struct counter *job_counter; /* 8 bytes */ struct counter *job_counter; /* 8 bytes */
/* =================================================== */ /* =================================================== */
struct waiter *next; /* 8 bytes */ struct yielder *next; /* 8 bytes */
/* =================================================== */
struct yielder *prev; /* 8 bytes */
/* =================================================== */ /* =================================================== */
i32 job_id; /* 4 bytes */ i32 job_id; /* 4 bytes */
u8 _pad0[4]; /* 4 bytes (padding) */ u8 _pad0[4]; /* 4 bytes (padding) */
/* =================================================== */ /* =================================================== */
u8 _pad1[8]; /* 8 bytes (padding) */ u8 _pad1[8]; /* 8 bytes (padding) */
/* =================================================== */
u8 _pad2[8]; /* 8 bytes (padding) */
}; };
STATIC_ASSERT(sizeof(struct waiter) == 64); /* Assume waiter fits in one cache line (increase if necessary) */ STATIC_ASSERT(sizeof(struct yielder) == 64); /* Assume yielder fits in one cache line (increase if necessary) */
STATIC_ASSERT(alignof(struct waiter) == 64); /* Avoid false sharing */ STATIC_ASSERT(alignof(struct yielder) == 64); /* Avoid false sharing */
@ -141,22 +182,9 @@ STATIC_ASSERT(alignof(struct waiter) == 64); /* Avoid false sharing */
struct alignas(64) counter { struct alignas(64) counter {
/* =================================================== */ /* =================================================== */
struct atomic_i64 v; /* 8 bytes */ struct atomic_i64 v; /* 8 bytes */
/* =================================================== */
struct counter *next_free; /* 8 bytes */ struct counter *next_free; /* 8 bytes */
/* =================================================== */ /* =================================================== */
struct atomic_i32 waiters_lock; /* 4 bytes */ u8 _pad[48]; /* 56 bytes (padding) */
u8 _pad0[4]; /* 4 bytes (padding) */
/* =================================================== */
i32 num_blocking_waiters; /* 4 bytes */
i32 num_yielding_waiters; /* 4 bytes */
/* =================================================== */
struct waiter *first_waiter; /* 8 bytes */
/* =================================================== */
struct waiter *last_waiter; /* 8 bytes */
/* =================================================== */
struct atomic_i64 wake_gen; /* 8 bytes */
/* =================================================== */
u8 _pad1[8]; /* 8 bytes (padding) */
}; };
STATIC_ASSERT(sizeof(struct counter) == 64); /* Assume counter fits in one cache line (increase if necessary) */ STATIC_ASSERT(sizeof(struct counter) == 64); /* Assume counter fits in one cache line (increase if necessary) */
STATIC_ASSERT(alignof(struct counter) == 64); /* Avoid false sharing */ STATIC_ASSERT(alignof(struct counter) == 64); /* Avoid false sharing */
@ -180,6 +208,17 @@ enum yield_kind {
NUM_YIELD_KINDS NUM_YIELD_KINDS
}; };
/* Payload a fiber hands to its parent when yielding (stored in
 * fiber->yield_param); interpretation depends on `kind`. */
struct yield_param {
enum yield_kind kind;
union {
/* Valid when kind == YIELD_KIND_WAIT: futex-style wait description */
struct {
void *addr; /* address being waited on */
void *cmp; /* snapshot to compare *addr against */
u32 size; /* number of bytes compared */
} wait;
};
};
struct alignas(64) fiber { struct alignas(64) fiber {
/* =================================================== */ /* =================================================== */
char *name_cstr; /* 8 bytes */ char *name_cstr; /* 8 bytes */
@ -196,10 +235,9 @@ struct alignas(64) fiber {
i32 job_id; /* 4 bytes */ i32 job_id; /* 4 bytes */
i32 job_priority; /* 4 bytes */ i32 job_priority; /* 4 bytes */
/* =================================================== */ /* =================================================== */
void *yield_arg; /* 8 bytes */ struct yield_param *yield_param; /* 8 bytes */
/* =================================================== */ /* =================================================== */
enum yield_kind yield_kind; /* 4 bytes */ u8 _pad0[8]; /* 8 bytes (padding) */
u8 _pad0[4]; /* 4 bytes (padding) */
}; };
STATIC_ASSERT(sizeof(struct fiber) == 64); /* Assume fiber fits in one cache line (increase if necessary) */ STATIC_ASSERT(sizeof(struct fiber) == 64); /* Assume fiber fits in one cache line (increase if necessary) */
STATIC_ASSERT(alignof(struct fiber) == 64); /* Avoid false sharing */ STATIC_ASSERT(alignof(struct fiber) == 64); /* Avoid false sharing */
@ -315,19 +353,16 @@ GLOBAL struct {
/* Yielders */
struct atomic_i32 yielders_arena_lock; /* TODO: Prevent false sharing */
struct arena *yielders_arena;
/* Wait lists */
struct atomic_i32 wait_lists_arena_lock; /* TODO: Prevent false sharing */
struct arena *wait_lists_arena;
/* Wait table */
struct wait_bin wait_bins[NUM_WAIT_BINS];
/* Waiters */
struct arena *waiters_arena;
struct atomic_i32 waiters_lock; /* TODO: Prevent false sharing */
struct waiter *first_free_waiter;
/* Counters */
struct arena *counters_arena;
struct atomic_i32 counters_lock; /* TODO: Prevent false sharing */
struct counter *first_free_counter;
/* Fibers */ /* Fibers */
i32 num_fibers; i32 num_fibers;
@ -337,6 +372,11 @@ GLOBAL struct {
struct fiber fibers[SYS_MAX_FIBERS]; struct fiber fibers[SYS_MAX_FIBERS];
struct fiber_ctx fiber_contexts[SYS_MAX_FIBERS]; struct fiber_ctx fiber_contexts[SYS_MAX_FIBERS];
/* Counters */
struct arena *counters_arena;
struct atomic_i32 counters_lock; /* TODO: Prevent false sharing */
struct counter *first_free_counter;
/* Jobs */ /* Jobs */
struct job_queue job_queues[NUM_JOB_QUEUE_KINDS]; struct job_queue job_queues[NUM_JOB_QUEUE_KINDS];
@ -350,91 +390,102 @@ GLOBAL struct {
} G = ZI, DEBUG_ALIAS(G, G_sys_win32); } G = ZI, DEBUG_ALIAS(G, G_sys_win32);
INTERNAL struct fiber *fiber_from_id(i32 id);
INTERNAL void fiber_yield(struct fiber *fiber, struct fiber *parent_fiber, struct yield_param *param);
/* ========================== * /* ========================== *
* Counters * Wait / wake
* ========================== */ * ========================== */
INTERNAL void yield(struct fiber *fiber, struct fiber *parent_fiber, enum yield_kind kind, void *arg); void sys_wait(void *addr, void *cmp, u32 size)
INTERNAL struct fiber *fiber_from_id(i32 id);
INTERNAL struct counter *counter_alloc(void)
{ {
struct counter *counter = NULL; #if 1
{ WaitOnAddress(addr, cmp, size, INFINITE);
while (atomic_i32_fetch_test_set(&G.counters_lock, 0, 1) != 0) ix_pause(); #else
{ struct fiber *fiber = fiber_from_id(sys_current_fiber_id());
if (G.first_free_counter) { i32 parent_fiber_id = fiber->parent_id;
counter = G.first_free_counter; /* Yield if job fiber, otherwise fall back to windows blocking function */
G.first_free_counter = counter->next_free; if (parent_fiber_id > 0) {
#if 1
/* Yield if job fiber */
struct yield_param param = {
.kind = YIELD_KIND_WAIT,
.wait = {
.addr = addr,
.cmp = cmp,
.size = size
}
};
fiber_yield(fiber, fiber_from_id(parent_fiber_id), &param);
#else
while (MEMEQ(addr, cmp, size)) {
ix_pause();
}
#endif
} else { } else {
counter = arena_push_no_zero(G.counters_arena, struct counter); WaitOnAddress(addr, cmp, size, INFINITE);
} }
} #endif
atomic_i32_fetch_set(&G.counters_lock, 0);
}
MEMZERO_STRUCT(counter);
return counter;
} }
INTERNAL void counter_release(struct counter *counter) void sys_wake_all(void *addr)
{ {
ASSERT(counter->first_waiter == NULL); /* Counter should not have any waiters */ #if 0
while (atomic_i32_fetch_test_set(&G.counters_lock, 0, 1) != 0) ix_pause(); u64 wait_bin_index = (u64)addr % NUM_WAIT_BINS;
{ struct wait_bin *bin = &G.wait_bins[wait_bin_index];
counter->next_free = G.first_free_counter;
G.first_free_counter = counter;
}
atomic_i32_fetch_set(&G.counters_lock, 0);
}
INTERNAL void counter_add(struct counter *counter, i64 amount) while (atomic_i32_fetch_test_set(&bin->lock, 0, 1) != 0) ix_pause();
{ {
i64 old_v = atomic_i64_fetch_add(&counter->v, amount); struct wait_list *wait_list = NULL;
i64 new_v = old_v + amount; for (struct wait_list *tmp = bin->first_wait_list; tmp && !wait_list; tmp = tmp->next_in_bin) {
if (old_v > 0 && new_v <= 0) { if (tmp->addr == addr) {
/* Wake waiters */ wait_list = tmp;
struct waiter *first_waiter = NULL;
struct waiter *last_waiter = NULL;
{
while (atomic_i32_fetch_test_set(&counter->waiters_lock, 0, 1) != 0) ix_pause();
{
/* Wake blocking waiters */
atomic_i64_fetch_add(&counter->wake_gen, 1);
if (counter->num_blocking_waiters > 0) {
WakeByAddressAll(&counter->wake_gen);
} }
}
/* Wake yielding waiters (fibers) */ if (wait_list && wait_list->num_yielders > 0) {
first_waiter = counter->first_waiter;
last_waiter = counter->last_waiter;
if (first_waiter) {
struct arena_temp scratch = scratch_begin_no_conflict(); struct arena_temp scratch = scratch_begin_no_conflict();
{ {
/* Separate waiters by queue kind */ /* Separate yielders by queue kind */
i32 queue_counts[NUM_JOB_QUEUE_KINDS] = ZI; i32 queue_counts[NUM_JOB_QUEUE_KINDS] = ZI;
struct waiter **queue_waiter_arrays[NUM_JOB_QUEUE_KINDS] = ZI; struct yielder **queue_yielder_arrays[NUM_JOB_QUEUE_KINDS] = ZI;
for (i32 i = 0; i < (i32)countof(queue_waiter_arrays); ++i) { for (i32 i = 0; i < (i32)countof(queue_yielder_arrays); ++i) {
/* NOTE: Each array is conservatively sized as the number of all waiters in the counter */ /* NOTE: Each array is conservatively sized as the number of all yielders in the list */
queue_waiter_arrays[i] = arena_push_array_no_zero(scratch.arena, struct waiter *, counter->num_yielding_waiters); queue_yielder_arrays[i] = arena_push_array_no_zero(scratch.arena, struct yielder *, wait_list->num_yielders);
} }
for (struct waiter *waiter = first_waiter; waiter; waiter = waiter->next) { for (struct yielder *yielder = wait_list->first_yielder; yielder; yielder = yielder->next) {
enum job_queue_kind queue_kind = waiter->job_queue_kind; enum job_queue_kind queue_kind = yielder->job_queue_kind;
i32 index = queue_counts[queue_kind]++; i32 index = queue_counts[queue_kind]++;
struct waiter **array = queue_waiter_arrays[index]; struct yielder **array = queue_yielder_arrays[index];
array[index] = waiter; array[index] = yielder;
} }
/* Push jobs */ /* Push jobs */
for (i32 queue_kind = 0; queue_kind < (i32)countof(queue_counts); ++queue_kind) { for (i32 queue_kind = 0; queue_kind < (i32)countof(queue_counts); ++queue_kind) {
i32 num_jobs = queue_counts[queue_kind]; i32 num_jobs = queue_counts[queue_kind];
if (num_jobs > 0) { if (num_jobs > 0) {
struct job_queue *queue = &G.job_queues[queue_kind]; struct job_queue *queue = &G.job_queues[queue_kind];
struct waiter **queue_waiters = queue_waiter_arrays[queue_kind]; struct yielder **queue_yielders = queue_yielder_arrays[queue_kind];
while (atomic_i32_fetch_test_set(&queue->lock, 0, 1) != 0) ix_pause(); while (atomic_i32_fetch_test_set(&queue->lock, 0, 1) != 0) ix_pause();
{ {
/* TODO: More efficient batch job list allocation */ /* TODO: More efficient batch job list allocation */
for (i32 i = 0; i < num_jobs; ++i) { for (i32 i = 0; i < num_jobs; ++i) {
struct waiter *waiter = queue_waiters[i]; struct yielder *yielder = queue_yielders[i];
struct job_info *info = NULL; struct job_info *info = NULL;
if (queue->first_free) { if (queue->first_free) {
info = queue->first_free; info = queue->first_free;
@ -444,11 +495,11 @@ INTERNAL void counter_add(struct counter *counter, i64 amount)
} }
MEMZERO_STRUCT(info); MEMZERO_STRUCT(info);
info->count = 1; info->count = 1;
info->num_dispatched = waiter->job_id; info->num_dispatched = yielder->job_id;
info->func = waiter->job_func; info->func = yielder->job_func;
info->sig = waiter->job_sig; info->sig = yielder->job_sig;
info->counter = waiter->job_counter; info->counter = yielder->job_counter;
info->fiber_id = waiter->fiber_id; info->fiber_id = yielder->fiber_id;
if (queue->last) { if (queue->last) {
queue->last->next = info; queue->last->next = info;
} else { } else {
@ -460,84 +511,16 @@ INTERNAL void counter_add(struct counter *counter, i64 amount)
atomic_i32_fetch_set(&queue->lock, 0); atomic_i32_fetch_set(&queue->lock, 0);
} }
} }
/* Reset waiters list */
counter->num_yielding_waiters = 0;
counter->first_waiter = NULL;
counter->last_waiter = NULL;
} }
scratch_end(scratch); scratch_end(scratch);
} }
counter->num_yielding_waiters = 0;
}
atomic_i32_fetch_set(&counter->waiters_lock, 0);
}
/* Add waiters to free list */
if (first_waiter) {
while (atomic_i32_fetch_test_set(&G.waiters_lock, 0, 1) != 0) ix_pause();
{
last_waiter->next = G.first_free_waiter;
G.first_free_waiter = first_waiter;
}
atomic_i32_fetch_set(&G.waiters_lock, 0);
}
}
}
INTERNAL void counter_wait(struct counter *counter)
{
__prof;
/* TODO: Spin with configurable count */
if (atomic_i64_fetch(&counter->v) > 0) {
struct fiber *fiber = fiber_from_id(sys_current_fiber_id());
i32 parent_fiber_id = fiber->parent_id;
if (parent_fiber_id > 0) {
#if 0
/* Yield if job fiber */
yield(fiber, fiber_from_id(parent_fiber_id), YIELD_KIND_WAIT, counter);
#else
while (atomic_i64_fetch(&counter->v) != 0) {
ix_pause();
} }
atomic_i32_fetch_set(&bin->lock, 1);
#endif #endif
} else {
/* Top-level fibers should block since they can't yield */
i64 wake_gen = 0;
{
while (atomic_i32_fetch_test_set(&counter->waiters_lock, 0, 1) != 0) ix_pause();
++counter->num_blocking_waiters;
wake_gen = atomic_i64_fetch(&counter->wake_gen);
atomic_i32_fetch_set(&counter->waiters_lock, 0);
}
while (atomic_i64_fetch(&counter->wake_gen) <= wake_gen) {
WaitOnAddress(&counter->wake_gen, &wake_gen, sizeof(wake_gen), INFINITE);
}
}
}
#if 0 /* Wake blocking waiters */
if (atomic_i64_fetch(&counter->v) > 0) { WakeByAddressAll(addr);
/* Top-level fibers should block since they can't yield */
i64 wake_gen = 0;
{
while (atomic_i32_fetch_test_set(&counter->waiters_lock, 0, 1) != 0) ix_pause();
++counter->num_blocking_waiters;
wake_gen = atomic_i64_fetch(&counter->wake_gen);
atomic_i32_fetch_set(&counter->waiters_lock, 0);
} }
while (atomic_i64_fetch(&counter->wake_gen) <= wake_gen) {
WaitOnAddress(&counter->wake_gen, &wake_gen, sizeof(wake_gen), INFINITE);
}
}
#endif
}
struct sys_counter *sys_counter_alloc(void) { return (struct sys_counter *)counter_alloc(); }
void sys_counter_release(struct sys_counter *counter) { counter_release((struct counter *)counter); }
void sys_counter_add(struct sys_counter *counter, i64 amount) { counter_add((struct counter *)counter, amount); }
void sys_wait(struct sys_counter *counter) { counter_wait((struct counter *)counter); }
/* ========================== * /* ========================== *
* Fibers * Fibers
@ -616,7 +599,7 @@ INTERNAL struct fiber *fiber_alloc(enum fiber_kind kind)
fiber->job_sig = 0; fiber->job_sig = 0;
fiber->job_id = 0; fiber->job_id = 0;
fiber->job_priority = 0; fiber->job_priority = 0;
fiber->yield_kind = 0; fiber->yield_param = 0;
fiber->parent_id = 0; fiber->parent_id = 0;
return fiber; return fiber;
} }
@ -643,6 +626,63 @@ INTERNAL struct fiber_ctx *fiber_ctx_from_id(i32 id)
return &G.fiber_contexts[id]; return &G.fiber_contexts[id];
} }
/* ========================== *
* Counters
* ========================== */
/* Allocate a zeroed counter. Reuses a node from the global free list when one
 * is available, otherwise pushes a fresh node from the counters arena. Both
 * structures are guarded by the G.counters_lock spin lock (0 = free, 1 = held). */
INTERNAL struct counter *counter_alloc(void)
{
struct counter *counter = NULL;
{
/* Acquire the spin lock */
while (atomic_i32_fetch_test_set(&G.counters_lock, 0, 1) != 0) ix_pause();
{
if (G.first_free_counter) {
/* Pop a recycled node from the free list */
counter = G.first_free_counter;
G.first_free_counter = counter->next_free;
} else {
counter = arena_push_no_zero(G.counters_arena, struct counter);
}
}
/* Release the spin lock */
atomic_i32_fetch_set(&G.counters_lock, 0);
}
/* Zero outside the critical section to keep the lock hold time short */
MEMZERO_STRUCT(counter);
return counter;
}
/* Return a counter to the global free list for reuse by counter_alloc.
 * Memory is never freed; nodes live in the counters arena for the process
 * lifetime. Caller must ensure no one still waits on the counter. */
INTERNAL void counter_release(struct counter *counter)
{
/* Acquire the spin lock guarding the free list */
while (atomic_i32_fetch_test_set(&G.counters_lock, 0, 1) != 0) ix_pause();
{
counter->next_free = G.first_free_counter;
G.first_free_counter = counter;
}
atomic_i32_fetch_set(&G.counters_lock, 0);
}
/* Atomically add `amount` (may be negative) to the counter value. When the
 * value crosses from positive to <= 0, wake everyone parked on &counter->v
 * via the futex-style wait/wake layer. The old>0 && new<=0 check ensures at
 * most one adder observes the crossing, so the wake fires exactly once. */
INTERNAL void counter_add(struct counter *counter, i64 amount)
{
i64 old_v = atomic_i64_fetch_add(&counter->v, amount);
i64 new_v = old_v + amount;
if (old_v > 0 && new_v <= 0) {
sys_wake_all(&counter->v);
}
}
/* Wait until the counter value drops to zero or below. Delegates blocking
 * versus fiber-yielding to sys_wait. */
INTERNAL void counter_wait(struct counter *counter)
{
/* TODO: Spin with configurable count */
i64 v = atomic_i64_fetch(&counter->v);
while (v > 0) {
/* sys_wait sleeps only while *addr still equals the snapshot `v`;
 * re-read after every wake so spurious wakeups loop back to sleep */
sys_wait(&counter->v, &v, sizeof(v));
v = atomic_i64_fetch(&counter->v);
}
}
/* Public counter API: thin casts between the opaque sys_counter handle and
 * the internal counter representation. */
struct sys_counter *sys_counter_alloc(void) { return (struct sys_counter *)counter_alloc(); }
void sys_counter_release(struct sys_counter *counter) { counter_release((struct counter *)counter); }
void sys_counter_add(struct sys_counter *counter, i64 amount) { counter_add((struct counter *)counter, amount); }
void sys_counter_wait(struct sys_counter *counter) { counter_wait((struct counter *)counter); }
/* ========================== * /* ========================== *
* Test job * Test job
* ========================== */ * ========================== */
@ -652,7 +692,7 @@ i32 sys_current_fiber_id(void)
return (i32)(i64)GetFiberData(); return (i32)(i64)GetFiberData();
} }
INTERNAL void yield(struct fiber *fiber, struct fiber *parent_fiber, enum yield_kind kind, void *arg) INTERNAL void fiber_yield(struct fiber *fiber, struct fiber *parent_fiber, struct yield_param *param)
{ {
ASSERT(fiber->id == sys_current_fiber_id()); ASSERT(fiber->id == sys_current_fiber_id());
ASSERT(parent_fiber->id == fiber->parent_id); ASSERT(parent_fiber->id == fiber->parent_id);
@ -661,8 +701,7 @@ INTERNAL void yield(struct fiber *fiber, struct fiber *parent_fiber, enum yield_
} }
{ {
__prof_fiber_leave; __prof_fiber_leave;
fiber->yield_kind = kind; fiber->yield_param = param;
fiber->yield_arg = arg;
SwitchToFiber(parent_fiber->addr); SwitchToFiber(parent_fiber->addr);
__prof_fiber_enter(fiber->name_cstr, PROF_THREAD_GROUP_FIBERS + fiber->id); __prof_fiber_enter(fiber->name_cstr, PROF_THREAD_GROUP_FIBERS + fiber->id);
} }
@ -670,11 +709,13 @@ INTERNAL void yield(struct fiber *fiber, struct fiber *parent_fiber, enum yield_
void sys_yield(void) void sys_yield(void)
{ {
/* TODO: Don't yield if job queue is empty */
struct fiber *fiber = fiber_from_id(sys_current_fiber_id()); struct fiber *fiber = fiber_from_id(sys_current_fiber_id());
i32 parent_id = fiber->parent_id; i32 parent_id = fiber->parent_id;
if (parent_id > 0) { /* Top level fibers should not yield */ if (parent_id > 0) { /* Top level fibers should not yield */
struct fiber *parent_fiber = fiber_from_id(parent_id); struct fiber *parent_fiber = fiber_from_id(parent_id);
yield(fiber, parent_fiber, YIELD_KIND_COOPERATIVE, NULL); struct yield_param param = { .kind = YIELD_KIND_COOPERATIVE };
fiber_yield(fiber, parent_fiber, &param);
} }
} }
@ -723,6 +764,7 @@ INTERNAL void job_fiber_entry(void *id_ptr)
{ {
i32 id = (i32)(i64)id_ptr; i32 id = (i32)(i64)id_ptr;
struct fiber *fiber = fiber_from_id(id); struct fiber *fiber = fiber_from_id(id);
struct yield_param yield = ZI;
while (true) { while (true) {
__prof_fiber_enter(fiber->name_cstr, PROF_THREAD_GROUP_FIBERS + fiber->id); __prof_fiber_enter(fiber->name_cstr, PROF_THREAD_GROUP_FIBERS + fiber->id);
i32 parent_id = fiber->parent_id; i32 parent_id = fiber->parent_id;
@ -731,13 +773,14 @@ INTERNAL void job_fiber_entry(void *id_ptr)
/* Run job */ /* Run job */
{ {
__profscope(Run job); __profscope(Run job);
fiber->yield_kind = YIELD_KIND_NONE; yield.kind = YIELD_KIND_NONE;
fiber->yield_arg = NULL; fiber->yield_param = &yield;
struct sys_job_data data = ZI; struct sys_job_data data = ZI;
data.id = fiber->job_id; data.id = fiber->job_id;
data.sig = fiber->job_sig; data.sig = fiber->job_sig;
fiber->job_func(data); fiber->job_func(data);
fiber->yield_kind = YIELD_KIND_DONE; yield.kind = YIELD_KIND_DONE;
fiber->yield_param = &yield;
} }
__prof_fiber_leave; __prof_fiber_leave;
SwitchToFiber(parent_fiber_addr); SwitchToFiber(parent_fiber_addr);
@ -766,6 +809,7 @@ INTERNAL SYS_THREAD_DEF(worker_entry, worker_ctx_arg)
/* Pull job from queue */ /* Pull job from queue */
enum sys_priority job_priority = 0; enum sys_priority job_priority = 0;
enum job_queue_kind job_queue_kind = 0; enum job_queue_kind job_queue_kind = 0;
i32 job_fiber_id = 0;
i32 job_id = 0; i32 job_id = 0;
sys_job_func *job_func = 0; sys_job_func *job_func = 0;
void *job_sig = 0; void *job_sig = 0;
@ -800,10 +844,7 @@ INTERNAL SYS_THREAD_DEF(worker_entry, worker_ctx_arg)
} }
} else { } else {
/* This job is being resumed from a yield */ /* This job is being resumed from a yield */
if (job_fiber) { job_fiber_id = info->fiber_id;
fiber_release(job_fiber, job_fiber->id);
}
job_fiber = fiber_from_id(info->fiber_id);
job_id = info->num_dispatched; job_id = info->num_dispatched;
job_func = info->func; job_func = info->func;
job_sig = info->sig; job_sig = info->sig;
@ -826,6 +867,14 @@ INTERNAL SYS_THREAD_DEF(worker_entry, worker_ctx_arg)
} }
} }
/* Use resumed fiber if present */
if (job_fiber_id > 0) {
if (job_fiber) {
fiber_release(job_fiber, job_fiber->id);
}
job_fiber = fiber_from_id(job_fiber_id);
}
/* Run fiber */ /* Run fiber */
if (job_func) { if (job_func) {
__profscope(Run fiber); __profscope(Run fiber);
@ -840,59 +889,97 @@ INTERNAL SYS_THREAD_DEF(worker_entry, worker_ctx_arg)
b32 done = false; b32 done = false;
while (!done) { while (!done) {
SwitchToFiber(job_fiber->addr); SwitchToFiber(job_fiber->addr);
enum yield_kind yield_kind = job_fiber->yield_kind; struct yield_param *yield_param = job_fiber->yield_param;
enum yield_kind yield_kind = yield_param ? yield_param->kind : YIELD_KIND_NONE;
switch (yield_kind) { switch (yield_kind) {
default: default:
{ {
/* Invalid yield kind */ /* Invalid yield kind */
sys_panic(LIT("Fiber yielded with unknown yield kind")); sys_panic(LIT("Invalid fiber yield"));
} break; } break;
case YIELD_KIND_WAIT: case YIELD_KIND_WAIT:
{ {
struct counter *wait_counter = job_fiber->yield_arg; #if 1
{ void *wait_addr = yield_param->wait.addr;
while (atomic_i32_fetch_test_set(&wait_counter->waiters_lock, 0, 1) != 0) ix_pause(); void *wait_cmp = yield_param->wait.cmp;
{ u32 wait_size = yield_param->wait.size;
if (atomic_i64_fetch(&wait_counter->v) > 0) { /* Re-check counter now that it's locked */
/* Allocate waiter */
struct waiter *waiter = NULL;
{
while (atomic_i32_fetch_test_set(&G.waiters_lock, 0, 1) != 0) ix_pause();
{
if (G.first_free_waiter) {
waiter = G.first_free_waiter;
G.first_free_waiter = waiter->next;
} else {
waiter = arena_push_no_zero(G.waiters_arena, struct waiter);
}
}
atomic_i32_fetch_set(&G.waiters_lock, 0);
}
MEMZERO_STRUCT(waiter);
waiter->fiber_id = job_fiber->id;
waiter->job_queue_kind = job_queue_kind;
waiter->job_func = job_func;
waiter->job_sig = job_sig;
waiter->job_counter = job_counter;
waiter->job_id = job_id;
/* Insert waiter */ u64 wait_bin_index = (u64)wait_addr % NUM_WAIT_BINS;
if (wait_counter->last_waiter) { struct wait_bin *bin = &G.wait_bins[wait_bin_index];
wait_counter->last_waiter->next = waiter;
} else { while (atomic_i32_fetch_test_set(&bin->lock, 0, 1) != 0) ix_pause();
wait_counter->first_waiter = waiter; {
if (!MEMEQ(wait_addr, wait_cmp, wait_size)) {
/* Search addr wait list in bin */
struct wait_list *wait_list = NULL;
for (struct wait_list *tmp = bin->first_wait_list; tmp && !wait_list; tmp = tmp->next_in_bin) {
if (tmp->addr == wait_addr) {
wait_list = tmp;
} }
wait_counter->last_waiter = waiter; }
++wait_counter->num_yielding_waiters;
/* Allocate new wait list */
if (!wait_list) {
if (bin->first_free_wait_list) {
wait_list = bin->first_free_wait_list;
bin->first_free_wait_list = wait_list->next_in_bin;
} else {
while (atomic_i32_fetch_test_set(&G.wait_lists_arena_lock, 0, 1) != 0) ix_pause();
{
wait_list = arena_push_no_zero(G.wait_lists_arena, struct wait_list);
}
atomic_i32_fetch_set(&G.wait_lists_arena_lock, 0);
}
MEMZERO_STRUCT(wait_list);
wait_list->addr = wait_addr;
if (bin->last_wait_list) {
bin->last_wait_list->next_in_bin = wait_list;
wait_list->prev_in_bin = bin->last_wait_list;
} else {
bin->first_wait_list = wait_list;
}
bin->last_wait_list = wait_list;
}
/* Allocate new yielder */
struct yielder *yielder = NULL;
if (wait_list->first_free_yielder) {
yielder = wait_list->first_free_yielder;
wait_list->first_free_yielder = yielder->next;
} else {
while (atomic_i32_fetch_test_set(&G.yielders_arena_lock, 0, 1) != 0) ix_pause();
{
yielder = arena_push_no_zero(G.yielders_arena, struct yielder);
}
atomic_i32_fetch_set(&G.yielders_arena_lock, 0);
}
MEMZERO_STRUCT(yielder);
yielder->fiber_id = job_fiber->id;
yielder->job_queue_kind = job_queue_kind;
yielder->job_func = job_func;
yielder->job_sig = job_sig;
yielder->job_counter = job_counter;
yielder->job_id = job_id;
if (wait_list->last_yielder) {
wait_list->last_yielder->next = yielder;
yielder->prev = wait_list->last_yielder;
} else {
wait_list->first_yielder = yielder;
}
wait_list->last_yielder = yielder;
++wait_list->num_yielders;
/* Pop worker's job fiber */ /* Pop worker's job fiber */
job_fiber = NULL; job_fiber = NULL;
done = true; done = true;
} }
} }
atomic_i32_fetch_set(&wait_counter->waiters_lock, 0); atomic_i32_fetch_set(&bin->lock, 0);
} #else
(UNUSED)job_queue_kind;
ASSERT(false);
#endif
} break; } break;
case YIELD_KIND_DONE: case YIELD_KIND_DONE:
@ -2577,7 +2664,7 @@ void sys_condition_variable_release(struct sys_condition_variable *sys_cv)
__prof; __prof;
struct win32_condition_variable *cv = (struct win32_condition_variable *)sys_cv; struct win32_condition_variable *cv = (struct win32_condition_variable *)sys_cv;
/* Condition variable must not have any sleepers (signal before releasing) */ /* Condition variable must not have any sleepers (signal before releasing) */
ASSERT(atomic_i64_fetch(&cv->num_yielding_waiters) == 0); ASSERT(atomic_i64_fetch(&cv->num_yielders) == 0);
win32_condition_variable_release(cv); win32_condition_variable_release(cv);
} }
@ -2587,7 +2674,7 @@ void sys_condition_variable_wait(struct sys_condition_variable *sys_cv, struct s
struct win32_mutex *m = (struct win32_mutex *)lock->mutex; struct win32_mutex *m = (struct win32_mutex *)lock->mutex;
b32 exclusive = lock->exclusive; b32 exclusive = lock->exclusive;
#if RTC #if RTC
atomic_i64_fetch_add(&cv->num_yielding_waiters, 1); atomic_i64_fetch_add(&cv->num_yielders, 1);
if (exclusive) { if (exclusive) {
m->owner_tid = 0; m->owner_tid = 0;
} }
@ -2614,7 +2701,7 @@ void sys_condition_variable_wait(struct sys_condition_variable *sys_cv, struct s
if (exclusive) { if (exclusive) {
m->owner_tid = (u64)GetCurrentThreadId(); m->owner_tid = (u64)GetCurrentThreadId();
} }
atomic_i64_fetch_add(&cv->num_yielding_waiters, -1); atomic_i64_fetch_add(&cv->num_yielders, -1);
#endif #endif
} }
@ -2624,7 +2711,7 @@ void sys_condition_variable_wait_time(struct sys_condition_variable *sys_cv, str
struct win32_mutex *m = (struct win32_mutex *)lock->mutex; struct win32_mutex *m = (struct win32_mutex *)lock->mutex;
b32 exclusive = lock->exclusive; b32 exclusive = lock->exclusive;
#if RTC #if RTC
atomic_i64_fetch_add(&cv->num_yielding_waiters, 1); atomic_i64_fetch_add(&cv->num_yielders, 1);
if (exclusive) { if (exclusive) {
m->owner_tid = 0; m->owner_tid = 0;
} }
@ -2652,7 +2739,7 @@ void sys_condition_variable_wait_time(struct sys_condition_variable *sys_cv, str
if (exclusive) { if (exclusive) {
m->owner_tid = (u64)GetCurrentThreadId(); m->owner_tid = (u64)GetCurrentThreadId();
} }
atomic_i64_fetch_add(&cv->num_yielding_waiters, -1); atomic_i64_fetch_add(&cv->num_yielders, -1);
#endif #endif
} }
@ -3172,16 +3259,19 @@ int CALLBACK wWinMain(_In_ HINSTANCE instance, _In_opt_ HINSTANCE prev_instance,
__profthread("Main thread", PROF_THREAD_GROUP_MAIN); __profthread("Main thread", PROF_THREAD_GROUP_MAIN);
/* Init waiters */ /* Init yielders */
G.waiters_arena = arena_alloc(GIGABYTE(64)); G.yielders_arena = arena_alloc(GIGABYTE(64));
/* Init counters */ /* Init wait lists */
G.counters_arena = arena_alloc(GIGABYTE(64)); G.wait_lists_arena = arena_alloc(GIGABYTE(64));
/* Init fibers */ /* Init fibers */
G.num_fibers = 1; /* Fiber at index 0 always nil */ G.num_fibers = 1; /* Fiber at index 0 always nil */
G.fiber_names_arena = arena_alloc(GIGABYTE(64)); G.fiber_names_arena = arena_alloc(GIGABYTE(64));
/* Init counters */
G.counters_arena = arena_alloc(GIGABYTE(64));
/* Convert main thread to fiber */ /* Convert main thread to fiber */
fiber_alloc(FIBER_KIND_CONVERTED_THREAD); fiber_alloc(FIBER_KIND_CONVERTED_THREAD);