From 40dcdb40dc269b4a7886f31ebdd05c7b6b36f18c Mon Sep 17 00:00:00 2001 From: jacob Date: Sat, 5 Jul 2025 22:40:39 -0500 Subject: [PATCH] fiber wait wip --- src/sprite.c | 7 ++ src/sys.h | 2 - src/sys_win32.c | 263 +++++++++++++++++------------------------------- 3 files changed, 99 insertions(+), 173 deletions(-) diff --git a/src/sprite.c b/src/sprite.c index 2c8951bf..25589a6e 100644 --- a/src/sprite.c +++ b/src/sprite.c @@ -1382,7 +1382,14 @@ INTERNAL SYS_THREAD_DEF(sprite_evictor_scheduler_thread_entry_point, arg) while (!G.evictor_scheduler_shutdown) { sys_run(1, sprite_evictor_job, NULL, SYS_PRIORITY_BACKGROUND, job_counter); sys_counter_wait(job_counter); + /* FIXME: Enable this */ +#if 0 sys_condition_variable_wait_time(G.evictor_scheduler_shutdown_cv, &evictor_lock, SECONDS_FROM_NS(EVICTOR_CYCLE_INTERVAL_NS)); +#else + sys_mutex_unlock(&evictor_lock); + sys_sleep(SECONDS_FROM_NS(EVICTOR_CYCLE_INTERVAL_NS)); + evictor_lock = sys_mutex_lock_e(G.evictor_scheduler_mutex); +#endif } sys_mutex_unlock(&evictor_lock); } diff --git a/src/sys.h b/src/sys.h index 33c3d33c..07296b3e 100644 --- a/src/sys.h +++ b/src/sys.h @@ -388,8 +388,6 @@ void sys_assert_locked_e_or_s(struct sys_lock *lock, struct sys_mutex *mutex); struct sys_condition_variable *sys_condition_variable_alloc(void); void sys_condition_variable_release(struct sys_condition_variable *sys_cv); void sys_condition_variable_wait(struct sys_condition_variable *sys_cv, struct sys_lock *lock); -void sys_condition_variable_wait_time(struct sys_condition_variable *sys_cv, struct sys_lock *lock, f64 seconds); -void sys_condition_variable_signal(struct sys_condition_variable *sys_cv, u32 count); void sys_condition_variable_broadcast(struct sys_condition_variable *sys_cv); /* ========================== * diff --git a/src/sys_win32.c b/src/sys_win32.c index de905195..5d5a94a3 100644 --- a/src/sys_win32.c +++ b/src/sys_win32.c @@ -35,25 +35,13 @@ #define FIBER_STACK_SIZE MEGABYTE(4) struct win32_mutex { - SRWLOCK srwlock; + volatile LONG state; /* Lower 30 bits = reader count, bit 30 = writer waiting, bit 31 = writer held */ struct win32_mutex *next_free; - -#if PROFILING_LOCKS - __proflock_ctx(profiling_ctx); -#endif - -#if RTC - u64 owner_tid; - struct atomic_i64 count; -#endif }; struct win32_condition_variable { - CONDITION_VARIABLE condition_variable; + struct atomic_u64 wake_gen; struct win32_condition_variable *next_free; -#if RTC - struct atomic_i64 num_yielders; -#endif }; struct win32_thread { @@ -393,7 +381,7 @@ GLOBAL struct { INTERNAL struct fiber *fiber_from_id(i32 id); -INTERNAL void fiber_yield(struct fiber *fiber, struct fiber *parent_fiber, struct yield_param *param); +INTERNAL void fiber_yield(struct fiber *fiber, struct fiber *parent_fiber); @@ -416,7 +404,8 @@ INTERNAL void fiber_yield(struct fiber *fiber, struct fiber *parent_fiber, struc void sys_wait(void *addr, void *cmp, u32 size) { -#if 1 + __prof; +#if 0 WaitOnAddress(addr, cmp, size, INFINITE); #else struct fiber *fiber = fiber_from_id(sys_current_fiber_id()); @@ -425,7 +414,7 @@ void sys_wait(void *addr, void *cmp, u32 size) if (parent_fiber_id > 0) { #if 1 /* Yield if job fiber */ - struct yield_param param = { + *fiber->yield_param = (struct yield_param) { .kind = YIELD_KIND_WAIT, .wait = { .addr = addr, @@ -433,7 +422,8 @@ void sys_wait(void *addr, void *cmp, u32 size) .size = size } }; - fiber_yield(fiber, fiber_from_id(parent_fiber_id), ¶m); + struct fiber *parent_fiber = fiber_from_id(parent_fiber_id); + fiber_yield(fiber, parent_fiber); #else while (MEMEQ(addr, cmp, size)) { ix_pause(); @@ -447,7 +437,7 @@ void sys_wait(void *addr, void *cmp, u32 size) void sys_wake_all(void *addr) { -#if 0 +#if 1 u64 wait_bin_index = (u64)addr % NUM_WAIT_BINS; struct wait_bin *bin = &G.wait_bins[wait_bin_index]; @@ -472,7 +462,7 @@ void sys_wake_all(void *addr) for (struct yielder *yielder = wait_list->first_yielder; yielder; yielder = yielder->next) { enum job_queue_kind queue_kind = yielder->job_queue_kind; i32 index = queue_counts[queue_kind]++; - struct yielder **array = queue_yielder_arrays[index]; + struct yielder **array = queue_yielder_arrays[queue_kind]; array[index] = yielder; } /* Push jobs */ @@ -511,11 +501,17 @@ void sys_wake_all(void *addr) atomic_i32_fetch_set(&queue->lock, 0); } } + /* Free yielders */ + wait_list->last_yielder->next = wait_list->first_free_yielder; + wait_list->first_free_yielder = wait_list->first_yielder; + wait_list->first_yielder = NULL; + wait_list->last_yielder = NULL; + wait_list->num_yielders = 0; } scratch_end(scratch); } } - atomic_i32_fetch_set(&bin->lock, 1); + atomic_i32_fetch_set(&bin->lock, 0); #endif /* Wake blocking waiters */ @@ -692,7 +688,7 @@ i32 sys_current_fiber_id(void) return (i32)(i64)GetFiberData(); } -INTERNAL void fiber_yield(struct fiber *fiber, struct fiber *parent_fiber, struct yield_param *param) +INTERNAL void fiber_yield(struct fiber *fiber, struct fiber *parent_fiber) { ASSERT(fiber->id == sys_current_fiber_id()); ASSERT(parent_fiber->id == fiber->parent_id); @@ -701,7 +697,6 @@ INTERNAL void fiber_yield(struct fiber *fiber, struct fiber *parent_fiber, struc } { __prof_fiber_leave; - fiber->yield_param = param; SwitchToFiber(parent_fiber->addr); __prof_fiber_enter(fiber->name_cstr, PROF_THREAD_GROUP_FIBERS + fiber->id); } @@ -714,8 +709,8 @@ void sys_yield(void) i32 parent_id = fiber->parent_id; if (parent_id > 0) { /* Top level fibers should not yield */ struct fiber *parent_fiber = fiber_from_id(parent_id); - struct yield_param param = { .kind = YIELD_KIND_COOPERATIVE }; - fiber_yield(fiber, parent_fiber, ¶m); + fiber->yield_param->kind = YIELD_KIND_COOPERATIVE; + fiber_yield(fiber, parent_fiber); } } @@ -764,26 +759,24 @@ INTERNAL void job_fiber_entry(void *id_ptr) { i32 id = (i32)(i64)id_ptr; struct fiber *fiber = fiber_from_id(id); - struct yield_param yield = ZI; while (true) { __prof_fiber_enter(fiber->name_cstr, PROF_THREAD_GROUP_FIBERS + fiber->id); - i32 parent_id = fiber->parent_id; - struct fiber *parent_fiber = fiber_from_id(parent_id); - void *parent_fiber_addr = parent_fiber->addr; /* Run job */ { __profscope(Run job); - yield.kind = YIELD_KIND_NONE; - fiber->yield_param = &yield; + fiber->yield_param->kind = YIELD_KIND_NONE; struct sys_job_data data = ZI; data.id = fiber->job_id; data.sig = fiber->job_sig; fiber->job_func(data); - yield.kind = YIELD_KIND_DONE; - fiber->yield_param = &yield; + fiber->yield_param->kind = YIELD_KIND_DONE; } __prof_fiber_leave; - SwitchToFiber(parent_fiber_addr); + { + i32 parent_id = fiber->parent_id; + struct fiber *parent_fiber = fiber_from_id(parent_id); + SwitchToFiber(parent_fiber->addr); + } } } @@ -793,6 +786,7 @@ INTERNAL void job_fiber_entry(void *id_ptr) INTERNAL SYS_THREAD_DEF(worker_entry, worker_ctx_arg) { + __prof; struct worker_ctx *ctx = worker_ctx_arg; (UNUSED)ctx; @@ -881,36 +875,38 @@ INTERNAL SYS_THREAD_DEF(worker_entry, worker_ctx_arg) if (!job_fiber) { job_fiber = fiber_alloc(FIBER_KIND_JOB_WORKER); } + struct yield_param yield = ZI; job_fiber->job_func = job_func; job_fiber->job_sig = job_sig; job_fiber->job_id = job_id; job_fiber->job_priority = job_priority; job_fiber->parent_id = worker_fiber_id; + job_fiber->yield_param = &yield; b32 done = false; while (!done) { SwitchToFiber(job_fiber->addr); - struct yield_param *yield_param = job_fiber->yield_param; - enum yield_kind yield_kind = yield_param ? yield_param->kind : YIELD_KIND_NONE; - switch (yield_kind) { + switch (yield.kind) { default: { /* Invalid yield kind */ - sys_panic(LIT("Invalid fiber yield")); + struct arena_temp scratch = scratch_begin_no_conflict(); + sys_panic(string_format(scratch.arena, LIT("Invalid fiber yield kind \"%F\""), FMT_SINT(yield.kind))); + scratch_end(scratch); } break; case YIELD_KIND_WAIT: { #if 1 - void *wait_addr = yield_param->wait.addr; - void *wait_cmp = yield_param->wait.cmp; - u32 wait_size = yield_param->wait.size; + void *wait_addr = yield.wait.addr; + void *wait_cmp = yield.wait.cmp; + u32 wait_size = yield.wait.size; u64 wait_bin_index = (u64)wait_addr % NUM_WAIT_BINS; struct wait_bin *bin = &G.wait_bins[wait_bin_index]; while (atomic_i32_fetch_test_set(&bin->lock, 0, 1) != 0) ix_pause(); { - if (!MEMEQ(wait_addr, wait_cmp, wait_size)) { + if (MEMEQ(wait_addr, wait_cmp, wait_size)) { /* Search addr wait list in bin */ struct wait_list *wait_list = NULL; for (struct wait_list *tmp = bin->first_wait_list; tmp && !wait_list; tmp = tmp->next_in_bin) { @@ -978,7 +974,7 @@ INTERNAL SYS_THREAD_DEF(worker_entry, worker_ctx_arg) atomic_i32_fetch_set(&bin->lock, 0); #else (UNUSED)job_queue_kind; - ASSERT(false); + //ASSERT(false); #endif } break; @@ -993,13 +989,6 @@ INTERNAL SYS_THREAD_DEF(worker_entry, worker_ctx_arg) } } } - - /* TODO */ -#if 0 - { - __profscope(Worker wait); - } -#endif } INTERNAL SYS_THREAD_DEF(test_entry, _) @@ -2515,17 +2504,7 @@ void sys_window_cursor_disable_clip(struct sys_window *sys_window) INTERNAL void win32_mutex_init(struct win32_mutex *m) { -#if PROFILING_LOCKS - __proflock_ctx(profiling_ctx) = m->profiling_ctx; -#endif MEMZERO_STRUCT(m); - m->srwlock = (SRWLOCK)SRWLOCK_INIT; -#if PROFILING_LOCKS - if (!profiling_ctx) { - __proflock_alloc(profiling_ctx); - } - m->profiling_ctx = profiling_ctx; -#endif } struct sys_mutex *sys_mutex_alloc(void) @@ -2550,7 +2529,6 @@ void sys_mutex_release(struct sys_mutex *mutex) { __prof; struct win32_mutex *m = (struct win32_mutex *)mutex; - ASSERT(atomic_i64_fetch(&m->count) == 0); /* Mutex should be unlocked */ { struct sys_lock lock = sys_mutex_lock_e(G.mutexes_mutex); m->next_free = G.first_free_mutex; @@ -2562,13 +2540,22 @@ void sys_mutex_release(struct sys_mutex *mutex) struct sys_lock sys_mutex_lock_e(struct sys_mutex *mutex) { struct win32_mutex *m = (struct win32_mutex *)mutex; - __proflock_before_exclusive_lock(m->profiling_ctx); - AcquireSRWLockExclusive((SRWLOCK *)&m->srwlock); - __proflock_after_exclusive_lock(m->profiling_ctx); -#if RTC - m->owner_tid = (u64)GetCurrentThreadId(); - atomic_i64_fetch_add(&m->count, 1); -#endif + { + while (true) { + //LONG expected = 0; + if (InterlockedCompareExchange(&m->state, 0x80000000, 0) == 0) { + break; /* Acquired exclusive */ + } else { + /* Set writer pending */ + LONG old; + do { + old = m->state; + if (old & 0x40000000) break; /* Already pending */ + } while (InterlockedCompareExchange(&m->state, old | 0x40000000, old) != old); + sys_wait((void *)&m->state, (void *)&old, sizeof(old)); + } + } + } struct sys_lock lock = ZI; lock.exclusive = true; lock.mutex = mutex; @@ -2578,12 +2565,19 @@ struct sys_lock sys_mutex_lock_e(struct sys_mutex *mutex) struct sys_lock sys_mutex_lock_s(struct sys_mutex *mutex) { struct win32_mutex *m = (struct win32_mutex *)mutex; - __proflock_before_shared_lock(m->profiling_ctx); - AcquireSRWLockShared((SRWLOCK *)&m->srwlock); - __proflock_after_shared_lock(m->profiling_ctx); -#if RTC - atomic_i64_fetch_add(&m->count, 1); -#endif + { + while (true) { + LONG old = m->state; + /* If writer not held or pending */ + if ((old & 0xC0000000) == 0) { + if (InterlockedCompareExchange(&m->state, old + 1, old) == old) { + break; /* Acquired shared */ + } + } else { + sys_wait((void *)&m->state, &old, sizeof(old)); + } + } + } struct sys_lock lock = ZI; lock.mutex = mutex; return lock; @@ -2592,16 +2586,15 @@ struct sys_lock sys_mutex_lock_s(struct sys_mutex *mutex) void sys_mutex_unlock(struct sys_lock *lock) { struct win32_mutex *m = (struct win32_mutex *)lock->mutex; -#if RTC - atomic_i64_fetch_add(&m->count, -1); - m->owner_tid = 0; -#endif if (lock->exclusive) { - ReleaseSRWLockExclusive((SRWLOCK *)&m->srwlock); - __proflock_after_exclusive_unlock(m->profiling_ctx); + //LONG old = m->state; + InterlockedExchange(&m->state, 0); /* Clear all bits */ + sys_wake_all((void *)&m->state); } else { - ReleaseSRWLockShared((SRWLOCK *)&m->srwlock); - __proflock_after_shared_unlock(m->profiling_ctx); + LONG old = InterlockedDecrement(&m->state); + if (old == 1) { + sys_wake_all((void *)&m->state); + } } MEMZERO_STRUCT(lock); } @@ -2633,14 +2626,11 @@ INTERNAL struct win32_condition_variable *win32_condition_variable_alloc(void) cv = G.first_free_condition_variable; G.first_free_condition_variable = cv->next_free; } else { - cv = arena_push(G.condition_variables_arena, struct win32_condition_variable); + cv = arena_push_no_zero(G.condition_variables_arena, struct win32_condition_variable); } sys_mutex_unlock(&lock); } - MEMZERO_STRUCT(cv); - InitializeConditionVariable(&cv->condition_variable); - return cv; } @@ -2663,105 +2653,36 @@ void sys_condition_variable_release(struct sys_condition_variable *sys_cv) { __prof; struct win32_condition_variable *cv = (struct win32_condition_variable *)sys_cv; - /* Condition variable must not have any sleepers (signal before releasing) */ - ASSERT(atomic_i64_fetch(&cv->num_yielders) == 0); win32_condition_variable_release(cv); } void sys_condition_variable_wait(struct sys_condition_variable *sys_cv, struct sys_lock *lock) { struct win32_condition_variable *cv = (struct win32_condition_variable *)sys_cv; - struct win32_mutex *m = (struct win32_mutex *)lock->mutex; + struct sys_mutex *mutex = lock->mutex; b32 exclusive = lock->exclusive; -#if RTC - atomic_i64_fetch_add(&cv->num_yielders, 1); - if (exclusive) { - m->owner_tid = 0; - } - atomic_i64_fetch_add(&m->count, -1); -#endif - - /* TODO: Correct profiling of internal condition variable sleep / wait mutex state */ - if (exclusive) { - __proflock_after_exclusive_unlock(m->profiling_ctx); - } else { - __proflock_after_shared_unlock(m->profiling_ctx); - } - SleepConditionVariableSRW(&cv->condition_variable, (SRWLOCK *)&m->srwlock, INFINITE, exclusive ? 0 : CONDITION_VARIABLE_LOCKMODE_SHARED); - if (exclusive) { - __proflock_before_exclusive_lock(m->profiling_ctx); - __proflock_after_exclusive_lock(m->profiling_ctx); - } else { - __proflock_before_shared_lock(m->profiling_ctx); - __proflock_after_shared_lock(m->profiling_ctx); - } - -#if RTC - atomic_i64_fetch_add(&m->count, 1); - if (exclusive) { - m->owner_tid = (u64)GetCurrentThreadId(); - } - atomic_i64_fetch_add(&cv->num_yielders, -1); -#endif -} - -void sys_condition_variable_wait_time(struct sys_condition_variable *sys_cv, struct sys_lock *lock, f64 seconds) -{ - struct win32_condition_variable *cv = (struct win32_condition_variable *)sys_cv; - struct win32_mutex *m = (struct win32_mutex *)lock->mutex; - b32 exclusive = lock->exclusive; -#if RTC - atomic_i64_fetch_add(&cv->num_yielders, 1); - if (exclusive) { - m->owner_tid = 0; - } - atomic_i64_fetch_add(&m->count, -1); -#endif - u32 ms = (u32)math_round_to_int((f32)seconds * 1000.f); - - /* TODO: Correct profiling of internal condition variable sleep / wait mutex state */ - if (exclusive) { - __proflock_after_exclusive_unlock(m->profiling_ctx); - } else { - __proflock_after_shared_unlock(m->profiling_ctx); - } - SleepConditionVariableSRW(&cv->condition_variable, (SRWLOCK *)&m->srwlock, ms, exclusive ? 0 : CONDITION_VARIABLE_LOCKMODE_SHARED); - if (exclusive) { - __proflock_before_exclusive_lock(m->profiling_ctx); - __proflock_after_exclusive_lock(m->profiling_ctx); - } else { - __proflock_before_shared_lock(m->profiling_ctx); - __proflock_after_shared_lock(m->profiling_ctx); - } - -#if RTC - atomic_i64_fetch_add(&m->count, 1); - if (exclusive) { - m->owner_tid = (u64)GetCurrentThreadId(); - } - atomic_i64_fetch_add(&cv->num_yielders, -1); -#endif -} - -void sys_condition_variable_signal(struct sys_condition_variable *sys_cv, u32 count) -{ - struct win32_condition_variable *cv = (struct win32_condition_variable *)sys_cv; - /* Windows will wake all waiters if many single-wakes occur anyway, so we - * might as well wake all ourselves. - * https://devblogs.microsoft.com/oldnewthing/20180201-00/?p=97946 */ - if (count <= 24) { - for (u32 i = 0; i < count; ++i) { - WakeConditionVariable(&cv->condition_variable); + u64 old_wake_gen = atomic_u64_fetch(&cv->wake_gen); + u64 wake_gen = old_wake_gen; + { + sys_mutex_unlock(lock); + do { + sys_wait(&cv->wake_gen, &old_wake_gen, sizeof(old_wake_gen)); + wake_gen = atomic_u64_fetch(&cv->wake_gen); + } while (wake_gen == old_wake_gen); + sys_wake_all(&cv->wake_gen); + if (exclusive) { + *lock = sys_mutex_lock_e(mutex); + } else { + *lock = sys_mutex_lock_s(mutex); } - } else { - WakeAllConditionVariable(&cv->condition_variable); } } void sys_condition_variable_broadcast(struct sys_condition_variable *sys_cv) { struct win32_condition_variable *cv = (struct win32_condition_variable *)sys_cv; - WakeAllConditionVariable(&cv->condition_variable); + atomic_u64_fetch_add_u64(&cv->wake_gen, 1); + sys_wake_all(&cv->wake_gen); } /* ========================== *