worker sleeping

This commit is contained in:
jacob 2025-07-06 22:47:08 -05:00
parent 2b08223472
commit e720e7e2af
7 changed files with 214 additions and 96 deletions

View File

@ -657,13 +657,14 @@ INLINE f64 clamp_f64(f64 v, f64 min, f64 max) { return v < min ? min : v > max ?
#include "prof_tracy.h"
#define PROF_THREAD_GROUP_WORKERS -MEBI(7)
#define PROF_THREAD_GROUP_FIBERS -MEBI(6)
#define PROF_THREAD_GROUP_IO -MEBI(5)
#define PROF_THREAD_GROUP_WINDOW -MEBI(4)
#define PROF_THREAD_GROUP_EVICTORS -MEBI(3)
#define PROF_THREAD_GROUP_APP -MEBI(2)
#define PROF_THREAD_GROUP_MAIN -MEBI(1)
#define PROF_THREAD_GROUP_WORKERS -MEBI(8)
#define PROF_THREAD_GROUP_FIBERS -MEBI(7)
#define PROF_THREAD_GROUP_SCHEDULER -MEBI(6)
#define PROF_THREAD_GROUP_IO -MEBI(5)
#define PROF_THREAD_GROUP_WINDOW -MEBI(4)
#define PROF_THREAD_GROUP_EVICTORS -MEBI(3)
#define PROF_THREAD_GROUP_APP -MEBI(2)
#define PROF_THREAD_GROUP_MAIN -MEBI(1)
#ifdef __cplusplus
}

View File

@ -11,6 +11,7 @@
#include "mixer.h"
#include "atomic.h"
#include "app.h"
#include "snc.h"
#define COBJMACROS
#define WIN32_LEAN_AND_MEAN
@ -42,6 +43,7 @@ GLOBAL struct {
HANDLE event;
IAudioRenderClient *playback;
WAVEFORMATEX *buffer_format;
struct sys_thread *playback_scheduler_thread;
u32 buffer_frames;
} G = ZI, DEBUG_ALIAS(G, G_playback_wasapi);
@ -51,14 +53,14 @@ GLOBAL struct {
INTERNAL void wasapi_initialize(void);
INTERNAL APP_EXIT_CALLBACK_FUNC_DEF(playback_shutdown);
INTERNAL SYS_JOB_DEF(playback_job, _);
INTERNAL SYS_THREAD_DEF(playback_scheduler_entry, _);
struct playback_startup_receipt playback_startup(struct mixer_startup_receipt *mixer_sr)
{
(UNUSED)mixer_sr;
wasapi_initialize();
sys_run(1, playback_job, NULL, SYS_PRIORITY_HIGH, NULL);
G.playback_scheduler_thread = sys_thread_alloc(playback_scheduler_entry, NULL, LIT("Playback scheduler thread"), PROF_THREAD_GROUP_SCHEDULER);
app_register_exit_callback(&playback_shutdown);
return (struct playback_startup_receipt) { 0 };
@ -68,6 +70,7 @@ INTERNAL APP_EXIT_CALLBACK_FUNC_DEF(playback_shutdown)
{
__prof;
atomic_i32_fetch_set(&G.shutdown, true);
sys_thread_wait_release(G.playback_scheduler_thread);
}
/* ========================== *
@ -172,12 +175,6 @@ INTERNAL struct wasapi_buffer wasapi_update_begin(void)
__prof;
struct wasapi_buffer wspbuf = ZI;
/* Wait */
{
__profn("Wasapi wait");
WaitForSingleObject(G.event, INFINITE);
}
/* Get padding frames */
u32 padding_frames;
IAudioClient_GetCurrentPadding(G.client, &padding_frames);
@ -226,11 +223,22 @@ INTERNAL void wasapi_update_end(struct wasapi_buffer *wspbuf, struct mixed_pcm_f
* Playback thread entry
* ========================== */
INTERNAL SYS_JOB_DEF(playback_job, _)
INTERNAL SYS_JOB_DEF(playback_mix_job, _)
{
__prof;
(UNUSED)_;
struct arena_temp scratch = scratch_begin_no_conflict();
{
struct wasapi_buffer wspbuf = wasapi_update_begin();
struct mixed_pcm_f32 pcm = mixer_update(scratch.arena, wspbuf.frames_count);
wasapi_update_end(&wspbuf, pcm);
}
scratch_end(scratch);
}
INTERNAL SYS_THREAD_DEF(playback_scheduler_entry, _)
{
(UNUSED)_;
/* https://learn.microsoft.com/en-us/windows/win32/procthread/multimedia-class-scheduler-service#registry-settings */
DWORD task = 0;
HANDLE mmc_handle = AvSetMmThreadCharacteristicsW(L"Pro Audio", &task);
@ -240,13 +248,15 @@ INTERNAL SYS_JOB_DEF(playback_job, _)
/* FIXME: If playback fails at any point and mixer stops advancing, we
* need to halt mixer to prevent memory leak when sounds are played. */
while (!atomic_i32_fetch(&G.shutdown)) {
struct arena_temp temp = arena_temp_begin(scratch.arena);
struct wasapi_buffer wspbuf = wasapi_update_begin();
struct mixed_pcm_f32 pcm = mixer_update(temp.arena, wspbuf.frames_count);
wasapi_update_end(&wspbuf, pcm);
arena_temp_end(temp);
{
__profn("Wait for audio event");
WaitForSingleObject(G.event, INFINITE);
}
{
__profn("Run mix job & wait");
struct snc_counter counter = ZI;
sys_run(1, playback_mix_job, NULL, SYS_PRIORITY_HIGH, &counter);
snc_counter_wait(&counter);
}
}
scratch_end(scratch);
}

View File

@ -159,8 +159,12 @@ INTERNAL APP_EXIT_CALLBACK_FUNC_DEF(resource_shutdown)
__prof;
atomic_i32_fetch_set(&G.watch_shutdown, 1);
snc_cv_broadcast(&G.watch_dispatcher_cv);
sys_watch_wake(G.watch);
{
struct snc_lock lock = snc_lock_e(&G.watch_dispatcher_mutex);
snc_cv_broadcast(&G.watch_dispatcher_cv);
sys_watch_wake(G.watch);
snc_unlock(&lock);
}
sys_thread_wait_release(G.resource_watch_dispatch_thread);
sys_thread_wait_release(G.resource_watch_monitor_thread);
@ -199,8 +203,8 @@ INTERNAL SYS_THREAD_DEF(resource_watch_monitor_thread_entry_point, _)
G.watch_dispatcher_info_list = list_part;
}
}
snc_unlock(&lock);
snc_cv_broadcast(&G.watch_dispatcher_cv);
snc_unlock(&lock);
}
arena_temp_end(temp);
}

