From d47b951b82ee4bd728e152cb07c02b0a0050c521 Mon Sep 17 00:00:00 2001 From: jacob Date: Sat, 5 Jul 2025 21:01:30 -0500 Subject: [PATCH] wait / wake wip --- src/asset_cache.c | 2 +- src/gp_dx12.c | 4 +- src/resource.c | 2 +- src/sprite.c | 2 +- src/sys.h | 21 +- src/sys_win32.c | 618 ++++++++++++++++++++++++++-------------------- 6 files changed, 373 insertions(+), 276 deletions(-) diff --git a/src/asset_cache.c b/src/asset_cache.c index b773b560..c9f55fb8 100644 --- a/src/asset_cache.c +++ b/src/asset_cache.c @@ -192,7 +192,7 @@ void asset_cache_wait(struct asset *asset) if (asset->status != ASSET_STATUS_READY) { /* Wait on job */ if (asset->counter) { - sys_wait(asset->counter); + sys_counter_wait(asset->counter); } /* Wait for asset to be ready */ sync_flag_wait(&asset->asset_ready_sf); diff --git a/src/gp_dx12.c b/src/gp_dx12.c index 54a92cdb..6a63c44f 100644 --- a/src/gp_dx12.c +++ b/src/gp_dx12.c @@ -930,7 +930,7 @@ INTERNAL SYS_JOB_DEF(pipeline_init_job, job) struct sys_counter *counter = sys_counter_alloc(); { sys_run(countof(params), shader_compile_job, &comp_sig, SYS_PRIORITY_HIGH, counter); - sys_wait(counter); + sys_counter_wait(counter); } sys_counter_release(counter); success = vs.success && ps.success; @@ -1127,7 +1127,7 @@ INTERNAL void pipeline_alloc(u64 num_pipelines, struct pipeline_desc *descs_in, struct sys_counter *counter = sys_counter_alloc(); { sys_run(num_pipelines, pipeline_init_job, &sig, SYS_PRIORITY_HIGH, counter); - sys_wait(counter); + sys_counter_wait(counter); } sys_counter_release(counter); } diff --git a/src/resource.c b/src/resource.c index cbedc6b6..6b58a6ce 100644 --- a/src/resource.c +++ b/src/resource.c @@ -291,7 +291,7 @@ INTERNAL SYS_THREAD_DEF(resource_watch_dispatcher_thread_entry_point, _) struct sys_counter *counter = sys_counter_alloc(); { sys_run(num_callbacks, resource_watch_callback_job, &sig, SYS_PRIORITY_BACKGROUND, counter); - sys_wait(counter); + sys_counter_wait(counter); } sys_counter_release(counter); } diff 
--git a/src/sprite.c b/src/sprite.c index 1a084f2a..2c8951bf 100644 --- a/src/sprite.c +++ b/src/sprite.c @@ -1381,7 +1381,7 @@ INTERNAL SYS_THREAD_DEF(sprite_evictor_scheduler_thread_entry_point, arg) struct sys_lock evictor_lock = sys_mutex_lock_e(G.evictor_scheduler_mutex); while (!G.evictor_scheduler_shutdown) { sys_run(1, sprite_evictor_job, NULL, SYS_PRIORITY_BACKGROUND, job_counter); - sys_wait(job_counter); + sys_counter_wait(job_counter); sys_condition_variable_wait_time(G.evictor_scheduler_shutdown_cv, &evictor_lock, SECONDS_FROM_NS(EVICTOR_CYCLE_INTERVAL_NS)); } sys_mutex_unlock(&evictor_lock); diff --git a/src/sys.h b/src/sys.h index 0ba274d8..33c3d33c 100644 --- a/src/sys.h +++ b/src/sys.h @@ -477,17 +477,14 @@ b32 sys_run_command(struct string cmd); - /* ========================== * - * Counter + * Wait * ========================== */ -struct sys_counter *sys_counter_alloc(void); -void sys_counter_release(struct sys_counter *counter); +/* Futex-like wait & wake */ -void sys_counter_add(struct sys_counter *counter, i64 amount); - -void sys_wait(struct sys_counter *counter); +void sys_wait(void *addr, void *cmp, u32 size); +void sys_wake_all(void *addr); /* ========================== * * Fiber @@ -497,8 +494,18 @@ void sys_wait(struct sys_counter *counter); i32 sys_current_fiber_id(void); +/* Cooperative yield to give other jobs of the same priority level a chance to run */ void sys_yield(void); +/* ========================== * + * Counter + * ========================== */ + +struct sys_counter *sys_counter_alloc(void); +void sys_counter_release(struct sys_counter *counter); +void sys_counter_add(struct sys_counter *counter, i64 amount); +void sys_counter_wait(struct sys_counter *counter); + /* ========================== * * Job * ========================== */ diff --git a/src/sys_win32.c b/src/sys_win32.c index 60a403d5..de905195 100644 --- a/src/sys_win32.c +++ b/src/sys_win32.c @@ -52,7 +52,7 @@ struct win32_condition_variable { CONDITION_VARIABLE 
condition_variable; struct win32_condition_variable *next_free; #if RTC - struct atomic_i64 num_yielding_waiters; + struct atomic_i64 num_yielders; #endif }; @@ -112,7 +112,48 @@ struct win32_window { struct win32_window *next_free; }; -struct alignas(64) waiter { + +#define NUM_WAIT_BINS 4096 + +struct alignas(64) wait_list { + /* =================================================== */ + void *addr; /* 8 bytes */ + /* =================================================== */ + struct yielder *first_yielder; /* 8 bytes */ + /* =================================================== */ + struct yielder *last_yielder; /* 8 bytes */ + /* =================================================== */ + struct yielder *first_free_yielder; /* 8 bytes */ + /* =================================================== */ + i32 num_yielders; /* 4 bytes */ + u8 _pad0[4]; /* 4 bytes (padding) */ + /* =================================================== */ + struct wait_list *next_in_bin; /* 8 bytes */ + /* =================================================== */ + struct wait_list *prev_in_bin; /* 8 bytes */ + /* =================================================== */ + u8 _pad1[8]; /* 8 bytes (padding) */ +}; +STATIC_ASSERT(sizeof(struct wait_list) == 64); /* Assume wait_list fits in one cache line (increase if necessary) */ +STATIC_ASSERT(alignof(struct wait_list) == 64); /* Avoid false sharing */ + +struct alignas(64) wait_bin { + /* =================================================== */ + struct wait_list *first_wait_list; /* 8 bytes */ + /* =================================================== */ + struct wait_list *last_wait_list; /* 8 bytes */ + /* =================================================== */ + struct wait_list *first_free_wait_list; /* 8 bytes */ + /* =================================================== */ + struct atomic_i32 lock; /* 4 bytes */ + u8 _pad0[4]; /* 4 bytes (padding) */ + /* =================================================== */ + u8 _pad1[32]; /* 32 bytes (padding) */ +}; 
+STATIC_ASSERT(sizeof(struct wait_bin) == 64); /* Assume wait_bin fits in one cache line (increase if necessary) */ +STATIC_ASSERT(alignof(struct wait_bin) == 64); /* Avoid false sharing */ + +struct alignas(64) yielder { /* =================================================== */ i32 fiber_id; /* 4 bytes */ i32 job_queue_kind; /* 4 bytes */ @@ -123,17 +164,17 @@ struct alignas(64) waiter { /* =================================================== */ struct counter *job_counter; /* 8 bytes */ /* =================================================== */ - struct waiter *next; /* 8 bytes */ + struct yielder *next; /* 8 bytes */ + /* =================================================== */ + struct yielder *prev; /* 8 bytes */ /* =================================================== */ i32 job_id; /* 4 bytes */ u8 _pad0[4]; /* 4 bytes (padding) */ /* =================================================== */ u8 _pad1[8]; /* 8 bytes (padding) */ - /* =================================================== */ - u8 _pad2[8]; /* 8 bytes (padding) */ }; -STATIC_ASSERT(sizeof(struct waiter) == 64); /* Assume waiter fits in one cache line (increase if necessary) */ -STATIC_ASSERT(alignof(struct waiter) == 64); /* Avoid false sharing */ +STATIC_ASSERT(sizeof(struct yielder) == 64); /* Assume yielder fits in one cache line (increase if necessary) */ +STATIC_ASSERT(alignof(struct yielder) == 64); /* Avoid false sharing */ @@ -141,22 +182,9 @@ STATIC_ASSERT(alignof(struct waiter) == 64); /* Avoid false sharing */ struct alignas(64) counter { /* =================================================== */ struct atomic_i64 v; /* 8 bytes */ - /* =================================================== */ struct counter *next_free; /* 8 bytes */ /* =================================================== */ - struct atomic_i32 waiters_lock; /* 4 bytes */ - u8 _pad0[4]; /* 4 bytes (padding) */ - /* =================================================== */ - i32 num_blocking_waiters; /* 4 bytes */ - i32 
num_yielding_waiters; /* 4 bytes */ - /* =================================================== */ - struct waiter *first_waiter; /* 8 bytes */ - /* =================================================== */ - struct waiter *last_waiter; /* 8 bytes */ - /* =================================================== */ - struct atomic_i64 wake_gen; /* 8 bytes */ - /* =================================================== */ - u8 _pad1[8]; /* 8 bytes (padding) */ + u8 _pad[48]; /* 48 bytes (padding) */ }; STATIC_ASSERT(sizeof(struct counter) == 64); /* Assume counter fits in one cache line (increase if necessary) */ STATIC_ASSERT(alignof(struct counter) == 64); /* Avoid false sharing */ @@ -180,6 +208,17 @@ enum yield_kind { NUM_YIELD_KINDS }; +struct yield_param { + enum yield_kind kind; + union { + struct { + void *addr; + void *cmp; + u32 size; + } wait; + }; +}; + struct alignas(64) fiber { /* =================================================== */ char *name_cstr; /* 8 bytes */ @@ -196,10 +235,9 @@ struct alignas(64) fiber { i32 job_id; /* 4 bytes */ i32 job_priority; /* 4 bytes */ /* =================================================== */ - void *yield_arg; /* 8 bytes */ + struct yield_param *yield_param; /* 8 bytes */ /* =================================================== */ - enum yield_kind yield_kind; /* 4 bytes */ - u8 _pad0[4]; /* 4 bytes (padding) */ + u8 _pad0[8]; /* 8 bytes (padding) */ }; STATIC_ASSERT(sizeof(struct fiber) == 64); /* Assume fiber fits in one cache line (increase if necessary) */ STATIC_ASSERT(alignof(struct fiber) == 64); /* Avoid false sharing */ @@ -315,19 +353,16 @@ GLOBAL struct { + /* Yielders */ + struct atomic_i32 yielders_arena_lock; /* TODO: Prevent false sharing */ + struct arena *yielders_arena; + /* Wait lists */ + struct atomic_i32 wait_lists_arena_lock; /* TODO: Prevent false sharing */ + struct arena *wait_lists_arena; - - - /* Waiters */ - struct arena *waiters_arena; - struct atomic_i32 waiters_lock; /* TODO: Prevent false sharing */ - 
struct waiter *first_free_waiter; - - /* Counters */ - struct arena *counters_arena; - struct atomic_i32 counters_lock; /* TODO: Prevent false sharing */ - struct counter *first_free_counter; + /* Wait table */ + struct wait_bin wait_bins[NUM_WAIT_BINS]; /* Fibers */ i32 num_fibers; @@ -337,6 +372,11 @@ GLOBAL struct { struct fiber fibers[SYS_MAX_FIBERS]; struct fiber_ctx fiber_contexts[SYS_MAX_FIBERS]; + /* Counters */ + struct arena *counters_arena; + struct atomic_i32 counters_lock; /* TODO: Prevent false sharing */ + struct counter *first_free_counter; + /* Jobs */ struct job_queue job_queues[NUM_JOB_QUEUE_KINDS]; @@ -350,195 +390,138 @@ GLOBAL struct { } G = ZI, DEBUG_ALIAS(G, G_sys_win32); + + +INTERNAL struct fiber *fiber_from_id(i32 id); +INTERNAL void fiber_yield(struct fiber *fiber, struct fiber *parent_fiber, struct yield_param *param); + + + + + + + + + + + + + + + + /* ========================== * - * Counters + * Wait / wake * ========================== */ -INTERNAL void yield(struct fiber *fiber, struct fiber *parent_fiber, enum yield_kind kind, void *arg); -INTERNAL struct fiber *fiber_from_id(i32 id); - -INTERNAL struct counter *counter_alloc(void) +void sys_wait(void *addr, void *cmp, u32 size) { - struct counter *counter = NULL; +#if 1 + WaitOnAddress(addr, cmp, size, INFINITE); +#else + struct fiber *fiber = fiber_from_id(sys_current_fiber_id()); + i32 parent_fiber_id = fiber->parent_id; + /* Yield if job fiber, otherwise fall back to windows blocking function */ + if (parent_fiber_id > 0) { +#if 1 + /* Yield if job fiber */ + struct yield_param param = { + .kind = YIELD_KIND_WAIT, + .wait = { + .addr = addr, + .cmp = cmp, + .size = size + } + }; + fiber_yield(fiber, fiber_from_id(parent_fiber_id), &param); +#else + while (MEMEQ(addr, cmp, size)) { + ix_pause(); + } +#endif + } else { + WaitOnAddress(addr, cmp, size, INFINITE); + } +#endif +} + +void sys_wake_all(void *addr) +{ +#if 0 + u64 wait_bin_index = (u64)addr % NUM_WAIT_BINS; + struct 
wait_bin *bin = &G.wait_bins[wait_bin_index]; + + while (atomic_i32_fetch_test_set(&bin->lock, 0, 1) != 0) ix_pause(); { - while (atomic_i32_fetch_test_set(&G.counters_lock, 0, 1) != 0) ix_pause(); - { - if (G.first_free_counter) { - counter = G.first_free_counter; - G.first_free_counter = counter->next_free; - } else { - counter = arena_push_no_zero(G.counters_arena, struct counter); + struct wait_list *wait_list = NULL; + for (struct wait_list *tmp = bin->first_wait_list; tmp && !wait_list; tmp = tmp->next_in_bin) { + if (tmp->addr == addr) { + wait_list = tmp; } } - atomic_i32_fetch_set(&G.counters_lock, 0); - } - MEMZERO_STRUCT(counter); - return counter; -} - -INTERNAL void counter_release(struct counter *counter) -{ - ASSERT(counter->first_waiter == NULL); /* Counter should not have any waiters */ - while (atomic_i32_fetch_test_set(&G.counters_lock, 0, 1) != 0) ix_pause(); - { - counter->next_free = G.first_free_counter; - G.first_free_counter = counter; - } - atomic_i32_fetch_set(&G.counters_lock, 0); -} - -INTERNAL void counter_add(struct counter *counter, i64 amount) -{ - i64 old_v = atomic_i64_fetch_add(&counter->v, amount); - i64 new_v = old_v + amount; - if (old_v > 0 && new_v <= 0) { - /* Wake waiters */ - struct waiter *first_waiter = NULL; - struct waiter *last_waiter = NULL; - { - while (atomic_i32_fetch_test_set(&counter->waiters_lock, 0, 1) != 0) ix_pause(); + if (wait_list && wait_list->num_yielders > 0) { + struct arena_temp scratch = scratch_begin_no_conflict(); { - /* Wake blocking waiters */ - atomic_i64_fetch_add(&counter->wake_gen, 1); - if (counter->num_blocking_waiters > 0) { - WakeByAddressAll(&counter->wake_gen); + /* Separate yielders by queue kind */ + i32 queue_counts[NUM_JOB_QUEUE_KINDS] = ZI; + struct yielder **queue_yielder_arrays[NUM_JOB_QUEUE_KINDS] = ZI; + for (i32 i = 0; i < (i32)countof(queue_yielder_arrays); ++i) { + /* NOTE: Each array is conservatively sized as the number of all yielders in the list */ + 
queue_yielder_arrays[i] = arena_push_array_no_zero(scratch.arena, struct yielder *, wait_list->num_yielders); } - - /* Wake yielding waiters (fibers) */ - first_waiter = counter->first_waiter; - last_waiter = counter->last_waiter; - if (first_waiter) { - struct arena_temp scratch = scratch_begin_no_conflict(); - { - /* Separate waiters by queue kind */ - i32 queue_counts[NUM_JOB_QUEUE_KINDS] = ZI; - struct waiter **queue_waiter_arrays[NUM_JOB_QUEUE_KINDS] = ZI; - for (i32 i = 0; i < (i32)countof(queue_waiter_arrays); ++i) { - /* NOTE: Each array is conservatively sized as the number of all waiters in the counter */ - queue_waiter_arrays[i] = arena_push_array_no_zero(scratch.arena, struct waiter *, counter->num_yielding_waiters); - } - for (struct waiter *waiter = first_waiter; waiter; waiter = waiter->next) { - enum job_queue_kind queue_kind = waiter->job_queue_kind; - i32 index = queue_counts[queue_kind]++; - struct waiter **array = queue_waiter_arrays[index]; - array[index] = waiter; - } - - /* Push jobs */ - for (i32 queue_kind = 0; queue_kind < (i32)countof(queue_counts); ++queue_kind) { - i32 num_jobs = queue_counts[queue_kind]; - if (num_jobs > 0) { - struct job_queue *queue = &G.job_queues[queue_kind]; - struct waiter **queue_waiters = queue_waiter_arrays[queue_kind]; - while (atomic_i32_fetch_test_set(&queue->lock, 0, 1) != 0) ix_pause(); - { - /* TODO: More efficient batch job list allocation */ - for (i32 i = 0; i < num_jobs; ++i) { - struct waiter *waiter = queue_waiters[i]; - struct job_info *info = NULL; - if (queue->first_free) { - info = queue->first_free; - queue->first_free = info->next; - } else { - info = arena_push_no_zero(queue->arena, struct job_info); - } - MEMZERO_STRUCT(info); - info->count = 1; - info->num_dispatched = waiter->job_id; - info->func = waiter->job_func; - info->sig = waiter->job_sig; - info->counter = waiter->job_counter; - info->fiber_id = waiter->fiber_id; - if (queue->last) { - queue->last->next = info; - } else { - 
queue->first = info; - } - queue->last = info; - } + for (struct yielder *yielder = wait_list->first_yielder; yielder; yielder = yielder->next) { + enum job_queue_kind queue_kind = yielder->job_queue_kind; + i32 index = queue_counts[queue_kind]++; + struct yielder **array = queue_yielder_arrays[queue_kind]; /* Select the array by queue kind; index is the slot within it */ + array[index] = yielder; + } + /* Push jobs */ + for (i32 queue_kind = 0; queue_kind < (i32)countof(queue_counts); ++queue_kind) { + i32 num_jobs = queue_counts[queue_kind]; + if (num_jobs > 0) { + struct job_queue *queue = &G.job_queues[queue_kind]; + struct yielder **queue_yielders = queue_yielder_arrays[queue_kind]; + while (atomic_i32_fetch_test_set(&queue->lock, 0, 1) != 0) ix_pause(); + { + /* TODO: More efficient batch job list allocation */ + for (i32 i = 0; i < num_jobs; ++i) { + struct yielder *yielder = queue_yielders[i]; + struct job_info *info = NULL; + if (queue->first_free) { + info = queue->first_free; + queue->first_free = info->next; + } else { + info = arena_push_no_zero(queue->arena, struct job_info); } - atomic_i32_fetch_set(&queue->lock, 0); + MEMZERO_STRUCT(info); + info->count = 1; + info->num_dispatched = yielder->job_id; + info->func = yielder->job_func; + info->sig = yielder->job_sig; + info->counter = yielder->job_counter; + info->fiber_id = yielder->fiber_id; + if (queue->last) { + queue->last->next = info; + } else { + queue->first = info; + } + queue->last = info; } } - - /* Reset waiters list */ - counter->num_yielding_waiters = 0; - counter->first_waiter = NULL; - counter->last_waiter = NULL; + atomic_i32_fetch_set(&queue->lock, 0); } - scratch_end(scratch); } - counter->num_yielding_waiters = 0; } - atomic_i32_fetch_set(&counter->waiters_lock, 0); - } - - /* Add waiters to free list */ - if (first_waiter) { - while (atomic_i32_fetch_test_set(&G.waiters_lock, 0, 1) != 0) ix_pause(); - { - last_waiter->next = G.first_free_waiter; - G.first_free_waiter = first_waiter; - } - atomic_i32_fetch_set(&G.waiters_lock, 0); + 
scratch_end(scratch); } } -} - -INTERNAL void counter_wait(struct counter *counter) -{ - __prof; - /* TODO: Spin with configurable count */ - - if (atomic_i64_fetch(&counter->v) > 0) { - struct fiber *fiber = fiber_from_id(sys_current_fiber_id()); - i32 parent_fiber_id = fiber->parent_id; - if (parent_fiber_id > 0) { -#if 0 - /* Yield if job fiber */ - yield(fiber, fiber_from_id(parent_fiber_id), YIELD_KIND_WAIT, counter); -#else - while (atomic_i64_fetch(&counter->v) != 0) { - ix_pause(); - } + atomic_i32_fetch_set(&bin->lock, 0); /* Release bin lock (acquired above via test-set 0 -> 1) */ #endif - } else { - /* Top-level fibers should block since they can't yield */ - i64 wake_gen = 0; - { - while (atomic_i32_fetch_test_set(&counter->waiters_lock, 0, 1) != 0) ix_pause(); - ++counter->num_blocking_waiters; - wake_gen = atomic_i64_fetch(&counter->wake_gen); - atomic_i32_fetch_set(&counter->waiters_lock, 0); - } - while (atomic_i64_fetch(&counter->wake_gen) <= wake_gen) { - WaitOnAddress(&counter->wake_gen, &wake_gen, sizeof(wake_gen), INFINITE); - } - } - } -#if 0 - if (atomic_i64_fetch(&counter->v) > 0) { - /* Top-level fibers should block since they can't yield */ - i64 wake_gen = 0; - { - while (atomic_i32_fetch_test_set(&counter->waiters_lock, 0, 1) != 0) ix_pause(); - ++counter->num_blocking_waiters; - wake_gen = atomic_i64_fetch(&counter->wake_gen); - atomic_i32_fetch_set(&counter->waiters_lock, 0); - } - while (atomic_i64_fetch(&counter->wake_gen) <= wake_gen) { - WaitOnAddress(&counter->wake_gen, &wake_gen, sizeof(wake_gen), INFINITE); - } - } -#endif + /* Wake blocking waiters */ + WakeByAddressAll(addr); } -struct sys_counter *sys_counter_alloc(void) { return (struct sys_counter *)counter_alloc(); } -void sys_counter_release(struct sys_counter *counter) { counter_release((struct counter *)counter); } -void sys_counter_add(struct sys_counter *counter, i64 amount) { counter_add((struct counter *)counter, amount); } -void sys_wait(struct sys_counter *counter) { counter_wait((struct counter *)counter); } - /* 
========================== * * Fibers * ========================== */ @@ -616,7 +599,7 @@ INTERNAL struct fiber *fiber_alloc(enum fiber_kind kind) fiber->job_sig = 0; fiber->job_id = 0; fiber->job_priority = 0; - fiber->yield_kind = 0; + fiber->yield_param = 0; fiber->parent_id = 0; return fiber; } @@ -643,6 +626,63 @@ INTERNAL struct fiber_ctx *fiber_ctx_from_id(i32 id) return &G.fiber_contexts[id]; } +/* ========================== * + * Counters + * ========================== */ + +INTERNAL struct counter *counter_alloc(void) +{ + struct counter *counter = NULL; + { + while (atomic_i32_fetch_test_set(&G.counters_lock, 0, 1) != 0) ix_pause(); + { + if (G.first_free_counter) { + counter = G.first_free_counter; + G.first_free_counter = counter->next_free; + } else { + counter = arena_push_no_zero(G.counters_arena, struct counter); + } + } + atomic_i32_fetch_set(&G.counters_lock, 0); + } + MEMZERO_STRUCT(counter); + return counter; +} + +INTERNAL void counter_release(struct counter *counter) +{ + while (atomic_i32_fetch_test_set(&G.counters_lock, 0, 1) != 0) ix_pause(); + { + counter->next_free = G.first_free_counter; + G.first_free_counter = counter; + } + atomic_i32_fetch_set(&G.counters_lock, 0); +} + +INTERNAL void counter_add(struct counter *counter, i64 amount) +{ + i64 old_v = atomic_i64_fetch_add(&counter->v, amount); + i64 new_v = old_v + amount; + if (old_v > 0 && new_v <= 0) { + sys_wake_all(&counter->v); + } +} + +INTERNAL void counter_wait(struct counter *counter) +{ + /* TODO: Spin with configurable count */ + i64 v = atomic_i64_fetch(&counter->v); + while (v > 0) { + sys_wait(&counter->v, &v, sizeof(v)); + v = atomic_i64_fetch(&counter->v); + } +} + +struct sys_counter *sys_counter_alloc(void) { return (struct sys_counter *)counter_alloc(); } +void sys_counter_release(struct sys_counter *counter) { counter_release((struct counter *)counter); } +void sys_counter_add(struct sys_counter *counter, i64 amount) { counter_add((struct counter *)counter, 
amount); } +void sys_counter_wait(struct sys_counter *counter) { counter_wait((struct counter *)counter); } + /* ========================== * * Test job * ========================== */ @@ -652,7 +692,7 @@ i32 sys_current_fiber_id(void) return (i32)(i64)GetFiberData(); } -INTERNAL void yield(struct fiber *fiber, struct fiber *parent_fiber, enum yield_kind kind, void *arg) +INTERNAL void fiber_yield(struct fiber *fiber, struct fiber *parent_fiber, struct yield_param *param) { ASSERT(fiber->id == sys_current_fiber_id()); ASSERT(parent_fiber->id == fiber->parent_id); @@ -661,8 +701,7 @@ INTERNAL void yield(struct fiber *fiber, struct fiber *parent_fiber, enum yield_ } { __prof_fiber_leave; - fiber->yield_kind = kind; - fiber->yield_arg = arg; + fiber->yield_param = param; SwitchToFiber(parent_fiber->addr); __prof_fiber_enter(fiber->name_cstr, PROF_THREAD_GROUP_FIBERS + fiber->id); } @@ -670,11 +709,13 @@ INTERNAL void yield(struct fiber *fiber, struct fiber *parent_fiber, enum yield_ void sys_yield(void) { + /* TODO: Don't yield if job queue is empty */ struct fiber *fiber = fiber_from_id(sys_current_fiber_id()); i32 parent_id = fiber->parent_id; if (parent_id > 0) { /* Top level fibers should not yield */ struct fiber *parent_fiber = fiber_from_id(parent_id); - yield(fiber, parent_fiber, YIELD_KIND_COOPERATIVE, NULL); + struct yield_param param = { .kind = YIELD_KIND_COOPERATIVE }; + fiber_yield(fiber, parent_fiber, &param); } } @@ -723,6 +764,7 @@ INTERNAL void job_fiber_entry(void *id_ptr) { i32 id = (i32)(i64)id_ptr; struct fiber *fiber = fiber_from_id(id); + struct yield_param yield = ZI; while (true) { __prof_fiber_enter(fiber->name_cstr, PROF_THREAD_GROUP_FIBERS + fiber->id); i32 parent_id = fiber->parent_id; @@ -731,13 +773,14 @@ INTERNAL void job_fiber_entry(void *id_ptr) /* Run job */ { __profscope(Run job); - fiber->yield_kind = YIELD_KIND_NONE; - fiber->yield_arg = NULL; + yield.kind = YIELD_KIND_NONE; + fiber->yield_param = &yield; struct sys_job_data data = 
ZI; data.id = fiber->job_id; data.sig = fiber->job_sig; fiber->job_func(data); - fiber->yield_kind = YIELD_KIND_DONE; + yield.kind = YIELD_KIND_DONE; + fiber->yield_param = &yield; } __prof_fiber_leave; SwitchToFiber(parent_fiber_addr); @@ -766,6 +809,7 @@ INTERNAL SYS_THREAD_DEF(worker_entry, worker_ctx_arg) /* Pull job from queue */ enum sys_priority job_priority = 0; enum job_queue_kind job_queue_kind = 0; + i32 job_fiber_id = 0; i32 job_id = 0; sys_job_func *job_func = 0; void *job_sig = 0; @@ -800,10 +844,7 @@ INTERNAL SYS_THREAD_DEF(worker_entry, worker_ctx_arg) } } else { /* This job is being resumed from a yield */ - if (job_fiber) { - fiber_release(job_fiber, job_fiber->id); - } - job_fiber = fiber_from_id(info->fiber_id); + job_fiber_id = info->fiber_id; job_id = info->num_dispatched; job_func = info->func; job_sig = info->sig; @@ -826,6 +867,14 @@ INTERNAL SYS_THREAD_DEF(worker_entry, worker_ctx_arg) } } + /* Use resumed fiber if present */ + if (job_fiber_id > 0) { + if (job_fiber) { + fiber_release(job_fiber, job_fiber->id); + } + job_fiber = fiber_from_id(job_fiber_id); + } + /* Run fiber */ if (job_func) { __profscope(Run fiber); @@ -840,59 +889,97 @@ INTERNAL SYS_THREAD_DEF(worker_entry, worker_ctx_arg) b32 done = false; while (!done) { SwitchToFiber(job_fiber->addr); - enum yield_kind yield_kind = job_fiber->yield_kind; + struct yield_param *yield_param = job_fiber->yield_param; + enum yield_kind yield_kind = yield_param ? 
yield_param->kind : YIELD_KIND_NONE; switch (yield_kind) { default: { /* Invalid yield kind */ - sys_panic(LIT("Fiber yielded with unknown yield kind")); + sys_panic(LIT("Invalid fiber yield")); } break; case YIELD_KIND_WAIT: { - struct counter *wait_counter = job_fiber->yield_arg; +#if 1 + void *wait_addr = yield_param->wait.addr; + void *wait_cmp = yield_param->wait.cmp; + u32 wait_size = yield_param->wait.size; + + u64 wait_bin_index = (u64)wait_addr % NUM_WAIT_BINS; + struct wait_bin *bin = &G.wait_bins[wait_bin_index]; + + while (atomic_i32_fetch_test_set(&bin->lock, 0, 1) != 0) ix_pause(); { - while (atomic_i32_fetch_test_set(&wait_counter->waiters_lock, 0, 1) != 0) ix_pause(); - { - if (atomic_i64_fetch(&wait_counter->v) > 0) { /* Re-check counter now that it's locked */ - /* Allocate waiter */ - struct waiter *waiter = NULL; - { - while (atomic_i32_fetch_test_set(&G.waiters_lock, 0, 1) != 0) ix_pause(); - { - if (G.first_free_waiter) { - waiter = G.first_free_waiter; - G.first_free_waiter = waiter->next; - } else { - waiter = arena_push_no_zero(G.waiters_arena, struct waiter); - } - } - atomic_i32_fetch_set(&G.waiters_lock, 0); + if (MEMEQ(wait_addr, wait_cmp, wait_size)) { /* Re-check now that the bin is locked: only park while the value is still unchanged */ + /* Search addr wait list in bin */ + struct wait_list *wait_list = NULL; + for (struct wait_list *tmp = bin->first_wait_list; tmp && !wait_list; tmp = tmp->next_in_bin) { + if (tmp->addr == wait_addr) { + wait_list = tmp; } - MEMZERO_STRUCT(waiter); - waiter->fiber_id = job_fiber->id; - waiter->job_queue_kind = job_queue_kind; - waiter->job_func = job_func; - waiter->job_sig = job_sig; - waiter->job_counter = job_counter; - waiter->job_id = job_id; - - /* Insert waiter */ - if (wait_counter->last_waiter) { - wait_counter->last_waiter->next = waiter; - } else { - wait_counter->first_waiter = waiter; - } - wait_counter->last_waiter = waiter; - ++wait_counter->num_yielding_waiters; - - /* Pop worker's job fiber */ - job_fiber = NULL; - done = true; } + + /* Allocate new wait list */ + if 
(!wait_list) { + if (bin->first_free_wait_list) { + wait_list = bin->first_free_wait_list; + bin->first_free_wait_list = wait_list->next_in_bin; + } else { + while (atomic_i32_fetch_test_set(&G.wait_lists_arena_lock, 0, 1) != 0) ix_pause(); + { + wait_list = arena_push_no_zero(G.wait_lists_arena, struct wait_list); + } + atomic_i32_fetch_set(&G.wait_lists_arena_lock, 0); + } + MEMZERO_STRUCT(wait_list); + wait_list->addr = wait_addr; + if (bin->last_wait_list) { + bin->last_wait_list->next_in_bin = wait_list; + wait_list->prev_in_bin = bin->last_wait_list; + } else { + bin->first_wait_list = wait_list; + } + bin->last_wait_list = wait_list; + } + + /* Allocate new yielder */ + struct yielder *yielder = NULL; + if (wait_list->first_free_yielder) { + yielder = wait_list->first_free_yielder; + wait_list->first_free_yielder = yielder->next; + } else { + while (atomic_i32_fetch_test_set(&G.yielders_arena_lock, 0, 1) != 0) ix_pause(); + { + yielder = arena_push_no_zero(G.yielders_arena, struct yielder); + } + atomic_i32_fetch_set(&G.yielders_arena_lock, 0); + } + MEMZERO_STRUCT(yielder); + yielder->fiber_id = job_fiber->id; + yielder->job_queue_kind = job_queue_kind; + yielder->job_func = job_func; + yielder->job_sig = job_sig; + yielder->job_counter = job_counter; + yielder->job_id = job_id; + if (wait_list->last_yielder) { + wait_list->last_yielder->next = yielder; + yielder->prev = wait_list->last_yielder; + } else { + wait_list->first_yielder = yielder; + } + wait_list->last_yielder = yielder; + ++wait_list->num_yielders; + + /* Pop worker's job fiber */ + job_fiber = NULL; + done = true; } - atomic_i32_fetch_set(&wait_counter->waiters_lock, 0); } + atomic_i32_fetch_set(&bin->lock, 0); +#else + (UNUSED)job_queue_kind; + ASSERT(false); +#endif } break; case YIELD_KIND_DONE: @@ -2577,7 +2664,7 @@ void sys_condition_variable_release(struct sys_condition_variable *sys_cv) __prof; struct win32_condition_variable *cv = (struct win32_condition_variable *)sys_cv; /* 
Condition variable must not have any sleepers (signal before releasing) */ - ASSERT(atomic_i64_fetch(&cv->num_yielding_waiters) == 0); + ASSERT(atomic_i64_fetch(&cv->num_yielders) == 0); win32_condition_variable_release(cv); } @@ -2587,7 +2674,7 @@ void sys_condition_variable_wait(struct sys_condition_variable *sys_cv, struct s struct win32_mutex *m = (struct win32_mutex *)lock->mutex; b32 exclusive = lock->exclusive; #if RTC - atomic_i64_fetch_add(&cv->num_yielding_waiters, 1); + atomic_i64_fetch_add(&cv->num_yielders, 1); if (exclusive) { m->owner_tid = 0; } @@ -2614,7 +2701,7 @@ void sys_condition_variable_wait(struct sys_condition_variable *sys_cv, struct s if (exclusive) { m->owner_tid = (u64)GetCurrentThreadId(); } - atomic_i64_fetch_add(&cv->num_yielding_waiters, -1); + atomic_i64_fetch_add(&cv->num_yielders, -1); #endif } @@ -2624,7 +2711,7 @@ void sys_condition_variable_wait_time(struct sys_condition_variable *sys_cv, str struct win32_mutex *m = (struct win32_mutex *)lock->mutex; b32 exclusive = lock->exclusive; #if RTC - atomic_i64_fetch_add(&cv->num_yielding_waiters, 1); + atomic_i64_fetch_add(&cv->num_yielders, 1); if (exclusive) { m->owner_tid = 0; } @@ -2652,7 +2739,7 @@ void sys_condition_variable_wait_time(struct sys_condition_variable *sys_cv, str if (exclusive) { m->owner_tid = (u64)GetCurrentThreadId(); } - atomic_i64_fetch_add(&cv->num_yielding_waiters, -1); + atomic_i64_fetch_add(&cv->num_yielders, -1); #endif } @@ -3172,16 +3259,19 @@ int CALLBACK wWinMain(_In_ HINSTANCE instance, _In_opt_ HINSTANCE prev_instance, __profthread("Main thread", PROF_THREAD_GROUP_MAIN); - /* Init waiters */ - G.waiters_arena = arena_alloc(GIGABYTE(64)); + /* Init yielders */ + G.yielders_arena = arena_alloc(GIGABYTE(64)); - /* Init counters */ - G.counters_arena = arena_alloc(GIGABYTE(64)); + /* Init wait lists */ + G.wait_lists_arena = arena_alloc(GIGABYTE(64)); /* Init fibers */ G.num_fibers = 1; /* Fiber at index 0 always nil */ G.fiber_names_arena = 
arena_alloc(GIGABYTE(64)); + /* Init counters */ + G.counters_arena = arena_alloc(GIGABYTE(64)); + /* Convert main thread to fiber */ fiber_alloc(FIBER_KIND_CONVERTED_THREAD);