consolidate yielder & fiber_ctx into fiber struct

This commit is contained in:
jacob 2025-07-09 13:39:02 -05:00
parent 5dd56dea5c
commit 166db8575b
6 changed files with 165 additions and 188 deletions

View File

@ -8,7 +8,7 @@
/* NOTE: Application will exit if arena fails to reserve or commit initial memory. */ /* NOTE: Application will exit if arena fails to reserve or commit initial memory. */
struct arena *arena_alloc(u64 reserve) struct arena *arena_alloc(u64 reserve)
{ {
//__prof; __prof;
reserve += ARENA_HEADER_SIZE; reserve += ARENA_HEADER_SIZE;
/* Round up to nearest block size */ /* Round up to nearest block size */

View File

@ -250,13 +250,13 @@ INTERNAL SYS_THREAD_DEF(playback_scheduler_entry, _)
/* TODO: Signal counter that running job waits on, rather than scheduling job manually */ /* TODO: Signal counter that running job waits on, rather than scheduling job manually */
while (!atomic_i32_fetch(&G.shutdown)) { while (!atomic_i32_fetch(&G.shutdown)) {
{ {
__profn("Wait for audio event"); __profn("Wasapi wait");
WaitForSingleObject(G.event, INFINITE); WaitForSingleObject(G.event, INFINITE);
} }
{ {
__profn("Run mix job & wait"); __profn("Run mix job & wait");
struct snc_counter counter = ZI; struct snc_counter counter = ZI;
sys_run(1, playback_mix_job, NULL, SYS_PRIORITY_HIGH, &counter); sys_run(1, playback_mix_job, NULL, SYS_PRIORITY_CRITICAL, &counter);
snc_counter_wait(&counter); snc_counter_wait(&counter);
} }
} }

View File

@ -45,7 +45,7 @@ struct snc_lock snc_lock_spin_e(struct snc_mutex *m, i32 spin)
if (spin_cnt < spin) { if (spin_cnt < spin) {
ix_pause(); ix_pause();
} else { } else {
sys_wait(&m->v, &v, 4, F32_INFINITY); sys_wait(&m->v, &v, 4, I64_MAX);
spin_cnt = 0; spin_cnt = 0;
} }
} }
@ -82,7 +82,7 @@ struct snc_lock snc_lock_spin_s(struct snc_mutex *m, i32 spin)
if (spin_cnt < spin) { if (spin_cnt < spin) {
ix_pause(); ix_pause();
} else { } else {
sys_wait(&m->v, &v, 4, F32_INFINITY); sys_wait(&m->v, &v, 4, I64_MAX);
spin_cnt = 0; spin_cnt = 0;
} }
} }
@ -122,6 +122,11 @@ void snc_unlock(struct snc_lock *l)
* ========================== */ * ========================== */
void snc_cv_wait(struct snc_cv *cv, struct snc_lock *l) void snc_cv_wait(struct snc_cv *cv, struct snc_lock *l)
{
snc_cv_wait_time(cv, l, I64_MAX);
}
void snc_cv_wait_time(struct snc_cv *cv, struct snc_lock *l, i64 timeout_ns)
{ {
u64 old_wake_gen = atomic_u64_fetch(&cv->wake_gen); u64 old_wake_gen = atomic_u64_fetch(&cv->wake_gen);
struct snc_mutex *mutex = l->mutex; struct snc_mutex *mutex = l->mutex;
@ -130,7 +135,7 @@ void snc_cv_wait(struct snc_cv *cv, struct snc_lock *l)
{ {
snc_unlock(l); snc_unlock(l);
do { do {
sys_wait(&cv->wake_gen, &old_wake_gen, sizeof(old_wake_gen), F32_INFINITY); sys_wait(&cv->wake_gen, &old_wake_gen, sizeof(old_wake_gen), timeout_ns);
wake_gen = atomic_u64_fetch(&cv->wake_gen); wake_gen = atomic_u64_fetch(&cv->wake_gen);
} while (wake_gen == old_wake_gen); } while (wake_gen == old_wake_gen);
sys_wake_all(&cv->wake_gen); sys_wake_all(&cv->wake_gen);
@ -165,7 +170,7 @@ void snc_counter_wait(struct snc_counter *counter)
{ {
i64 v = atomic_i64_fetch(&counter->v); i64 v = atomic_i64_fetch(&counter->v);
while (v > 0) { while (v > 0) {
sys_wait(&counter->v, &v, sizeof(v), F32_INFINITY); sys_wait(&counter->v, &v, sizeof(v), I64_MAX);
v = atomic_i64_fetch(&counter->v); v = atomic_i64_fetch(&counter->v);
} }
} }

View File

@ -11,6 +11,7 @@ struct snc_lock {
}; };
struct snc_mutex { struct snc_mutex {
/* Bit 31: exclusive lock held, bit 30: pending exclusive lock, bits 0-30: shared locks count */
struct atomic_u32 v; struct atomic_u32 v;
#if RTC #if RTC
@ -41,6 +42,7 @@ struct snc_cv {
}; };
void snc_cv_wait(struct snc_cv *cv, struct snc_lock *lock); void snc_cv_wait(struct snc_cv *cv, struct snc_lock *lock);
void snc_cv_wait_time(struct snc_cv *cv, struct snc_lock *l, i64 timeout_ns);
void snc_cv_broadcast(struct snc_cv *cv); void snc_cv_broadcast(struct snc_cv *cv);
/* ========================== * /* ========================== *

View File

@ -449,7 +449,7 @@ b32 sys_run_command(struct string cmd);
/* Futex-like wait & wake */ /* Futex-like wait & wake */
void sys_wait(void *addr, void *cmp, u32 size, f32 timeout_seconds); void sys_wait(void *addr, void *cmp, u32 size, i64 timeout_ns);
void sys_wake_single(void *addr); void sys_wake_single(void *addr);
void sys_wake_all(void *addr); void sys_wake_all(void *addr);
@ -467,9 +467,10 @@ i16 sys_current_fiber_id(void);
enum sys_priority { enum sys_priority {
SYS_PRIORITY_INHERIT = -1, SYS_PRIORITY_INHERIT = -1,
SYS_PRIORITY_HIGH = 0, SYS_PRIORITY_CRITICAL = 0,
SYS_PRIORITY_NORMAL = 1, SYS_PRIORITY_HIGH = 1,
SYS_PRIORITY_BACKGROUND = 2, SYS_PRIORITY_NORMAL = 2,
SYS_PRIORITY_BACKGROUND = 3,
NUM_SYS_PRIORITIES NUM_SYS_PRIORITIES
}; };

View File

@ -101,73 +101,47 @@ struct win32_window {
}; };
#define NUM_WAIT_ADDR_BINS 4096 #define NUM_WAIT_ADDR_BINS 65536
struct alignas(64) wait_list { struct alignas(64) wait_list {
/* =================================================== */ /* =================================================== */
void *addr; /* 8 bytes */ u64 value; /* 08 bytes */
/* =================================================== */ /* =================================================== */
struct yielder *first_yielder; /* 8 bytes */ i16 first_waiter; /* 02 bytes */
i16 last_waiter; /* 02 bytes */
i32 num_waiters; /* 04 bytes */
/* =================================================== */ /* =================================================== */
struct yielder *last_yielder; /* 8 bytes */ struct wait_list *next_in_bin; /* 08 bytes */
/* =================================================== */ /* =================================================== */
struct wait_list *prev_in_bin; /* 08 bytes */
/* =================================================== */ /* =================================================== */
i32 num_yielders; /* 4 bytes */ u8 _pad0[8]; /* 08 bytes (padding) */
u8 _pad0[4]; /* 4 bytes (padding) */
/* =================================================== */ /* =================================================== */
struct wait_list *next_in_bin; /* 8 bytes */ u8 _pad1[8]; /* 08 bytes (padding) */
/* =================================================== */ /* =================================================== */
struct wait_list *prev_in_bin; /* 8 bytes */ u8 _pad2[8]; /* 08 bytes (padding) */
/* =================================================== */ /* =================================================== */
u8 _pad1[8]; /* 8 bytes (padding) */ u8 _pad3[8]; /* 08 bytes (padding) */
}; };
STATIC_ASSERT(sizeof(struct wait_list) == 64); /* Padding validation (increase if necessary) */ STATIC_ASSERT(sizeof(struct wait_list) == 64); /* Padding validation (increase if necessary) */
STATIC_ASSERT(alignof(struct wait_list) == 64); /* Avoid false sharing */ STATIC_ASSERT(alignof(struct wait_list) == 64); /* Avoid false sharing */
struct alignas(64) wait_bin { struct alignas(64) wait_bin {
/* =================================================== */ /* =================================================== */
struct wait_list *first_wait_list; /* 8 bytes */ struct wait_list *first_wait_list; /* 08 bytes */
/* =================================================== */ /* =================================================== */
struct wait_list *last_wait_list; /* 8 bytes */ struct wait_list *last_wait_list; /* 08 bytes */
/* =================================================== */ /* =================================================== */
struct wait_list *first_free_wait_list; /* 8 bytes */ struct wait_list *first_free_wait_list; /* 08 bytes */
/* =================================================== */ /* =================================================== */
struct yielder *first_free_yielder; struct atomic_i32 lock; /* 04 bytes */
u8 _pad0[4]; /* 04 bytes (padding) */
/* =================================================== */ /* =================================================== */
struct atomic_i32 lock; /* 4 bytes */ u8 _pad1[32]; /* 32 bytes (padding) */
u8 _pad0[4]; /* 4 bytes (padding) */
/* =================================================== */
u8 _pad1[24]; /* 24 bytes (padding) */
}; };
STATIC_ASSERT(sizeof(struct wait_bin) == 64); /* Padding validation (increase if necessary) */ STATIC_ASSERT(sizeof(struct wait_bin) == 64); /* Padding validation (increase if necessary) */
STATIC_ASSERT(alignof(struct wait_bin) == 64); /* Avoid false sharing */ STATIC_ASSERT(alignof(struct wait_bin) == 64); /* Avoid false sharing */
struct alignas(64) yielder {
/* =================================================== */
i16 fiber_id; /* 2 bytes */
u8 _pad0[2]; /* 2 bytes (padding) */
i32 job_queue_kind; /* 4 bytes */
/* =================================================== */
sys_job_func *job_func; /* 8 bytes */
/* =================================================== */
void *job_sig; /* 8 bytes */
/* =================================================== */
struct snc_counter *job_counter; /* 8 bytes */
/* =================================================== */
struct yielder *next; /* 8 bytes */
/* =================================================== */
struct yielder *prev; /* 8 bytes */
/* =================================================== */
i32 job_id; /* 4 bytes */
u8 _pad1[4]; /* 4 bytes (padding) */
/* =================================================== */
u8 _pad2[8]; /* 8 bytes (padding) */
};
STATIC_ASSERT(sizeof(struct yielder) == 64); /* Padding validation (increase if necessary) */
STATIC_ASSERT(alignof(struct yielder) == 64); /* Avoid false sharing */
struct alignas(64) counter { struct alignas(64) counter {
/* =================================================== */ /* =================================================== */
@ -186,8 +160,8 @@ STATIC_ASSERT(alignof(struct counter) == 64); /* Avoid false sharing */
#define FIBER_NAME_PREFIX_CSTR "[" #define FIBER_NAME_PREFIX_CSTR "Fiber ["
#define FIBER_NAME_SUFFIX_CSTR "] Fiber" #define FIBER_NAME_SUFFIX_CSTR "]"
#define FIBER_NAME_MAX_SIZE 64 #define FIBER_NAME_MAX_SIZE 64
enum yield_kind { enum yield_kind {
@ -205,54 +179,57 @@ struct yield_param {
void *addr; void *addr;
void *cmp; void *cmp;
u32 size; u32 size;
i64 timeout_ns;
} wait; } wait;
}; };
}; };
struct alignas(64) fiber { struct alignas(64) fiber {
/* =================================================== */ /* ==================================================== */
char *name_cstr; /* 8 bytes */ void *addr; /* 08 bytes */
/* =================================================== */ /* ==================================================== */
i16 id; /* 2 bytes */ char *name_cstr; /* 08 bytes */
i16 parent_id; /* 2 bytes */ /* ==================================================== */
u8 _pad0[4]; /* 4 bytes (padding) */ i16 id; /* 02 bytes */
/* =================================================== */ i16 parent_id; /* 02 bytes */
void *addr; /* 8 bytes */ i16 next_waiter; /* 02 bytes */
/* =================================================== */ i16 prev_waiter; /* 02 bytes */
sys_job_func *job_func; /* 8 bytes */ /* ==================================================== */
/* =================================================== */ u8 _pad1[8]; /* 08 bytes (padding) */
void *job_sig; /* 8 bytes */ /* ==================================================== */
/* =================================================== */ u8 _pad2[8]; /* 08 bytes (padding) */
i32 job_id; /* 4 bytes */ /* ==================================================== */
i32 job_priority; /* 4 bytes */ u8 _pad3[8]; /* 08 bytes (padding) */
/* =================================================== */ /* ==================================================== */
struct yield_param *yield_param; /* 8 bytes */ u8 _pad4[8]; /* 08 bytes (padding) */
/* =================================================== */ /* ==================================================== */
u8 _pad1[8]; /* 8 bytes (padding) */ u8 _pad5[8]; /* 08 bytes (padding) */
}; /* ==================================================== */
STATIC_ASSERT(sizeof(struct fiber) == 64); /* Padding validation (increase if necessary) */
STATIC_ASSERT(alignof(struct fiber) == 64); /* Avoid false sharing */ /* ==================================================== */
STATIC_ASSERT(SYS_MAX_FIBERS < I16_MAX); /* Max fibers should fit in fiber id */ /* =============== Cache line boundary ================ */
/* ==================================================== */
struct alignas(64) fiber_ctx {
/* ==================================================== */ /* ==================================================== */
struct sys_scratch_ctx scratch_ctx; /* 16 bytes */ struct sys_scratch_ctx scratch_ctx; /* 16 bytes */
/* ==================================================== */ /* ==================================================== */
u8 _pad0[8]; /* 8 bytes (padding) */ sys_job_func *job_func; /* 08 bytes */
/* ==================================================== */ /* ==================================================== */
u8 _pad1[8]; /* 8 bytes (padding) */ void *job_sig; /* 08 bytes */
/* ==================================================== */ /* ==================================================== */
u8 _pad2[8]; /* 8 bytes (padding) */ i32 job_id; /* 04 bytes */
i32 job_priority; /* 04 bytes */
/* ==================================================== */ /* ==================================================== */
u8 _pad3[8]; /* 8 bytes (padding) */ struct snc_counter *job_counter; /* 08 bytes */
/* ==================================================== */ /* ==================================================== */
u8 _pad4[8]; /* 8 bytes (padding) */ struct yield_param *yield_param; /* 08 bytes */
/* ==================================================== */
u8 _pad6[8]; /* 08 bytes (padding) */
}; };
STATIC_ASSERT(sizeof(struct fiber_ctx) == 64); /* Padding validation (increase if necessary) */ STATIC_ASSERT(sizeof(struct fiber) == 128); /* Padding validation (increase if necessary) */
STATIC_ASSERT(alignof(struct fiber_ctx) == 64); /* Avoid false sharing */ STATIC_ASSERT(alignof(struct fiber) == 64); /* Avoid false sharing */
STATIC_ASSERT(SYS_MAX_FIBERS < I16_MAX); /* Max fibers should fit in fiber id */
struct alignas(64) worker_ctx { struct alignas(64) worker_ctx {
@ -273,6 +250,7 @@ struct job_info {
}; };
enum job_queue_kind { enum job_queue_kind {
JOB_QUEUE_KIND_CRITICAL_PRIORITY,
JOB_QUEUE_KIND_HIGH_PRIORITY, JOB_QUEUE_KIND_HIGH_PRIORITY,
JOB_QUEUE_KIND_NORMAL_PRIORITY, JOB_QUEUE_KIND_NORMAL_PRIORITY,
JOB_QUEUE_KIND_BACKGROUND, JOB_QUEUE_KIND_BACKGROUND,
@ -332,18 +310,12 @@ GLOBAL struct {
struct arena *windows_arena; struct arena *windows_arena;
struct win32_window *first_free_window; struct win32_window *first_free_window;
/* Yielders */
struct atomic_i32 yielders_arena_lock; /* TODO: Prevent false sharing */
struct arena *yielders_arena;
/* Wait lists */ /* Wait lists */
struct atomic_i32 wait_lists_arena_lock; /* TODO: Prevent false sharing */ struct atomic_i32 wait_lists_arena_lock; /* TODO: Prevent false sharing */
struct arena *wait_lists_arena; struct arena *wait_lists_arena;
/* Wait table */ /* Wait table */
struct wait_bin wait_bins[NUM_WAIT_ADDR_BINS]; struct wait_bin wait_addr_bins[NUM_WAIT_ADDR_BINS];
/* Fibers */ /* Fibers */
i16 num_fibers; i16 num_fibers;
@ -351,7 +323,6 @@ GLOBAL struct {
struct arena *fiber_names_arena; struct arena *fiber_names_arena;
struct atomic_i32 fibers_lock; /* TODO: Prevent false sharing */ struct atomic_i32 fibers_lock; /* TODO: Prevent false sharing */
struct fiber fibers[SYS_MAX_FIBERS]; struct fiber fibers[SYS_MAX_FIBERS];
struct fiber_ctx fiber_contexts[SYS_MAX_FIBERS];
/* Jobs */ /* Jobs */
struct job_queue job_queues[NUM_JOB_QUEUE_KINDS]; struct job_queue job_queues[NUM_JOB_QUEUE_KINDS];
@ -373,6 +344,8 @@ GLOBAL struct {
INTERNAL struct fiber *fiber_from_id(i16 id); INTERNAL struct fiber *fiber_from_id(i16 id);
INTERNAL void job_fiber_yield(struct fiber *fiber, struct fiber *parent_fiber); INTERNAL void job_fiber_yield(struct fiber *fiber, struct fiber *parent_fiber);
INTERNAL enum job_queue_kind job_queue_kind_from_priority(enum sys_priority priority);
INTERNAL enum sys_priority job_priority_from_queue_kind(enum job_queue_kind queue_kind);
@ -393,27 +366,29 @@ INTERNAL void job_fiber_yield(struct fiber *fiber, struct fiber *parent_fiber);
* Wait / wake * Wait / wake
* ========================== */ * ========================== */
void sys_wait(void *addr, void *cmp, u32 size, f32 timeout_seconds) void sys_wait(void *addr, void *cmp, u32 size, i64 timeout_ns)
{ {
struct fiber *fiber = fiber_from_id(sys_current_fiber_id()); struct fiber *fiber = fiber_from_id(sys_current_fiber_id());
i16 parent_fiber_id = fiber->parent_id; i16 parent_fiber_id = fiber->parent_id;
/* Yield if job fiber, otherwise fall back to windows blocking function */
if (parent_fiber_id > 0) { if (parent_fiber_id > 0) {
/* FIXME: Implement fiber timeout */
*fiber->yield_param = (struct yield_param) { *fiber->yield_param = (struct yield_param) {
.kind = YIELD_KIND_WAIT, .kind = YIELD_KIND_WAIT,
.wait = { .wait = {
.addr = addr, .addr = addr,
.cmp = cmp, .cmp = cmp,
.size = size .size = size,
.timeout_ns = timeout_ns
} }
}; };
struct fiber *parent_fiber = fiber_from_id(parent_fiber_id); struct fiber *parent_fiber = fiber_from_id(parent_fiber_id);
job_fiber_yield(fiber, parent_fiber); job_fiber_yield(fiber, parent_fiber);
} else { } else {
i32 timeout_ms = INFINITE; i32 timeout_ms = 0;
if (timeout_seconds != F32_INFINITY) { if (timeout_ns == I64_MAX) {
timeout_ms = (i32)(timeout_seconds * 1000); timeout_ms = INFINITE;
} else if (timeout_ns != 0) {
timeout_ms = timeout_ns / 1000000;
timeout_ms += (timeout_ms == 0) * math_fsign(timeout_ns);
} }
WaitOnAddress(addr, cmp, size, timeout_ms); WaitOnAddress(addr, cmp, size, timeout_ms);
} }
@ -428,45 +403,45 @@ void sys_wake_single(void *addr)
void sys_wake_all(void *addr) void sys_wake_all(void *addr)
{ {
u64 wait_bin_index = (u64)addr % NUM_WAIT_ADDR_BINS; u64 wait_bin_index = (u64)addr % NUM_WAIT_ADDR_BINS;
struct wait_bin *bin = &G.wait_bins[wait_bin_index]; struct wait_bin *bin = &G.wait_addr_bins[wait_bin_index];
i32 num_yielders = 0; i32 num_waiters = 0;
while (atomic_i32_fetch_test_set(&bin->lock, 0, 1) != 0) ix_pause(); while (atomic_i32_fetch_test_set(&bin->lock, 0, 1) != 0) ix_pause();
{ {
struct wait_list *wait_list = NULL; struct wait_list *wait_list = NULL;
for (struct wait_list *tmp = bin->first_wait_list; tmp && !wait_list; tmp = tmp->next_in_bin) { for (struct wait_list *tmp = bin->first_wait_list; tmp && !wait_list; tmp = tmp->next_in_bin) {
if (tmp->addr == addr) { if (tmp->value == (u64)addr) {
wait_list = tmp; wait_list = tmp;
} }
} }
if (wait_list && wait_list->num_yielders > 0) { if (wait_list && wait_list->num_waiters > 0) {
num_yielders = wait_list->num_yielders; num_waiters = wait_list->num_waiters;
struct arena_temp scratch = scratch_begin_no_conflict(); struct arena_temp scratch = scratch_begin_no_conflict();
{ {
/* Separate yielders by queue kind */ /* Separate waiters by queue kind */
i32 queue_counts[NUM_JOB_QUEUE_KINDS] = ZI; i32 queue_counts[NUM_JOB_QUEUE_KINDS] = ZI;
struct yielder **queue_yielder_arrays[NUM_JOB_QUEUE_KINDS] = ZI; struct fiber **queue_waiter_arrays[NUM_JOB_QUEUE_KINDS] = ZI;
for (i32 i = 0; i < (i32)countof(queue_yielder_arrays); ++i) { for (i32 i = 0; i < (i32)countof(queue_waiter_arrays); ++i) {
/* NOTE: Each array is conservatively sized as the number of all yielders in the list */ /* NOTE: Each array is conservatively sized as the number of all waiters in the list */
queue_yielder_arrays[i] = arena_push_array_no_zero(scratch.arena, struct yielder *, num_yielders); queue_waiter_arrays[i] = arena_push_array_no_zero(scratch.arena, struct fiber *, num_waiters);
} }
for (struct yielder *yielder = wait_list->first_yielder; yielder; yielder = yielder->next) { for (struct fiber *waiter = fiber_from_id(wait_list->first_waiter); waiter; waiter = fiber_from_id(waiter->next_waiter)) {
enum job_queue_kind queue_kind = yielder->job_queue_kind; enum job_queue_kind queue_kind = job_queue_kind_from_priority(waiter->job_priority);
i32 index = queue_counts[queue_kind]++; i32 index = queue_counts[queue_kind]++;
struct yielder **array = queue_yielder_arrays[queue_kind]; struct fiber **array = queue_waiter_arrays[queue_kind];
array[index] = yielder; array[index] = waiter;
} }
/* Push jobs */ /* Push jobs */
for (i32 queue_kind = 0; queue_kind < (i32)countof(queue_counts); ++queue_kind) { for (i32 queue_kind = 0; queue_kind < (i32)countof(queue_counts); ++queue_kind) {
i32 num_jobs = queue_counts[queue_kind]; i32 num_jobs = queue_counts[queue_kind];
if (num_jobs > 0) { if (num_jobs > 0) {
struct job_queue *queue = &G.job_queues[queue_kind]; struct job_queue *queue = &G.job_queues[queue_kind];
struct yielder **queue_yielders = queue_yielder_arrays[queue_kind]; struct fiber **queue_waiters = queue_waiter_arrays[queue_kind];
while (atomic_i32_fetch_test_set(&queue->lock, 0, 1) != 0) ix_pause(); while (atomic_i32_fetch_test_set(&queue->lock, 0, 1) != 0) ix_pause();
{ {
/* TODO: More efficient batch job list allocation */ /* TODO: More efficient batch job list allocation */
for (i32 i = 0; i < num_jobs; ++i) { for (i32 i = 0; i < num_jobs; ++i) {
struct yielder *yielder = queue_yielders[i]; struct fiber *waiter = queue_waiters[i];
struct job_info *info = NULL; struct job_info *info = NULL;
if (queue->first_free) { if (queue->first_free) {
info = queue->first_free; info = queue->first_free;
@ -476,11 +451,11 @@ void sys_wake_all(void *addr)
} }
MEMZERO_STRUCT(info); MEMZERO_STRUCT(info);
info->count = 1; info->count = 1;
info->num_dispatched = yielder->job_id; info->num_dispatched = waiter->job_id;
info->func = yielder->job_func; info->func = waiter->job_func;
info->sig = yielder->job_sig; info->sig = waiter->job_sig;
info->counter = yielder->job_counter; info->counter = waiter->job_counter;
info->fiber_id = yielder->fiber_id; info->fiber_id = waiter->id;
if (queue->last) { if (queue->last) {
queue->last->next = info; queue->last->next = info;
} else { } else {
@ -492,12 +467,23 @@ void sys_wake_all(void *addr)
atomic_i32_fetch_set(&queue->lock, 0); atomic_i32_fetch_set(&queue->lock, 0);
} }
} }
/* Free yielders */ /* Free wait list */
wait_list->last_yielder->next = bin->first_free_yielder; {
bin->first_free_yielder = wait_list->first_yielder; struct wait_list *prev = wait_list->prev_in_bin;
wait_list->first_yielder = NULL; struct wait_list *next = wait_list->next_in_bin;
wait_list->last_yielder = NULL; if (prev) {
wait_list->num_yielders = 0; prev->next_in_bin = next;
} else {
bin->first_wait_list = next;
}
if (next) {
next->prev_in_bin = prev;
} else {
bin->last_wait_list = prev;
}
wait_list->next_in_bin = bin->first_free_wait_list;
bin->first_free_wait_list = wait_list;
}
} }
scratch_end(scratch); scratch_end(scratch);
} }
@ -509,7 +495,7 @@ void sys_wake_all(void *addr)
/* Wake workers */ /* Wake workers */
/* TODO: Only wake necessary amount of workers */ /* TODO: Only wake necessary amount of workers */
if (num_yielders > 0) { if (num_waiters > 0) {
struct snc_lock lock = snc_lock_e(&G.workers_wake_mutex); struct snc_lock lock = snc_lock_e(&G.workers_wake_mutex);
{ {
if (atomic_i64_fetch(&G.workers_wake_gen) >= 0) { if (atomic_i64_fetch(&G.workers_wake_gen) >= 0) {
@ -601,6 +587,7 @@ INTERNAL struct fiber *fiber_alloc(enum fiber_kind kind)
fiber->job_sig = 0; fiber->job_sig = 0;
fiber->job_id = 0; fiber->job_id = 0;
fiber->job_priority = 0; fiber->job_priority = 0;
fiber->job_counter = 0;
fiber->yield_param = 0; fiber->yield_param = 0;
fiber->parent_id = 0; fiber->parent_id = 0;
return fiber; return fiber;
@ -616,16 +603,13 @@ INTERNAL void fiber_release(struct fiber *fiber, i16 fiber_id)
atomic_i32_fetch_set(&G.fibers_lock, 0); atomic_i32_fetch_set(&G.fibers_lock, 0);
} }
INTERNAL struct fiber *fiber_from_id(i16 id) FORCE_INLINE struct fiber *fiber_from_id(i16 id)
{ {
ASSERT(id >= 0 && id < SYS_MAX_FIBERS); if (id <= 0) {
return NULL;
} else {
return &G.fibers[id]; return &G.fibers[id];
} }
INTERNAL struct fiber_ctx *fiber_ctx_from_id(i16 id)
{
ASSERT(id >= 0 && id < SYS_MAX_FIBERS);
return &G.fiber_contexts[id];
} }
/* ========================== * /* ========================== *
@ -645,8 +629,7 @@ void sys_run(i32 count, sys_job_func *func, void *sig, enum sys_priority priorit
} }
struct fiber *fiber = fiber_from_id(sys_current_fiber_id()); struct fiber *fiber = fiber_from_id(sys_current_fiber_id());
priority = clamp_i32(priority, fiber->job_priority, SYS_PRIORITY_BACKGROUND); /* A job cannot create a job with a higher priority than itself */ priority = clamp_i32(priority, fiber->job_priority, SYS_PRIORITY_BACKGROUND); /* A job cannot create a job with a higher priority than itself */
STATIC_ASSERT((i32)NUM_SYS_PRIORITIES == (i32)NUM_JOB_QUEUE_KINDS); /* Priority & queue kind enums must have 1:1 mapping */ enum job_queue_kind queue_kind = job_queue_kind_from_priority(priority);
enum job_queue_kind queue_kind = (enum job_queue_kind)priority;
struct job_queue *queue = &G.job_queues[queue_kind]; struct job_queue *queue = &G.job_queues[queue_kind];
while (atomic_i32_fetch_test_set(&queue->lock, 0, 1) != 0) ix_pause(); while (atomic_i32_fetch_test_set(&queue->lock, 0, 1) != 0) ix_pause();
{ {
@ -746,6 +729,18 @@ INTERNAL void job_fiber_entry(void *id_ptr)
* Job worker thread * Job worker thread
* ========================== */ * ========================== */
INTERNAL enum job_queue_kind job_queue_kind_from_priority(enum sys_priority priority)
{
STATIC_ASSERT((i32)NUM_SYS_PRIORITIES == (i32)NUM_JOB_QUEUE_KINDS); /* Priority & queue kind enums must have 1:1 mapping */
return (enum job_queue_kind)priority;
}
INTERNAL enum sys_priority job_priority_from_queue_kind(enum job_queue_kind queue_kind)
{
STATIC_ASSERT((i32)NUM_SYS_PRIORITIES == (i32)NUM_JOB_QUEUE_KINDS); /* Priority & queue kind enums must have 1:1 mapping */
return (enum sys_priority)queue_kind;
}
INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg) INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
{ {
struct worker_ctx *ctx = worker_ctx_arg; struct worker_ctx *ctx = worker_ctx_arg;
@ -784,8 +779,7 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
/* Pull job from queue */ /* Pull job from queue */
b32 queues_empty = true; b32 queues_empty = true;
enum sys_priority job_priority = 0; enum sys_priority job_priority = 0;
enum job_queue_kind job_queue_kind = 0; i16 job_fiber_id = 0;
i32 job_fiber_id = 0;
i32 job_id = 0; i32 job_id = 0;
sys_job_func *job_func = 0; sys_job_func *job_func = 0;
void *job_sig = 0; void *job_sig = 0;
@ -799,7 +793,7 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
{ {
struct job_info *info = queue->first; struct job_info *info = queue->first;
job_priority = (enum sys_priority)queue->kind; job_priority = (enum sys_priority)queue->kind;
job_queue_kind = queue->kind; job_priority = job_priority_from_queue_kind(queue->kind);
while (info && !job_func) { while (info && !job_func) {
struct job_info *next = info->next; struct job_info *next = info->next;
b32 dequeue = false; b32 dequeue = false;
@ -807,7 +801,6 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
job_id = info->num_dispatched++; job_id = info->num_dispatched++;
if (job_id < info->count) { if (job_id < info->count) {
/* Pick job */ /* Pick job */
STATIC_ASSERT((i32)NUM_SYS_PRIORITIES == (i32)NUM_JOB_QUEUE_KINDS); /* Priority & queue kind enums must have 1:1 mapping */
job_func = info->func; job_func = info->func;
job_sig = info->sig; job_sig = info->sig;
job_counter = info->counter; job_counter = info->counter;
@ -859,6 +852,7 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
if (job_func) { if (job_func) {
if (!job_fiber) { if (!job_fiber) {
job_fiber = fiber_alloc(FIBER_KIND_JOB_WORKER); job_fiber = fiber_alloc(FIBER_KIND_JOB_WORKER);
job_fiber_id = job_fiber->id;
} }
{ {
__profnc("Run fiber", RGB32_F(0.25, 0.75, 0)); __profnc("Run fiber", RGB32_F(0.25, 0.75, 0));
@ -868,6 +862,7 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
job_fiber->job_sig = job_sig; job_fiber->job_sig = job_sig;
job_fiber->job_id = job_id; job_fiber->job_id = job_id;
job_fiber->job_priority = job_priority; job_fiber->job_priority = job_priority;
job_fiber->job_counter = job_counter;
job_fiber->parent_id = worker_fiber_id; job_fiber->parent_id = worker_fiber_id;
job_fiber->yield_param = &yield; job_fiber->yield_param = &yield;
b32 done = false; b32 done = false;
@ -884,13 +879,12 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
case YIELD_KIND_WAIT: case YIELD_KIND_WAIT:
{ {
#if 1
void *wait_addr = yield.wait.addr; void *wait_addr = yield.wait.addr;
void *wait_cmp = yield.wait.cmp; void *wait_cmp = yield.wait.cmp;
u32 wait_size = yield.wait.size; u32 wait_size = yield.wait.size;
u64 wait_bin_index = (u64)wait_addr % NUM_WAIT_ADDR_BINS; u64 wait_bin_index = (u64)wait_addr % NUM_WAIT_ADDR_BINS;
struct wait_bin *bin = &G.wait_bins[wait_bin_index]; struct wait_bin *bin = &G.wait_addr_bins[wait_bin_index];
while (atomic_i32_fetch_test_set(&bin->lock, 0, 1) != 0) ix_pause(); while (atomic_i32_fetch_test_set(&bin->lock, 0, 1) != 0) ix_pause();
{ {
@ -898,7 +892,7 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
/* Search addr wait list in bin */ /* Search addr wait list in bin */
struct wait_list *wait_list = NULL; struct wait_list *wait_list = NULL;
for (struct wait_list *tmp = bin->first_wait_list; tmp && !wait_list; tmp = tmp->next_in_bin) { for (struct wait_list *tmp = bin->first_wait_list; tmp && !wait_list; tmp = tmp->next_in_bin) {
if (tmp->addr == wait_addr) { if (tmp->value == (u64)wait_addr) {
wait_list = tmp; wait_list = tmp;
} }
} }
@ -916,7 +910,7 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
atomic_i32_fetch_set(&G.wait_lists_arena_lock, 0); atomic_i32_fetch_set(&G.wait_lists_arena_lock, 0);
} }
MEMZERO_STRUCT(wait_list); MEMZERO_STRUCT(wait_list);
wait_list->addr = wait_addr; wait_list->value = wait_addr;
if (bin->last_wait_list) { if (bin->last_wait_list) {
bin->last_wait_list->next_in_bin = wait_list; bin->last_wait_list->next_in_bin = wait_list;
wait_list->prev_in_bin = bin->last_wait_list; wait_list->prev_in_bin = bin->last_wait_list;
@ -926,33 +920,15 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
bin->last_wait_list = wait_list; bin->last_wait_list = wait_list;
} }
/* Allocate new yielder */ /* Insert fiber into wait list */
struct yielder *yielder = NULL; if (wait_list->last_waiter) {
if (bin->first_free_yielder) { fiber_from_id(wait_list->last_waiter)->next_waiter = job_fiber_id;
yielder = bin->first_free_yielder; job_fiber->prev_waiter = wait_list->last_waiter;
bin->first_free_yielder = yielder->next;
} else { } else {
while (atomic_i32_fetch_test_set(&G.yielders_arena_lock, 0, 1) != 0) ix_pause(); wait_list->first_waiter = job_fiber_id;
{
yielder = arena_push_no_zero(G.yielders_arena, struct yielder);
} }
atomic_i32_fetch_set(&G.yielders_arena_lock, 0); wait_list->last_waiter = job_fiber_id;
} ++wait_list->num_waiters;
MEMZERO_STRUCT(yielder);
yielder->fiber_id = job_fiber->id;
yielder->job_queue_kind = job_queue_kind;
yielder->job_func = job_func;
yielder->job_sig = job_sig;
yielder->job_counter = job_counter;
yielder->job_id = job_id;
if (wait_list->last_yielder) {
wait_list->last_yielder->next = yielder;
yielder->prev = wait_list->last_yielder;
} else {
wait_list->first_yielder = yielder;
}
wait_list->last_yielder = yielder;
++wait_list->num_yielders;
/* Pop worker's job fiber */ /* Pop worker's job fiber */
job_fiber = NULL; job_fiber = NULL;
@ -960,10 +936,6 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
} }
} }
atomic_i32_fetch_set(&bin->lock, 0); atomic_i32_fetch_set(&bin->lock, 0);
#else
(UNUSED)job_queue_kind;
//ASSERT(false);
#endif
} break; } break;
case YIELD_KIND_DONE: case YIELD_KIND_DONE:
@ -1068,8 +1040,8 @@ INTERNAL SYS_THREAD_DEF(test_entry, _)
struct sys_scratch_ctx *sys_scratch_ctx_from_fiber_id(i16 id) struct sys_scratch_ctx *sys_scratch_ctx_from_fiber_id(i16 id)
{ {
struct fiber_ctx *fiber_ctx = fiber_ctx_from_id(id); struct fiber *fiber = fiber_from_id(id);
struct sys_scratch_ctx *scratch_ctx = &fiber_ctx->scratch_ctx; struct sys_scratch_ctx *scratch_ctx = &fiber->scratch_ctx;
if (!scratch_ctx->arenas[0]) { if (!scratch_ctx->arenas[0]) {
__profn("Initialize scratch context"); __profn("Initialize scratch context");
for (u32 i = 0; i < countof(scratch_ctx->arenas); ++i) { for (u32 i = 0; i < countof(scratch_ctx->arenas); ++i) {
@ -3002,9 +2974,6 @@ int CALLBACK wWinMain(_In_ HINSTANCE instance, _In_opt_ HINSTANCE prev_instance,
__profthread("Main thread", PROF_THREAD_GROUP_MAIN); __profthread("Main thread", PROF_THREAD_GROUP_MAIN);
/* Init yielders */
G.yielders_arena = arena_alloc(GIBI(64));
/* Init wait lists */ /* Init wait lists */
G.wait_lists_arena = arena_alloc(GIBI(64)); G.wait_lists_arena = arena_alloc(GIBI(64));