waiter testing

This commit is contained in:
jacob 2025-07-05 18:04:13 -05:00
parent 5e99224b28
commit 402f8a12c9
2 changed files with 221 additions and 44 deletions

View File

@ -508,7 +508,7 @@ enum sys_priority {
SYS_PRIORITY_NORMAL = 1,
SYS_PRIORITY_BACKGROUND = 2,
NUM_SYS_JOB_PRIORITIES
NUM_SYS_PRIORITIES
};
struct sys_job_data {

View File

@ -111,10 +111,28 @@ struct win32_window {
struct win32_window *next_free;
};
/* A job parked on a counter: records everything needed to re-enqueue the
 * job (and resume its fiber) once the counter it waits on reaches <= 0.
 * Sized and aligned to exactly one 64-byte cache line; the explicit _pad
 * fields make the layout visible and the asserts below enforce it
 * (4+4+8+8+8+8+4+4+8+8 = 64). */
struct alignas(64) waiter {
/* =================================================== */
i32 fiber_id; /* 4 bytes */ /* Fiber to resume (stashed by the worker on YIELD_KIND_WAIT) */
i32 job_queue_kind; /* 4 bytes */ /* Queue to re-enqueue the job into on wake */
/* =================================================== */
sys_job_func *job_func; /* 8 bytes */
/* =================================================== */
void *job_sig; /* 8 bytes */
/* =================================================== */
struct counter *job_counter; /* 8 bytes */ /* The job's own completion counter (not the counter waited on) */
/* =================================================== */
struct waiter *next; /* 8 bytes */ /* Intrusive link: counter's waiter list, or global free list */
/* =================================================== */
i32 job_id; /* 4 bytes */ /* Dispatch index, restored into job_info.num_dispatched on wake */
u8 _pad0[4]; /* 4 bytes (padding) */
/* =================================================== */
u8 _pad1[8]; /* 8 bytes (padding) */
/* =================================================== */
u8 _pad2[8]; /* 8 bytes (padding) */
};
STATIC_ASSERT(sizeof(struct waiter) == 64); /* Assume waiter fits in one cache line (increase if necessary) */
STATIC_ASSERT(alignof(struct waiter) == 64); /* Avoid false sharing */
@ -125,17 +143,18 @@ struct alignas(64) counter {
/* NOTE(review): diff fragment — interior of struct counter; the head of the
 * struct (including the atomic_i64 v field used by counter_add/counter_wait)
 * is above this hunk. The new waiters_lock/num_waiters/first_waiter/
 * last_waiter fields replace padding. With every _pad line shown here the
 * fields cannot all fit in 64 bytes, so some trailing pads below are likely
 * the removed pre-change lines the diff interleaves — verify against the
 * full file. */
/* =================================================== */
struct counter *next_free; /* 8 bytes */ /* Link on the global counter free list */
/* =================================================== */
u8 _pad0[8]; /* 8 bytes (padding) */
struct atomic_i32 waiters_lock; /* 4 bytes */ /* Spin lock guarding the waiter list below AND the wake-up re-check */
i32 num_waiters; /* 4 bytes */ /* Count of entries in the first_waiter list */
/* =================================================== */
struct waiter *first_waiter; /* 8 bytes */
/* =================================================== */
struct waiter *last_waiter; /* 8 bytes */ /* Tail pointer for O(1) append */
/* =================================================== */
u8 _pad1[8]; /* 8 bytes (padding) */
/* =================================================== */
u8 _pad2[8]; /* 8 bytes (padding) */
/* =================================================== */
u8 _pad3[8]; /* 8 bytes (padding) */
/* =================================================== */
u8 _pad4[8]; /* 8 bytes (padding) */
/* =================================================== */
u8 _pad5[8]; /* 8 bytes (padding) */
};
STATIC_ASSERT(sizeof(struct counter) == 64); /* Assume counter fits in one cache line (increase if necessary) */
STATIC_ASSERT(alignof(struct counter) == 64); /* Avoid false sharing */
@ -154,7 +173,7 @@ enum yield_kind {
YIELD_KIND_NONE,
YIELD_KIND_DONE,
YIELD_KIND_COOPERATIVE,
YIELD_KIND_SLEEP,
YIELD_KIND_WAIT,
NUM_YIELD_KINDS
};
@ -175,10 +194,10 @@ struct alignas(64) fiber {
i32 job_id; /* 4 bytes */
i32 job_priority; /* 4 bytes */
/* =================================================== */
void *yield_arg; /* 8 bytes */
/* =================================================== */
enum yield_kind yield_kind; /* 4 bytes */
u8 _pad0[4]; /* 4 bytes (padding) */
/* =================================================== */
u8 _pad1[8]; /* 8 bytes (padding) */
};
STATIC_ASSERT(sizeof(struct fiber) == 64); /* Assume fiber fits in one cache line (increase if necessary) */
STATIC_ASSERT(alignof(struct fiber) == 64); /* Avoid false sharing */
@ -217,6 +236,8 @@ struct job_info {
void *sig;
struct counter *counter;
i32 fiber_id; /* If the job is being resumed from a yield */
struct job_info *next;
};
@ -296,7 +317,10 @@ GLOBAL struct {
/* Waiters */
struct arena *waiters_arena;
struct atomic_i32 waiters_lock; /* TODO: Prevent false sharing */
struct waiter *first_free_waiter;
/* Counters */
struct arena *counters_arena;
@ -324,12 +348,12 @@ GLOBAL struct {
} G = ZI, DEBUG_ALIAS(G, G_sys_win32);
/* ========================== *
* Counters
* ========================== */
INTERNAL void yield(enum yield_kind kind, void *arg);
INTERNAL struct counter *counter_alloc(void)
{
struct counter *counter = NULL;
@ -351,6 +375,7 @@ INTERNAL struct counter *counter_alloc(void)
INTERNAL void counter_release(struct counter *counter)
{
ASSERT(counter->first_waiter == NULL); /* Counter should not have any waiters */
while (atomic_i32_fetch_test_set(&G.counters_lock, 0, 1) != 0) ix_pause();
{
counter->next_free = G.first_free_counter;
@ -364,19 +389,105 @@ INTERNAL void counter_add(struct counter *counter, i64 amount)
/* NOTE(review): diff fragment — body of counter_add(counter, amount); the
 * signature is only visible in the hunk header above. Adds `amount` to the
 * counter and, on the transition from positive to <= 0, wakes all parked
 * waiters by re-enqueuing their jobs onto the appropriate job queues. */
i64 old_v = atomic_i64_fetch_add(&counter->v, amount);
i64 new_v = old_v + amount;
if (old_v > 0 && new_v <= 0) {
/* NOTE(review): the next three lines look like the removed pre-change text
 * (the old TODO stub) interleaved by the diff. */
/* TODO: Wake waiters */
(UNUSED)old_v;
(UNUSED)new_v;
/* Wake waiters */
struct waiter *first_waiter = NULL;
struct waiter *last_waiter = NULL;
{
/* Take the counter's waiter lock: pairs with the locked re-check in the
 * worker's YIELD_KIND_WAIT path so a waiter cannot be enqueued after this
 * wake pass without having re-observed v <= 0 — TODO confirm intent. */
while (atomic_i32_fetch_test_set(&counter->waiters_lock, 0, 1) != 0) ix_pause();
{
first_waiter = counter->first_waiter;
last_waiter = counter->last_waiter;
if (first_waiter) {
struct arena_temp scratch = scratch_begin_no_zero();
{
/* Separate waiters by queue kind */
i32 queue_counts[NUM_JOB_QUEUE_KINDS] = ZI;
struct waiter **queue_waiter_arrays[NUM_JOB_QUEUE_KINDS] = ZI;
for (i32 i = 0; i < (i32)countof(queue_waiter_arrays); ++i) {
queue_waiter_arrays[i] = arena_push_array_no_zero(scratch.arena, struct waiter *, counter->num_waiters);
}
for (struct waiter *waiter = first_waiter; waiter; waiter = waiter->next) {
enum job_queue_kind queue_kind = waiter->job_queue_kind;
i32 index = queue_counts[queue_kind]++;
/* BUG(review): the array must be selected by queue_kind, not by the
 * per-queue slot counter. As written, the waiter is stored into
 * queue_waiter_arrays[index] — the wrong queue's scratch array whenever
 * index != queue_kind — and since the arrays are pushed no_zero, the
 * "Push jobs" loop below can read stale/uninitialized pointers from
 * queue_waiter_arrays[queue_kind]. Fix:
 *   struct waiter **array = queue_waiter_arrays[queue_kind]; */
struct waiter **array = queue_waiter_arrays[index];
array[index] = waiter;
}
/* Push jobs */
for (i32 queue_kind = 0; queue_kind < (i32)countof(queue_counts); ++queue_kind) {
i32 num_jobs = queue_counts[queue_kind];
if (num_jobs > 0) {
struct job_queue *queue = &G.job_queues[queue_kind];
struct waiter **queue_waiters = queue_waiter_arrays[queue_kind];
/* Queue lock is taken while still holding waiters_lock; the worker
 * never takes waiters_lock while holding a queue lock, so no
 * inversion is visible in this diff — TODO confirm against full file. */
while (atomic_i32_fetch_test_set(&queue->lock, 0, 1) != 0) ix_pause();
{
/* TODO: More efficient batch job list allocation */
for (i32 i = 0; i < num_jobs; ++i) {
struct waiter *waiter = queue_waiters[i];
struct job_info *info = NULL;
if (queue->first_free) {
info = queue->first_free;
queue->first_free = info->next;
} else {
info = arena_push_no_zero(queue->arena, struct job_info);
}
MEMZERO_STRUCT(info);
info->count = 1;
/* num_dispatched doubles as the resume job_id for woken jobs: the
 * worker's resume path reads it back as job_id when fiber_id > 0. */
info->num_dispatched = waiter->job_id;
info->func = waiter->job_func;
info->sig = waiter->job_sig;
info->counter = waiter->job_counter;
info->fiber_id = waiter->fiber_id;
/* Append to the queue's intrusive job list */
if (queue->last) {
queue->last->next = info;
} else {
queue->first = info;
}
queue->last = info;
}
}
atomic_i32_fetch_set(&queue->lock, 0);
}
}
/* Reset waiters list */
counter->num_waiters = 0;
counter->first_waiter = NULL;
counter->last_waiter = NULL;
}
scratch_end(scratch);
}
/* NOTE(review): duplicate of the reset above — most likely the removed
 * pre-change line shown by the diff, not real duplication. */
counter->num_waiters = 0;
}
atomic_i32_fetch_set(&counter->waiters_lock, 0);
}
/* Add waiters to free list */
if (first_waiter) {
while (atomic_i32_fetch_test_set(&G.waiters_lock, 0, 1) != 0) ix_pause();
{
/* Splice the whole detached list onto the global free list in one shot. */
last_waiter->next = G.first_free_waiter;
G.first_free_waiter = first_waiter;
}
atomic_i32_fetch_set(&G.waiters_lock, 0);
}
}
}
/* Block until the counter's value drops to (or below) zero.
 * NOTE(review): diff fragment — the outer while(...>0) wrapper appears to be
 * the removed pre-change spin loop interleaved with the new #if 0 / #else
 * block, and the function's closing brace is outside this hunk; the displayed
 * text is not exactly either version. The #if 0 branch is the (not yet
 * enabled) fiber-yield path this commit builds toward: it yields
 * YIELD_KIND_WAIT with the counter as yield_arg so the worker can park the
 * job as a waiter. The active #else branch busy-spins with ix_pause(). */
INTERNAL void counter_wait(struct counter *counter)
{
__prof;
/* TODO: Yield with configurable spin count */
while (atomic_i64_fetch(&counter->v) > 0) {
#if 0
/* TODO: Spin with configurable count */
if (atomic_i64_fetch(&counter->v) > 0) {
/* Yield */
yield(YIELD_KIND_WAIT, counter);
}
#else
/* NOTE(review): spins on != 0 while the wrapper checks > 0; counters that
 * go negative would behave differently under the two conditions — confirm
 * which comparison the merged file actually uses. */
while (atomic_i64_fetch(&counter->v) != 0) {
ix_pause();
}
#endif
}
struct sys_counter *sys_counter_alloc(void) { return (struct sys_counter *)counter_alloc(); }
@ -497,7 +608,7 @@ i32 sys_current_fiber_id(void)
return (i32)(i64)GetFiberData();
}
INTERNAL void yield(enum yield_kind kind)
INTERNAL void yield(enum yield_kind kind, void *arg)
{
struct fiber *fiber = fiber_from_id(sys_current_fiber_id());
i32 parent_fiber_id = fiber->parent_id;
@ -508,6 +619,7 @@ INTERNAL void yield(enum yield_kind kind)
{
__prof_fiber_leave;
fiber->yield_kind = kind;
fiber->yield_arg = arg;
SwitchToFiber(parent_fiber->addr);
__prof_fiber_enter(fiber->name_cstr, PROF_THREAD_GROUP_FIBERS + fiber->id);
}
@ -515,7 +627,7 @@ INTERNAL void yield(enum yield_kind kind)
/* Public API: cooperatively yield the current job's fiber back to its worker.
 * NOTE(review): the diff shows both the old call (no argument) and the new
 * call with the yield_arg parameter added by this commit; only the second
 * line exists in the post-change source. */
void sys_yield(void)
{
yield(YIELD_KIND_COOPERATIVE);
yield(YIELD_KIND_COOPERATIVE, NULL);
}
void sys_run(i32 count, sys_job_func *func, void *sig, enum sys_priority priority, struct sys_counter *counter)
@ -527,7 +639,7 @@ void sys_run(i32 count, sys_job_func *func, void *sig, enum sys_priority priorit
}
struct fiber *fiber = fiber_from_id(sys_current_fiber_id());
priority = clamp_i32(priority, fiber->job_priority, SYS_PRIORITY_BACKGROUND); /* A job cannot create a job with a higher priority than itself */
STATIC_ASSERT((i32)NUM_SYS_JOB_PRIORITIES == (i32)NUM_JOB_QUEUE_KINDS); /* Priority & queue kind enums must have 1:1 mapping */
STATIC_ASSERT((i32)NUM_SYS_PRIORITIES == (i32)NUM_JOB_QUEUE_KINDS); /* Priority & queue kind enums must have 1:1 mapping */
enum job_queue_kind queue_kind = (enum job_queue_kind)priority;
struct job_queue *queue = &G.job_queues[queue_kind];
while (atomic_i32_fetch_test_set(&queue->lock, 0, 1) != 0) ix_pause();
@ -572,6 +684,7 @@ INTERNAL void job_fiber_entry(void *id_ptr)
{
__profscope(Run job);
fiber->yield_kind = YIELD_KIND_NONE;
fiber->yield_arg = NULL;
struct sys_job_data data = ZI;
data.id = fiber->job_id;
data.sig = fiber->job_sig;
@ -604,6 +717,7 @@ INTERNAL SYS_THREAD_DEF(worker_entry, worker_ctx_arg)
while (!atomic_i32_fetch(&G.workers_shutdown)) {
/* Pull job from queue */
enum sys_priority job_priority = 0;
enum job_queue_kind job_queue_kind = 0;
i32 job_id = 0;
sys_job_func *job_func = 0;
void *job_sig = 0;
@ -614,19 +728,41 @@ INTERNAL SYS_THREAD_DEF(worker_entry, worker_ctx_arg)
struct job_queue *queue = queues[queue_index];
if (queue) {
while (atomic_i32_fetch_test_set(&queue->lock, 0, 1) != 0) ix_pause();
struct job_info *info = queue->first;
while (info && !job_func) {
struct job_info *next = info->next;
job_id = info->num_dispatched++;
if (job_id < info->count) {
/* Pick job */
STATIC_ASSERT((i32)NUM_SYS_JOB_PRIORITIES == (i32)NUM_JOB_QUEUE_KINDS); /* Priority & queue kind enums must have 1:1 mapping */
job_priority = (enum sys_priority)queue->kind;
job_func = info->func;
job_sig = info->sig;
job_counter = info->counter;
if (job_id == (info->count - 1)) {
/* We're picking up the last dispatch, so dequeue the job */
{
struct job_info *info = queue->first;
job_priority = (enum sys_priority)queue->kind;
job_queue_kind = queue->kind;
while (info && !job_func) {
struct job_info *next = info->next;
b32 dequeue = false;
if (info->fiber_id <= 0) {
job_id = info->num_dispatched++;
if (job_id < info->count) {
/* Pick job */
STATIC_ASSERT((i32)NUM_SYS_PRIORITIES == (i32)NUM_JOB_QUEUE_KINDS); /* Priority & queue kind enums must have 1:1 mapping */
job_func = info->func;
job_sig = info->sig;
job_counter = info->counter;
if (info->fiber_id > 0) {
}
if (job_id == (info->count - 1)) {
/* We're picking up the last dispatch, so dequeue the job */
dequeue = true;
}
}
} else {
/* This job is being resumed from a yield */
if (job_fiber) {
fiber_release(job_fiber, job_fiber->id);
}
job_fiber = fiber_from_id(info->fiber_id);
job_id = info->num_dispatched;
job_func = info->func;
job_sig = info->sig;
job_counter = info->counter;
dequeue = true;
}
if (dequeue) {
if (!next) {
queue->last = NULL;
}
@ -634,8 +770,8 @@ INTERNAL SYS_THREAD_DEF(worker_entry, worker_ctx_arg)
info->next = queue->first_free;
queue->first_free = info;
}
info = next;
}
info = next;
}
atomic_i32_fetch_set(&queue->lock, 0);
}
@ -664,10 +800,51 @@ INTERNAL SYS_THREAD_DEF(worker_entry, worker_ctx_arg)
sys_panic(LIT("Fiber yielded with unknown yield kind"));
} break;
/* NOTE(review): diff fragment — the worker's handling of a job that yielded
 * to wait on a counter. The YIELD_KIND_SLEEP label and the Sleep(100) body
 * appear to be the removed pre-change text; this commit renames the kind to
 * YIELD_KIND_WAIT and parks the fiber as a waiter instead of sleeping. */
case YIELD_KIND_SLEEP:
case YIELD_KIND_WAIT:
{
__profscope(Job sleep);
Sleep(100);
struct counter *wait_counter = job_fiber->yield_arg;
{
while (atomic_i32_fetch_test_set(&wait_counter->waiters_lock, 0, 1) != 0) ix_pause();
{
/* Re-check under the lock to close the race with counter_add: if the
 * counter hit zero between the fiber's check and here, we must not
 * park a waiter that nobody will ever wake. */
if (atomic_i64_fetch(&wait_counter->v) > 0) { /* Re-check counter now that it's locked */
/* Allocate waiter */
struct waiter *waiter = NULL;
{
/* Lock order here is waiters_lock(counter) -> G.waiters_lock;
 * counter_add releases the counter lock before taking the global
 * lock, so no inversion is visible — TODO confirm globally. */
while (atomic_i32_fetch_test_set(&G.waiters_lock, 0, 1) != 0) ix_pause();
{
if (G.first_free_waiter) {
waiter = G.first_free_waiter;
G.first_free_waiter = waiter->next;
} else {
waiter = arena_push_no_zero(G.waiters_arena, struct waiter);
}
}
atomic_i32_fetch_set(&G.waiters_lock, 0);
}
/* Snapshot everything needed to re-dispatch this exact job later. */
MEMZERO_STRUCT(waiter);
waiter->fiber_id = job_fiber->id;
waiter->job_queue_kind = job_queue_kind;
waiter->job_func = job_func;
waiter->job_sig = job_sig;
waiter->job_counter = job_counter;
waiter->job_id = job_id;
/* Insert waiter */
if (wait_counter->last_waiter) {
wait_counter->last_waiter->next = waiter;
} else {
wait_counter->first_waiter = waiter;
}
wait_counter->last_waiter = waiter;
++wait_counter->num_waiters;
/* Pop worker's job fiber */
/* The worker drops ownership; the fiber is resumed via the waiter.
 * NOTE(review): if the re-check fails (counter already <= 0), job_fiber
 * stays attached and done stays false — presumably the outer loop
 * resumes the fiber immediately; confirm against the full function. */
job_fiber = NULL;
done = true;
}
}
atomic_i32_fetch_set(&wait_counter->waiters_lock, 0);
}
} break;
case YIELD_KIND_DONE:
@ -675,9 +852,6 @@ INTERNAL SYS_THREAD_DEF(worker_entry, worker_ctx_arg)
if (job_counter) {
counter_add(job_counter, -1);
}
/* TODO: remove this */
(UNUSED)fiber_release;
done = true;
} break;
}
@ -2950,6 +3124,9 @@ int CALLBACK wWinMain(_In_ HINSTANCE instance, _In_opt_ HINSTANCE prev_instance,
__profthread("Main thread", PROF_THREAD_GROUP_MAIN);
/* Init waiters */
G.waiters_arena = arena_alloc(GIGABYTE(64));
/* Init counters */
G.counters_arena = arena_alloc(GIGABYTE(64));