diff --git a/src/common.h b/src/common.h index e9f1ea20..7744ae25 100644 --- a/src/common.h +++ b/src/common.h @@ -657,13 +657,14 @@ INLINE f64 clamp_f64(f64 v, f64 min, f64 max) { return v < min ? min : v > max ? #include "prof_tracy.h" -#define PROF_THREAD_GROUP_WORKERS -MEBI(7) -#define PROF_THREAD_GROUP_FIBERS -MEBI(6) -#define PROF_THREAD_GROUP_IO -MEBI(5) -#define PROF_THREAD_GROUP_WINDOW -MEBI(4) -#define PROF_THREAD_GROUP_EVICTORS -MEBI(3) -#define PROF_THREAD_GROUP_APP -MEBI(2) -#define PROF_THREAD_GROUP_MAIN -MEBI(1) +#define PROF_THREAD_GROUP_WORKERS -MEBI(8) +#define PROF_THREAD_GROUP_FIBERS -MEBI(7) +#define PROF_THREAD_GROUP_SCHEDULER -MEBI(6) +#define PROF_THREAD_GROUP_IO -MEBI(5) +#define PROF_THREAD_GROUP_WINDOW -MEBI(4) +#define PROF_THREAD_GROUP_EVICTORS -MEBI(3) +#define PROF_THREAD_GROUP_APP -MEBI(2) +#define PROF_THREAD_GROUP_MAIN -MEBI(1) #ifdef __cplusplus } diff --git a/src/playback_wasapi.c b/src/playback_wasapi.c index 1796329b..863eb7f0 100644 --- a/src/playback_wasapi.c +++ b/src/playback_wasapi.c @@ -11,6 +11,7 @@ #include "mixer.h" #include "atomic.h" #include "app.h" +#include "snc.h" #define COBJMACROS #define WIN32_LEAN_AND_MEAN @@ -42,6 +43,7 @@ GLOBAL struct { HANDLE event; IAudioRenderClient *playback; WAVEFORMATEX *buffer_format; + struct sys_thread *playback_scheduler_thread; u32 buffer_frames; } G = ZI, DEBUG_ALIAS(G, G_playback_wasapi); @@ -51,14 +53,14 @@ GLOBAL struct { INTERNAL void wasapi_initialize(void); INTERNAL APP_EXIT_CALLBACK_FUNC_DEF(playback_shutdown); -INTERNAL SYS_JOB_DEF(playback_job, _); +INTERNAL SYS_THREAD_DEF(playback_scheduler_entry, _); struct playback_startup_receipt playback_startup(struct mixer_startup_receipt *mixer_sr) { (UNUSED)mixer_sr; wasapi_initialize(); - sys_run(1, playback_job, NULL, SYS_PRIORITY_HIGH, NULL); + G.playback_scheduler_thread = sys_thread_alloc(playback_scheduler_entry, NULL, LIT("Playback scheduler thread"), PROF_THREAD_GROUP_SCHEDULER); app_register_exit_callback(&playback_shutdown); return (struct playback_startup_receipt) { 0 }; @@ -68,6 +70,7 @@ INTERNAL APP_EXIT_CALLBACK_FUNC_DEF(playback_shutdown) { __prof; atomic_i32_fetch_set(&G.shutdown, true); + sys_thread_wait_release(G.playback_scheduler_thread); } /* ========================== * @@ -172,12 +175,6 @@ INTERNAL struct wasapi_buffer wasapi_update_begin(void) __prof; struct wasapi_buffer wspbuf = ZI; - /* Wait */ - { - __profn("Wasapi wait"); - WaitForSingleObject(G.event, INFINITE); - } - /* Get padding frames */ u32 padding_frames; IAudioClient_GetCurrentPadding(G.client, &padding_frames); @@ -226,11 +223,22 @@ INTERNAL void wasapi_update_end(struct wasapi_buffer *wspbuf, struct mixed_pcm_f * Playback thread entry * ========================== */ -INTERNAL SYS_JOB_DEF(playback_job, _) +INTERNAL SYS_JOB_DEF(playback_mix_job, _) { + __prof; (UNUSED)_; struct arena_temp scratch = scratch_begin_no_conflict(); + { + struct wasapi_buffer wspbuf = wasapi_update_begin(); + struct mixed_pcm_f32 pcm = mixer_update(scratch.arena, wspbuf.frames_count); + wasapi_update_end(&wspbuf, pcm); + } + scratch_end(scratch); +} +INTERNAL SYS_THREAD_DEF(playback_scheduler_entry, _) +{ + (UNUSED)_; /* https://learn.microsoft.com/en-us/windows/win32/procthread/multimedia-class-scheduler-service#registry-settings */ DWORD task = 0; HANDLE mmc_handle = AvSetMmThreadCharacteristicsW(L"Pro Audio", &task); @@ -240,13 +248,15 @@ INTERNAL SYS_JOB_DEF(playback_job, _) /* FIXME: If playback fails at any point and mixer stops advancing, we * need to halt mixer to prevent memory leak when sounds are played. */ while (!atomic_i32_fetch(&G.shutdown)) { - struct arena_temp temp = arena_temp_begin(scratch.arena); - struct wasapi_buffer wspbuf = wasapi_update_begin(); - struct mixed_pcm_f32 pcm = mixer_update(temp.arena, wspbuf.frames_count); - wasapi_update_end(&wspbuf, pcm); - arena_temp_end(temp); - + { + __profn("Wait for audio event"); + WaitForSingleObject(G.event, INFINITE); + } + { + __profn("Run mix job & wait"); + struct snc_counter counter = ZI; + sys_run(1, playback_mix_job, NULL, SYS_PRIORITY_HIGH, &counter); + snc_counter_wait(&counter); + } } - - scratch_end(scratch); } diff --git a/src/resource.c b/src/resource.c index bcef2b00..d47f25f0 100644 --- a/src/resource.c +++ b/src/resource.c @@ -159,8 +159,12 @@ INTERNAL APP_EXIT_CALLBACK_FUNC_DEF(resource_shutdown) __prof; atomic_i32_fetch_set(&G.watch_shutdown, 1); - snc_cv_broadcast(&G.watch_dispatcher_cv); - sys_watch_wake(G.watch); + { + struct snc_lock lock = snc_lock_e(&G.watch_dispatcher_mutex); + snc_cv_broadcast(&G.watch_dispatcher_cv); + sys_watch_wake(G.watch); + snc_unlock(&lock); + } sys_thread_wait_release(G.resource_watch_dispatch_thread); sys_thread_wait_release(G.resource_watch_monitor_thread); @@ -199,8 +203,8 @@ INTERNAL SYS_THREAD_DEF(resource_watch_monitor_thread_entry_point, _) G.watch_dispatcher_info_list = list_part; } } - snc_unlock(&lock); snc_cv_broadcast(&G.watch_dispatcher_cv); + snc_unlock(&lock); } arena_temp_end(temp); } diff --git a/src/snc.c b/src/snc.c index c67f7428..1513559f 100644 --- a/src/snc.c +++ b/src/snc.c @@ -2,6 +2,7 @@ #include "atomic.h" #include "sys.h" #include "memory.h" +#include "intrinsics.h" #define DEFAULT_MUTEX_SPIN 4000 @@ -11,33 +12,48 @@ struct snc_lock snc_lock_spin_e(struct snc_mutex *m, i32 spin) { - __prof; b32 locked = false; + i32 spin_cnt = 0; while (!locked) { - /* Spin lock */ - i32 spin_cnt = 0; - i32 v = atomic_i32_fetch_test_set(&m->v, 0, (1 << 31)); - do { - if (v == 0) { - locked = true; - } else { - /* Set pending */ - if ((v & (1 << 30)) == 0) { - i32 old = atomic_i32_fetch_test_set(&m->v, v | (1 << 30), v); - while (old != v && (old & (1 << 30)) == 0) { - v = old; - old = atomic_i32_fetch_test_set(&m->v, v | (1 << 30), v); - } - v = old; - } + ++spin_cnt; + u32 v = atomic_u32_fetch_test_set(&m->v, 0, 0x80000000); + if (v == 0) { + locked = true; + } else if (v == 0x40000000) { + /* Lock has pending bit set, try to lock */ + u32 swp = atomic_u32_fetch_test_set(&m->v, v, 0x80000000); + while (swp != v && swp == 0x40000000) { + v = swp; + swp = atomic_u32_fetch_test_set(&m->v, v, 0x80000000); + } + v = swp; + if (v == 0x40000000) { + locked = true; + } + } + if (!locked && (v & 0xC0000000) == 0) { + /* Lock has shared lockers and no pending waiter, set pending bit */ + u32 swp = atomic_u32_fetch_test_set(&m->v, v, v | 0x40000000); + while (swp != v && (swp & 0xC0000000) == 0 && swp != 0) { + v = swp; + swp = atomic_u32_fetch_test_set(&m->v, v, v | 0x40000000); + } + v = swp; + } + /* Pause or wait */ + if (!locked && v != 0 && v != 0x40000000) { + if (spin_cnt < spin) { + ix_pause(); + } else { + sys_wait(&m->v, &v, 4, F32_INFINITY); } - ++spin_cnt; - } while (spin_cnt < spin && !locked); - /* Wait if not successful */ - if (!locked) { - sys_wait(&m->v, &v, 4); } } + +#if RTC + atomic_i32_fetch_set(&m->exclusive_fiber_id, sys_current_fiber_id()); +#endif + struct snc_lock lock = ZI; lock.exclusive = true; lock.mutex = m; @@ -46,24 +62,27 @@ struct snc_lock snc_lock_spin_e(struct snc_mutex *m, i32 spin) struct snc_lock snc_lock_spin_s(struct snc_mutex *m, i32 spin) { - __prof; b32 locked = false; + i32 spin_cnt = 0; while (!locked) { - /* Spin lock */ - i32 spin_cnt = 0; - i32 v = atomic_i32_fetch(&m->v); - do { - while (!locked && (v & 0xC0000000) == 0) { - /* Lock has no exclusive or pending exclusive lock, increment shared count */ - i32 old = atomic_i32_fetch_test_set(&m->v, v, v + 1); - if (v == old) { - locked = true; - } + ++spin_cnt; + u32 v = atomic_u32_fetch(&m->v); + while (!locked && (v & 0xC0000000) == 0) { + /* Lock has no exclusive or pending exclusive lock, increment shared count */ + u32 swp = atomic_u32_fetch_test_set(&m->v, v, v + 1); + if (v == swp) { + locked = true; + } else { + v = swp; } - } while (spin_cnt < spin && !locked); - /* Wait if not successful */ + } + /* Pause or wait */ if (!locked) { - sys_wait(&m->v, &v, 4); + if (spin_cnt < spin) { + ix_pause(); + } else { + sys_wait(&m->v, &v, 4, F32_INFINITY); + } } } struct snc_lock lock = ZI; @@ -83,12 +102,14 @@ struct snc_lock snc_lock_s(struct snc_mutex *m) void snc_unlock(struct snc_lock *l) { - __prof; struct snc_mutex *m = l->mutex; if (l->exclusive) { - atomic_i32_fetch_set(&m->v, 0); +#if RTC + atomic_i32_fetch_set(&m->exclusive_fiber_id, 0); +#endif + atomic_u32_fetch_set(&m->v, 0); } else { - atomic_i32_fetch_add(&m->v, -1); + atomic_u32_fetch_add_i32(&m->v, -1); } sys_wake_all(&m->v); MEMZERO_STRUCT(l); @@ -107,7 +128,7 @@ void snc_cv_wait(struct snc_cv *cv, struct snc_lock *l) { snc_unlock(l); do { - sys_wait(&cv->wake_gen, &old_wake_gen, sizeof(old_wake_gen)); + sys_wait(&cv->wake_gen, &old_wake_gen, sizeof(old_wake_gen), F32_INFINITY); wake_gen = atomic_u64_fetch(&cv->wake_gen); } while (wake_gen == old_wake_gen); sys_wake_all(&cv->wake_gen); @@ -142,7 +163,7 @@ void snc_counter_wait(struct snc_counter *counter) { i64 v = atomic_i64_fetch(&counter->v); while (v > 0) { - sys_wait(&counter->v, &v, sizeof(v)); + sys_wait(&counter->v, &v, sizeof(v), F32_INFINITY); v = atomic_i64_fetch(&counter->v); } } diff --git a/src/snc.h b/src/snc.h index b160b6c7..fa737d6b 100644 --- a/src/snc.h +++ b/src/snc.h @@ -11,7 +11,11 @@ struct snc_lock { }; struct snc_mutex { - struct atomic_i32 v; + struct atomic_u32 v; + +#if RTC + struct atomic_i32 exclusive_fiber_id; +#endif }; struct snc_lock snc_lock_spin_e(struct snc_mutex *m, i32 spin); diff --git a/src/sys.h b/src/sys.h index 1efecaf4..6757daf3 100644 --- a/src/sys.h +++ b/src/sys.h @@ -449,7 +449,7 @@ b32 sys_run_command(struct string cmd); /* Futex-like wait & wake */ -void sys_wait(void *addr, void *cmp, u32 size); +void sys_wait(void *addr, void *cmp, u32 size, f32 timeout_seconds); void sys_wake_single(void *addr); void sys_wake_all(void *addr); diff --git a/src/sys_win32.c b/src/sys_win32.c index 6fbd848d..6af23f40 100644 --- a/src/sys_win32.c +++ b/src/sys_win32.c @@ -256,7 +256,6 @@ STATIC_ASSERT(alignof(struct fiber_ctx) == 64); /* Avoid false sharing */ struct alignas(64) worker_ctx { i32 id; - HANDLE sleep_timer; }; struct job_info { @@ -357,7 +356,10 @@ GLOBAL struct { struct job_queue job_queues[NUM_JOB_QUEUE_KINDS]; /* Workers */ - struct atomic_i32 workers_shutdown; /* TODO: Prevent false sharing */ + struct atomic_i64 workers_wake_gen; /* TODO: Prevent false sharing */ + struct snc_mutex workers_wake_mutex; + struct snc_cv workers_wake_cv; + i32 num_worker_threads; struct arena *worker_threads_arena; struct sys_thread **worker_threads; @@ -390,12 +392,8 @@ INTERNAL void job_fiber_yield(struct fiber *fiber, struct fiber *parent_fiber); * Wait / wake * ========================== */ -void sys_wait(void *addr, void *cmp, u32 size) +void sys_wait(void *addr, void *cmp, u32 size, f32 timeout_seconds) { - __prof; -#if 0 - WaitOnAddress(addr, cmp, size, INFINITE); -#else struct fiber *fiber = fiber_from_id(sys_current_fiber_id()); i16 parent_fiber_id = fiber->parent_id; /* Yield if job fiber, otherwise fall back to windows blocking function */ @@ -412,15 +410,18 @@ void sys_wait(void *addr, void *cmp, u32 size) struct fiber *parent_fiber = fiber_from_id(parent_fiber_id); job_fiber_yield(fiber, parent_fiber); } else { - WaitOnAddress(addr, cmp, size, INFINITE); + i32 timeout_ms = INFINITE; + if (timeout_seconds != F32_INFINITY) { + timeout_ms = (i32)(timeout_seconds * 1000); + } + WaitOnAddress(addr, cmp, size, timeout_ms); } -#endif } void sys_wake_single(void *addr) { - ASSERT(false); - (UNUSED)addr; + /* FIXME: Real wake single */ + sys_wake_all(addr); } void sys_wake_all(void *addr) @@ -428,6 +429,7 @@ void sys_wake_all(void *addr) u64 wait_bin_index = (u64)addr % NUM_WAIT_ADDR_BINS; struct wait_bin *bin = &G.wait_bins[wait_bin_index]; + i32 num_yielders = 0; while (atomic_i32_fetch_test_set(&bin->lock, 0, 1) != 0) ix_pause(); { struct wait_list *wait_list = NULL; @@ -437,6 +439,7 @@ void sys_wake_all(void *addr) } } if (wait_list && wait_list->num_yielders > 0) { + num_yielders = wait_list->num_yielders; struct arena_temp scratch = scratch_begin_no_conflict(); { /* Separate yielders by queue kind */ @@ -444,7 +447,7 @@ void sys_wake_all(void *addr) struct yielder **queue_yielder_arrays[NUM_JOB_QUEUE_KINDS] = ZI; for (i32 i = 0; i < (i32)countof(queue_yielder_arrays); ++i) { /* NOTE: Each array is conservatively sized as the number of all yielders in the list */ - queue_yielder_arrays[i] = arena_push_array_no_zero(scratch.arena, struct yielder *, wait_list->num_yielders); + queue_yielder_arrays[i] = arena_push_array_no_zero(scratch.arena, struct yielder *, num_yielders); } for (struct yielder *yielder = wait_list->first_yielder; yielder; yielder = yielder->next) { enum job_queue_kind queue_kind = yielder->job_queue_kind; @@ -502,6 +505,19 @@ void sys_wake_all(void *addr) /* Wake blocking waiters */ WakeByAddressAll(addr); + + /* Wake workers */ + /* TODO: Only wake necessary amount of workers */ + if (num_yielders > 0) { + struct snc_lock lock = snc_lock_e(&G.workers_wake_mutex); + { + if (atomic_i64_fetch(&G.workers_wake_gen) >= 0) { + atomic_i64_fetch_add(&G.workers_wake_gen, 1); + snc_cv_broadcast(&G.workers_wake_cv); + } + } + snc_unlock(&lock); + } } /* ========================== * @@ -651,6 +667,16 @@ void sys_run(i32 count, sys_job_func *func, void *sig, enum sys_priority priorit } atomic_i32_fetch_set(&queue->lock, 0); } + /* Wake workers */ + /* TODO: Only wake necessary amount of workers */ + struct snc_lock lock = snc_lock_e(&G.workers_wake_mutex); + { + if (atomic_i64_fetch(&G.workers_wake_gen) >= 0) { + atomic_i64_fetch_add(&G.workers_wake_gen, 1); + snc_cv_broadcast(&G.workers_wake_cv); + } + } + snc_unlock(&lock); } /* ========================== * @@ -713,16 +739,17 @@ INTERNAL void job_fiber_entry(void *id_ptr) } /* ========================== * - * Worker entry + * Job worker thread * ========================== */ -INTERNAL SYS_THREAD_DEF(worker_entry, worker_ctx_arg) +INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg) { struct worker_ctx *ctx = worker_ctx_arg; (UNUSED)ctx; { /* TODO: Heuristic pinning */ + /* TODO: Pin non-worker threads to other cores */ HANDLE thread_handle = GetCurrentThread(); b32 success = false; (UNUSED)success; @@ -745,8 +772,13 @@ INTERNAL SYS_THREAD_DEF(worker_entry, worker_ctx_arg) struct fiber *job_fiber = NULL; - while (!atomic_i32_fetch(&G.workers_shutdown)) { + struct snc_lock wake_lock = snc_lock_s(&G.workers_wake_mutex); + i64 last_seen_wake_gen = atomic_i64_fetch(&G.workers_wake_gen); + while (last_seen_wake_gen >= 0) { + snc_unlock(&wake_lock); + /* Pull job from queue */ + b32 queues_empty = true; enum sys_priority job_priority = 0; enum job_queue_kind job_queue_kind = 0; i32 job_fiber_id = 0; @@ -755,7 +787,7 @@ INTERNAL SYS_THREAD_DEF(worker_entry, worker_ctx_arg) void *job_sig = 0; struct snc_counter *job_counter = 0; { - //__profnc("Pull job", RGB32_F(0.75, 0.75, 0)); + __profnc("Pull job", RGB32_F(0.75, 0.75, 0)); for (u32 queue_index = 0; queue_index < countof(queues) && !job_func; ++queue_index) { struct job_queue *queue = queues[queue_index]; if (queue) { @@ -775,21 +807,25 @@ INTERNAL SYS_THREAD_DEF(worker_entry, worker_ctx_arg) job_func = info->func; job_sig = info->sig; job_counter = info->counter; - if (info->fiber_id > 0) { - } if (job_id == (info->count - 1)) { /* We're picking up the last dispatch, so dequeue the job */ dequeue = true; } + if (!next) { + queues_empty = queue_index >= ((i32)countof(queues) - 1); + } } } else { - /* This job is being resumed from a yield */ + /* This job is to be resumed from a yield */ job_fiber_id = info->fiber_id; job_id = info->num_dispatched; job_func = info->func; job_sig = info->sig; job_counter = info->counter; dequeue = true; + if (!next) { + queues_empty = queue_index >= ((i32)countof(queues) - 1); + } } if (dequeue) { if (!next) { @@ -937,11 +973,45 @@ INTERNAL SYS_THREAD_DEF(worker_entry, worker_ctx_arg) } } } + + wake_lock = snc_lock_s(&G.workers_wake_mutex); + if (queues_empty) { + i64 new_wake_gen = atomic_i64_fetch(&G.workers_wake_gen); + while (new_wake_gen == last_seen_wake_gen) { + __profnc("Wait for job", RGB32_F(0.75, 0.75, 0)); + snc_cv_wait(&G.workers_wake_cv, &wake_lock); + new_wake_gen = atomic_i64_fetch(&G.workers_wake_gen); + } + last_seen_wake_gen = new_wake_gen; + } else { + last_seen_wake_gen = atomic_i64_fetch(&G.workers_wake_gen); + } + } + snc_unlock(&wake_lock); +} + +/* ========================== * + * Job scheduler thread + * ========================== */ + +INTERNAL SYS_THREAD_DEF(job_scheduler_entry, _) +{ + HANDLE timer = CreateWaitableTimerExW(NULL, NULL, CREATE_WAITABLE_TIMER_HIGH_RESOLUTION, TIMER_ALL_ACCESS); + if (!timer) { + sys_panic(LIT("Failed to create high resolution timer")); + } + while (atomic_i64_fetch(&G.workers_wake_gen) >= 0) { + __profn("Scheduler"); + LARGE_INTEGER due = ZI; + due.QuadPart = -1000; + SetWaitableTimerEx(timer, &due, 0, NULL, NULL, NULL, 0); + //SetWaitableTimerEx(timer, &due, 5000, NULL, NULL, NULL, 0); + WaitForSingleObject(timer, INFINITE); } } /* ========================== * - * Test entry + * Test thread * ========================== */ INTERNAL SYS_THREAD_DEF(test_entry, _) @@ -951,23 +1021,26 @@ INTERNAL SYS_THREAD_DEF(test_entry, _) /* Start workers */ G.num_worker_threads = 6; - //G.num_worker_threads = 2; G.worker_threads_arena = arena_alloc(GIBI(64)); G.worker_threads = arena_push_array(G.worker_threads_arena, struct sys_thread *, G.num_worker_threads); G.worker_contexts = arena_push_array(G.worker_threads_arena, struct worker_ctx, G.num_worker_threads); for (i32 i = 0; i < G.num_worker_threads; ++i) { struct worker_ctx *ctx = &G.worker_contexts[i]; ctx->id = i; - ctx->sleep_timer = CreateWaitableTimerExW(NULL, NULL, CREATE_WAITABLE_TIMER_HIGH_RESOLUTION, TIMER_ALL_ACCESS); struct string name = string_format(scratch.arena, LIT("Worker #%F"), FMT_SINT(i)); - G.worker_threads[i] = sys_thread_alloc(worker_entry, ctx, name, PROF_THREAD_GROUP_WORKERS + i); + G.worker_threads[i] = sys_thread_alloc(job_worker_entry, ctx, name, PROF_THREAD_GROUP_WORKERS + i); } + /* Start scheduler */ + struct sys_thread *scheduler_thread = sys_thread_alloc(job_scheduler_entry, NULL, LIT("Scheduler thread"), PROF_THREAD_GROUP_SCHEDULER); + /* Wait on workers */ for (i32 i = 0; i < G.num_worker_threads; ++i) { struct sys_thread *worker_thread = G.worker_threads[i]; sys_thread_wait_release(worker_thread); } + /* Wait on scheduler */ + sys_thread_wait_release(scheduler_thread); scratch_end(scratch); } @@ -3045,12 +3118,12 @@ int CALLBACK wWinMain(_In_ HINSTANCE instance, _In_opt_ HINSTANCE prev_instance, /* Get app thread handle */ HANDLE app_thread_handle = 0; - struct snc_lock lock = snc_lock_s(&G.threads_mutex); { + struct snc_lock lock = snc_lock_s(&G.threads_mutex); struct win32_thread *wt = (struct win32_thread *)app_thread; app_thread_handle = wt->handle; + snc_unlock(&lock); } - snc_unlock(&lock); /* Wait for either app thread exit or panic */ @@ -3068,7 +3141,12 @@ int CALLBACK wWinMain(_In_ HINSTANCE instance, _In_opt_ HINSTANCE prev_instance, } /* Shutdown test thread */ - atomic_i32_fetch_set(&G.workers_shutdown, 1); + { + struct snc_lock lock = snc_lock_e(&G.workers_wake_mutex); + atomic_i64_fetch_set(&G.workers_wake_gen, -1); + snc_cv_broadcast(&G.workers_wake_cv); + snc_unlock(&lock); + } sys_thread_wait_release(test_thread); }