103
src/snc.c
View File

@ -2,6 +2,7 @@
#include "atomic.h"
#include "sys.h"
#include "memory.h"
#include "intrinsics.h"
#define DEFAULT_MUTEX_SPIN 4000
@ -11,33 +12,48 @@
struct snc_lock snc_lock_spin_e(struct snc_mutex *m, i32 spin)
{
__prof;
b32 locked = false;
i32 spin_cnt = 0;
while (!locked) {
/* Spin lock */
i32 spin_cnt = 0;
i32 v = atomic_i32_fetch_test_set(&m->v, 0, (1 << 31));
do {
if (v == 0) {
locked = true;
} else {
/* Set pending */
if ((v & (1 << 30)) == 0) {
i32 old = atomic_i32_fetch_test_set(&m->v, v | (1 << 30), v);
while (old != v && (old & (1 << 30)) == 0) {
v = old;
old = atomic_i32_fetch_test_set(&m->v, v | (1 << 30), v);
}
v = old;
}
++spin_cnt;
u32 v = atomic_u32_fetch_test_set(&m->v, 0, 0x80000000);
if (v == 0) {
locked = true;
} else if (v == 0x40000000) {
/* Lock has pending bit set, try to lock */
u32 swp = atomic_u32_fetch_test_set(&m->v, v, 0x80000000);
while (swp != v && swp == 0x40000000) {
v = swp;
swp = atomic_u32_fetch_test_set(&m->v, v, 0x80000000);
}
v = swp;
if (v == 0x40000000) {
locked = true;
}
}
if (!locked && (v & 0xC0000000) == 0) {
/* Lock has shared lockers and no pending waiter, set pending bit */
u32 swp = atomic_u32_fetch_test_set(&m->v, v, v | 0x40000000);
while (swp != v && (swp & 0xC0000000) == 0 && swp != 0) {
v = swp;
swp = atomic_u32_fetch_test_set(&m->v, v, v | 0x40000000);
}
v = swp;
}
/* Pause or wait */
if (!locked && v != 0 && v != 0x40000000) {
if (spin_cnt < spin) {
ix_pause();
} else {
sys_wait(&m->v, &v, 4, F32_INFINITY);
}
++spin_cnt;
} while (spin_cnt < spin && !locked);
/* Wait if not successful */
if (!locked) {
sys_wait(&m->v, &v, 4);
}
}
#if RTC
atomic_i32_fetch_set(&m->exclusive_fiber_id, sys_current_fiber_id());
#endif
struct snc_lock lock = ZI;
lock.exclusive = true;
lock.mutex = m;
@ -46,24 +62,27 @@ struct snc_lock snc_lock_spin_e(struct snc_mutex *m, i32 spin)
struct snc_lock snc_lock_spin_s(struct snc_mutex *m, i32 spin)
{
__prof;
b32 locked = false;
i32 spin_cnt = 0;
while (!locked) {
/* Spin lock */
i32 spin_cnt = 0;
i32 v = atomic_i32_fetch(&m->v);
do {
while (!locked && (v & 0xC0000000) == 0) {
/* Lock has no exclusive or pending exclusive lock, increment shared count */
i32 old = atomic_i32_fetch_test_set(&m->v, v, v + 1);
if (v == old) {
locked = true;
}
++spin_cnt;
u32 v = atomic_u32_fetch(&m->v);
while (!locked && (v & 0xC0000000) == 0) {
/* Lock has no exclusive or pending exclusive lock, increment shared count */
u32 swp = atomic_u32_fetch_test_set(&m->v, v, v + 1);
if (v == swp) {
locked = true;
} else {
v = swp;
}
} while (spin_cnt < spin && !locked);
/* Wait if not successful */
}
/* Pause or wait */
if (!locked) {
sys_wait(&m->v, &v, 4);
if (spin_cnt < spin) {
ix_pause();
} else {
sys_wait(&m->v, &v, 4, F32_INFINITY);
}
}
}
struct snc_lock lock = ZI;
@ -83,12 +102,14 @@ struct snc_lock snc_lock_s(struct snc_mutex *m)
void snc_unlock(struct snc_lock *l)
{
__prof;
struct snc_mutex *m = l->mutex;
if (l->exclusive) {
atomic_i32_fetch_set(&m->v, 0);
#if RTC
atomic_i32_fetch_set(&m->exclusive_fiber_id, 0);
#endif
atomic_u32_fetch_set(&m->v, 0);
} else {
atomic_i32_fetch_add(&m->v, -1);
atomic_u32_fetch_add_i32(&m->v, -1);
}
sys_wake_all(&m->v);
MEMZERO_STRUCT(l);
@ -107,7 +128,7 @@ void snc_cv_wait(struct snc_cv *cv, struct snc_lock *l)
{
snc_unlock(l);
do {
sys_wait(&cv->wake_gen, &old_wake_gen, sizeof(old_wake_gen));
sys_wait(&cv->wake_gen, &old_wake_gen, sizeof(old_wake_gen), F32_INFINITY);
wake_gen = atomic_u64_fetch(&cv->wake_gen);
} while (wake_gen == old_wake_gen);
sys_wake_all(&cv->wake_gen);
@ -142,7 +163,7 @@ void snc_counter_wait(struct snc_counter *counter)
{
i64 v = atomic_i64_fetch(&counter->v);
while (v > 0) {
sys_wait(&counter->v, &v, sizeof(v));
sys_wait(&counter->v, &v, sizeof(v), F32_INFINITY);
v = atomic_i64_fetch(&counter->v);
}
}

View File

@ -11,7 +11,11 @@ struct snc_lock {
};
struct snc_mutex {
struct atomic_i32 v;
struct atomic_u32 v;
#if RTC
struct atomic_i32 exclusive_fiber_id;
#endif
};
struct snc_lock snc_lock_spin_e(struct snc_mutex *m, i32 spin);

View File

@ -449,7 +449,7 @@ b32 sys_run_command(struct string cmd);
/* Futex-like wait & wake */
void sys_wait(void *addr, void *cmp, u32 size);
void sys_wait(void *addr, void *cmp, u32 size, f32 timeout_seconds);
void sys_wake_single(void *addr);
void sys_wake_all(void *addr);

View File

@ -256,7 +256,6 @@ STATIC_ASSERT(alignof(struct fiber_ctx) == 64); /* Avoid false sharing */
struct alignas(64) worker_ctx {
i32 id;
HANDLE sleep_timer;
};
struct job_info {
@ -357,7 +356,10 @@ GLOBAL struct {
struct job_queue job_queues[NUM_JOB_QUEUE_KINDS];
/* Workers */
struct atomic_i32 workers_shutdown; /* TODO: Prevent false sharing */
struct atomic_i64 workers_wake_gen; /* TODO: Prevent false sharing */
struct snc_mutex workers_wake_mutex;
struct snc_cv workers_wake_cv;
i32 num_worker_threads;
struct arena *worker_threads_arena;
struct sys_thread **worker_threads;
@ -390,12 +392,8 @@ INTERNAL void job_fiber_yield(struct fiber *fiber, struct fiber *parent_fiber);
* Wait / wake
* ========================== */
void sys_wait(void *addr, void *cmp, u32 size)
void sys_wait(void *addr, void *cmp, u32 size, f32 timeout_seconds)
{
__prof;
#if 0
WaitOnAddress(addr, cmp, size, INFINITE);
#else
struct fiber *fiber = fiber_from_id(sys_current_fiber_id());
i16 parent_fiber_id = fiber->parent_id;
/* Yield if job fiber, otherwise fall back to windows blocking function */
@ -412,15 +410,18 @@ void sys_wait(void *addr, void *cmp, u32 size)
struct fiber *parent_fiber = fiber_from_id(parent_fiber_id);
job_fiber_yield(fiber, parent_fiber);
} else {
WaitOnAddress(addr, cmp, size, INFINITE);
i32 timeout_ms = INFINITE;
if (timeout_seconds != F32_INFINITY) {
timeout_ms = (i32)(timeout_seconds * 1000);
}
WaitOnAddress(addr, cmp, size, timeout_ms);
}
#endif
}
void sys_wake_single(void *addr)
{
ASSERT(false);
(UNUSED)addr;
/* FIXME: Real wake single */
sys_wake_all(addr);
}
void sys_wake_all(void *addr)
@ -428,6 +429,7 @@ void sys_wake_all(void *addr)
u64 wait_bin_index = (u64)addr % NUM_WAIT_ADDR_BINS;
struct wait_bin *bin = &G.wait_bins[wait_bin_index];
i32 num_yielders = 0;
while (atomic_i32_fetch_test_set(&bin->lock, 0, 1) != 0) ix_pause();
{
struct wait_list *wait_list = NULL;
@ -437,6 +439,7 @@ void sys_wake_all(void *addr)
}
}
if (wait_list && wait_list->num_yielders > 0) {
num_yielders = wait_list->num_yielders;
struct arena_temp scratch = scratch_begin_no_conflict();
{
/* Separate yielders by queue kind */
@ -444,7 +447,7 @@ void sys_wake_all(void *addr)
struct yielder **queue_yielder_arrays[NUM_JOB_QUEUE_KINDS] = ZI;
for (i32 i = 0; i < (i32)countof(queue_yielder_arrays); ++i) {
/* NOTE: Each array is conservatively sized as the number of all yielders in the list */
queue_yielder_arrays[i] = arena_push_array_no_zero(scratch.arena, struct yielder *, wait_list->num_yielders);
queue_yielder_arrays[i] = arena_push_array_no_zero(scratch.arena, struct yielder *, num_yielders);
}
for (struct yielder *yielder = wait_list->first_yielder; yielder; yielder = yielder->next) {
enum job_queue_kind queue_kind = yielder->job_queue_kind;
@ -502,6 +505,19 @@ void sys_wake_all(void *addr)
/* Wake blocking waiters */
WakeByAddressAll(addr);
/* Wake workers */
/* TODO: Only wake necessary amount of workers */
if (num_yielders > 0) {
struct snc_lock lock = snc_lock_e(&G.workers_wake_mutex);
{
if (atomic_i64_fetch(&G.workers_wake_gen) >= 0) {
atomic_i64_fetch_add(&G.workers_wake_gen, 1);
snc_cv_broadcast(&G.workers_wake_cv);
}
}
snc_unlock(&lock);
}
}
/* ========================== *
@ -651,6 +667,16 @@ void sys_run(i32 count, sys_job_func *func, void *sig, enum sys_priority priorit
}
atomic_i32_fetch_set(&queue->lock, 0);
}
/* Wake workers */
/* TODO: Only wake necessary amount of workers */
struct snc_lock lock = snc_lock_e(&G.workers_wake_mutex);
{
if (atomic_i64_fetch(&G.workers_wake_gen) >= 0) {
atomic_i64_fetch_add(&G.workers_wake_gen, 1);
snc_cv_broadcast(&G.workers_wake_cv);
}
}
snc_unlock(&lock);
}
/* ========================== *
@ -713,16 +739,17 @@ INTERNAL void job_fiber_entry(void *id_ptr)
}
/* ========================== *
* Worker entry
* Job worker thread
* ========================== */
INTERNAL SYS_THREAD_DEF(worker_entry, worker_ctx_arg)
INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
{
struct worker_ctx *ctx = worker_ctx_arg;
(UNUSED)ctx;
{
/* TODO: Heuristic pinning */
/* TODO: Pin non-worker threads to other cores */
HANDLE thread_handle = GetCurrentThread();
b32 success = false;
(UNUSED)success;
@ -745,8 +772,13 @@ INTERNAL SYS_THREAD_DEF(worker_entry, worker_ctx_arg)
struct fiber *job_fiber = NULL;
while (!atomic_i32_fetch(&G.workers_shutdown)) {
struct snc_lock wake_lock = snc_lock_s(&G.workers_wake_mutex);
i64 last_seen_wake_gen = atomic_i64_fetch(&G.workers_wake_gen);
while (last_seen_wake_gen >= 0) {
snc_unlock(&wake_lock);
/* Pull job from queue */
b32 queues_empty = true;
enum sys_priority job_priority = 0;
enum job_queue_kind job_queue_kind = 0;
i32 job_fiber_id = 0;
@ -755,7 +787,7 @@ INTERNAL SYS_THREAD_DEF(worker_entry, worker_ctx_arg)
void *job_sig = 0;
struct snc_counter *job_counter = 0;
{
//__profnc("Pull job", RGB32_F(0.75, 0.75, 0));
__profnc("Pull job", RGB32_F(0.75, 0.75, 0));
for (u32 queue_index = 0; queue_index < countof(queues) && !job_func; ++queue_index) {
struct job_queue *queue = queues[queue_index];
if (queue) {
@ -775,21 +807,25 @@ INTERNAL SYS_THREAD_DEF(worker_entry, worker_ctx_arg)
job_func = info->func;
job_sig = info->sig;
job_counter = info->counter;
if (info->fiber_id > 0) {
}
if (job_id == (info->count - 1)) {
/* We're picking up the last dispatch, so dequeue the job */
dequeue = true;
}
if (!next) {
queues_empty = queue_index >= ((i32)countof(queues) - 1);
}
}
} else {
/* This job is being resumed from a yield */
/* This job is to be resumed from a yield */
job_fiber_id = info->fiber_id;
job_id = info->num_dispatched;
job_func = info->func;
job_sig = info->sig;
job_counter = info->counter;
dequeue = true;
if (!next) {
queues_empty = queue_index >= ((i32)countof(queues) - 1);
}
}
if (dequeue) {
if (!next) {
@ -937,11 +973,45 @@ INTERNAL SYS_THREAD_DEF(worker_entry, worker_ctx_arg)
}
}
}
wake_lock = snc_lock_s(&G.workers_wake_mutex);
if (queues_empty) {
i64 new_wake_gen = atomic_i64_fetch(&G.workers_wake_gen);
while (new_wake_gen == last_seen_wake_gen) {
__profnc("Wait for job", RGB32_F(0.75, 0.75, 0));
snc_cv_wait(&G.workers_wake_cv, &wake_lock);
new_wake_gen = atomic_i64_fetch(&G.workers_wake_gen);
}
last_seen_wake_gen = new_wake_gen;
} else {
last_seen_wake_gen = atomic_i64_fetch(&G.workers_wake_gen);
}
}
snc_unlock(&wake_lock);
}
/* ========================== *
* Job scheduler thread
* ========================== */
INTERNAL SYS_THREAD_DEF(job_scheduler_entry, _)
{
HANDLE timer = CreateWaitableTimerExW(NULL, NULL, CREATE_WAITABLE_TIMER_HIGH_RESOLUTION, TIMER_ALL_ACCESS);
if (!timer) {
sys_panic(LIT("Failed to create high resolution timer"));
}
while (atomic_i64_fetch(&G.workers_wake_gen) >= 0) {
__profn("Scheduler");
LARGE_INTEGER due = ZI;
due.QuadPart = -1000;
SetWaitableTimerEx(timer, &due, 0, NULL, NULL, NULL, 0);
//SetWaitableTimerEx(timer, &due, 5000, NULL, NULL, NULL, 0);
WaitForSingleObject(timer, INFINITE);
}
}
/* ========================== *
* Test entry
* Test thread
* ========================== */
INTERNAL SYS_THREAD_DEF(test_entry, _)
@ -951,23 +1021,26 @@ INTERNAL SYS_THREAD_DEF(test_entry, _)
/* Start workers */
G.num_worker_threads = 6;
//G.num_worker_threads = 2;
G.worker_threads_arena = arena_alloc(GIBI(64));
G.worker_threads = arena_push_array(G.worker_threads_arena, struct sys_thread *, G.num_worker_threads);
G.worker_contexts = arena_push_array(G.worker_threads_arena, struct worker_ctx, G.num_worker_threads);
for (i32 i = 0; i < G.num_worker_threads; ++i) {
struct worker_ctx *ctx = &G.worker_contexts[i];
ctx->id = i;
ctx->sleep_timer = CreateWaitableTimerExW(NULL, NULL, CREATE_WAITABLE_TIMER_HIGH_RESOLUTION, TIMER_ALL_ACCESS);
struct string name = string_format(scratch.arena, LIT("Worker #%F"), FMT_SINT(i));
G.worker_threads[i] = sys_thread_alloc(worker_entry, ctx, name, PROF_THREAD_GROUP_WORKERS + i);
G.worker_threads[i] = sys_thread_alloc(job_worker_entry, ctx, name, PROF_THREAD_GROUP_WORKERS + i);
}
/* Start scheduler */
struct sys_thread *scheduler_thread = sys_thread_alloc(job_scheduler_entry, NULL, LIT("Scheduler thread"), PROF_THREAD_GROUP_SCHEDULER);
/* Wait on workers */
for (i32 i = 0; i < G.num_worker_threads; ++i) {
struct sys_thread *worker_thread = G.worker_threads[i];
sys_thread_wait_release(worker_thread);
}
/* Wait on scheduler */
sys_thread_wait_release(scheduler_thread);
scratch_end(scratch);
}
@ -3045,12 +3118,12 @@ int CALLBACK wWinMain(_In_ HINSTANCE instance, _In_opt_ HINSTANCE prev_instance,
/* Get app thread handle */
HANDLE app_thread_handle = 0;
struct snc_lock lock = snc_lock_s(&G.threads_mutex);
{
struct snc_lock lock = snc_lock_s(&G.threads_mutex);
struct win32_thread *wt = (struct win32_thread *)app_thread;
app_thread_handle = wt->handle;
snc_unlock(&lock);
}
snc_unlock(&lock);
/* Wait for either app thread exit or panic */
@ -3068,7 +3141,12 @@ int CALLBACK wWinMain(_In_ HINSTANCE instance, _In_opt_ HINSTANCE prev_instance,
}
/* Shutdown test thread */
atomic_i32_fetch_set(&G.workers_shutdown, 1);
{
struct snc_lock lock = snc_lock_e(&G.workers_wake_mutex);
atomic_i64_fetch_set(&G.workers_wake_gen, -1);
snc_cv_broadcast(&G.workers_wake_cv);
snc_unlock(&lock);
}
sys_thread_wait_release(test_thread);
}