fiber wait wip

This commit is contained in:
jacob 2025-07-05 22:40:39 -05:00
parent d47b951b82
commit 40dcdb40dc
3 changed files with 99 additions and 173 deletions

View File

@ -1382,7 +1382,14 @@ INTERNAL SYS_THREAD_DEF(sprite_evictor_scheduler_thread_entry_point, arg)
while (!G.evictor_scheduler_shutdown) {
sys_run(1, sprite_evictor_job, NULL, SYS_PRIORITY_BACKGROUND, job_counter);
sys_counter_wait(job_counter);
/* FIXME: Enable this */
#if 0
sys_condition_variable_wait_time(G.evictor_scheduler_shutdown_cv, &evictor_lock, SECONDS_FROM_NS(EVICTOR_CYCLE_INTERVAL_NS));
#else
sys_mutex_unlock(&evictor_lock);
sys_sleep(SECONDS_FROM_NS(EVICTOR_CYCLE_INTERVAL_NS));
evictor_lock = sys_mutex_lock_e(G.evictor_scheduler_mutex);
#endif
}
sys_mutex_unlock(&evictor_lock);
}

View File

@ -388,8 +388,6 @@ void sys_assert_locked_e_or_s(struct sys_lock *lock, struct sys_mutex *mutex);
struct sys_condition_variable *sys_condition_variable_alloc(void);
void sys_condition_variable_release(struct sys_condition_variable *sys_cv);
void sys_condition_variable_wait(struct sys_condition_variable *sys_cv, struct sys_lock *lock);
void sys_condition_variable_wait_time(struct sys_condition_variable *sys_cv, struct sys_lock *lock, f64 seconds);
void sys_condition_variable_signal(struct sys_condition_variable *sys_cv, u32 count);
void sys_condition_variable_broadcast(struct sys_condition_variable *sys_cv);
/* ========================== *

View File

@ -35,25 +35,13 @@
#define FIBER_STACK_SIZE MEGABYTE(4)
struct win32_mutex {
SRWLOCK srwlock;
volatile LONG state; /* Lower 30 bits = reader count, bit 30 = writer waiting, bit 31 = writer held */
struct win32_mutex *next_free;
#if PROFILING_LOCKS
__proflock_ctx(profiling_ctx);
#endif
#if RTC
u64 owner_tid;
struct atomic_i64 count;
#endif
};
struct win32_condition_variable {
CONDITION_VARIABLE condition_variable;
struct atomic_u64 wake_gen;
struct win32_condition_variable *next_free;
#if RTC
struct atomic_i64 num_yielders;
#endif
};
struct win32_thread {
@ -393,7 +381,7 @@ GLOBAL struct {
INTERNAL struct fiber *fiber_from_id(i32 id);
INTERNAL void fiber_yield(struct fiber *fiber, struct fiber *parent_fiber, struct yield_param *param);
INTERNAL void fiber_yield(struct fiber *fiber, struct fiber *parent_fiber);
@ -416,7 +404,8 @@ INTERNAL void fiber_yield(struct fiber *fiber, struct fiber *parent_fiber, struc
void sys_wait(void *addr, void *cmp, u32 size)
{
#if 1
__prof;
#if 0
WaitOnAddress(addr, cmp, size, INFINITE);
#else
struct fiber *fiber = fiber_from_id(sys_current_fiber_id());
@ -425,7 +414,7 @@ void sys_wait(void *addr, void *cmp, u32 size)
if (parent_fiber_id > 0) {
#if 1
/* Yield if job fiber */
struct yield_param param = {
*fiber->yield_param = (struct yield_param) {
.kind = YIELD_KIND_WAIT,
.wait = {
.addr = addr,
@ -433,7 +422,8 @@ void sys_wait(void *addr, void *cmp, u32 size)
.size = size
}
};
fiber_yield(fiber, fiber_from_id(parent_fiber_id), &param);
struct fiber *parent_fiber = fiber_from_id(parent_fiber_id);
fiber_yield(fiber, parent_fiber);
#else
while (MEMEQ(addr, cmp, size)) {
ix_pause();
@ -447,7 +437,7 @@ void sys_wait(void *addr, void *cmp, u32 size)
void sys_wake_all(void *addr)
{
#if 0
#if 1
u64 wait_bin_index = (u64)addr % NUM_WAIT_BINS;
struct wait_bin *bin = &G.wait_bins[wait_bin_index];
@ -472,7 +462,7 @@ void sys_wake_all(void *addr)
for (struct yielder *yielder = wait_list->first_yielder; yielder; yielder = yielder->next) {
enum job_queue_kind queue_kind = yielder->job_queue_kind;
i32 index = queue_counts[queue_kind]++;
struct yielder **array = queue_yielder_arrays[index];
struct yielder **array = queue_yielder_arrays[queue_kind];
array[index] = yielder;
}
/* Push jobs */
@ -511,11 +501,17 @@ void sys_wake_all(void *addr)
atomic_i32_fetch_set(&queue->lock, 0);
}
}
/* Free yielders */
wait_list->last_yielder->next = wait_list->first_free_yielder;
wait_list->first_free_yielder = wait_list->first_yielder;
wait_list->first_yielder = NULL;
wait_list->last_yielder = NULL;
wait_list->num_yielders = 0;
}
scratch_end(scratch);
}
}
atomic_i32_fetch_set(&bin->lock, 1);
atomic_i32_fetch_set(&bin->lock, 0);
#endif
/* Wake blocking waiters */
@ -692,7 +688,7 @@ i32 sys_current_fiber_id(void)
return (i32)(i64)GetFiberData();
}
INTERNAL void fiber_yield(struct fiber *fiber, struct fiber *parent_fiber, struct yield_param *param)
INTERNAL void fiber_yield(struct fiber *fiber, struct fiber *parent_fiber)
{
ASSERT(fiber->id == sys_current_fiber_id());
ASSERT(parent_fiber->id == fiber->parent_id);
@ -701,7 +697,6 @@ INTERNAL void fiber_yield(struct fiber *fiber, struct fiber *parent_fiber, struc
}
{
__prof_fiber_leave;
fiber->yield_param = param;
SwitchToFiber(parent_fiber->addr);
__prof_fiber_enter(fiber->name_cstr, PROF_THREAD_GROUP_FIBERS + fiber->id);
}
@ -714,8 +709,8 @@ void sys_yield(void)
i32 parent_id = fiber->parent_id;
if (parent_id > 0) { /* Top level fibers should not yield */
struct fiber *parent_fiber = fiber_from_id(parent_id);
struct yield_param param = { .kind = YIELD_KIND_COOPERATIVE };
fiber_yield(fiber, parent_fiber, &param);
fiber->yield_param->kind = YIELD_KIND_COOPERATIVE;
fiber_yield(fiber, parent_fiber);
}
}
@ -764,26 +759,24 @@ INTERNAL void job_fiber_entry(void *id_ptr)
{
i32 id = (i32)(i64)id_ptr;
struct fiber *fiber = fiber_from_id(id);
struct yield_param yield = ZI;
while (true) {
__prof_fiber_enter(fiber->name_cstr, PROF_THREAD_GROUP_FIBERS + fiber->id);
i32 parent_id = fiber->parent_id;
struct fiber *parent_fiber = fiber_from_id(parent_id);
void *parent_fiber_addr = parent_fiber->addr;
/* Run job */
{
__profscope(Run job);
yield.kind = YIELD_KIND_NONE;
fiber->yield_param = &yield;
fiber->yield_param->kind = YIELD_KIND_NONE;
struct sys_job_data data = ZI;
data.id = fiber->job_id;
data.sig = fiber->job_sig;
fiber->job_func(data);
yield.kind = YIELD_KIND_DONE;
fiber->yield_param = &yield;
fiber->yield_param->kind = YIELD_KIND_DONE;
}
__prof_fiber_leave;
SwitchToFiber(parent_fiber_addr);
{
i32 parent_id = fiber->parent_id;
struct fiber *parent_fiber = fiber_from_id(parent_id);
SwitchToFiber(parent_fiber->addr);
}
}
}
@ -793,6 +786,7 @@ INTERNAL void job_fiber_entry(void *id_ptr)
INTERNAL SYS_THREAD_DEF(worker_entry, worker_ctx_arg)
{
__prof;
struct worker_ctx *ctx = worker_ctx_arg;
(UNUSED)ctx;
@ -881,36 +875,38 @@ INTERNAL SYS_THREAD_DEF(worker_entry, worker_ctx_arg)
if (!job_fiber) {
job_fiber = fiber_alloc(FIBER_KIND_JOB_WORKER);
}
struct yield_param yield = ZI;
job_fiber->job_func = job_func;
job_fiber->job_sig = job_sig;
job_fiber->job_id = job_id;
job_fiber->job_priority = job_priority;
job_fiber->parent_id = worker_fiber_id;
job_fiber->yield_param = &yield;
b32 done = false;
while (!done) {
SwitchToFiber(job_fiber->addr);
struct yield_param *yield_param = job_fiber->yield_param;
enum yield_kind yield_kind = yield_param ? yield_param->kind : YIELD_KIND_NONE;
switch (yield_kind) {
switch (yield.kind) {
default:
{
/* Invalid yield kind */
sys_panic(LIT("Invalid fiber yield"));
struct arena_temp scratch = scratch_begin_no_conflict();
sys_panic(string_format(scratch.arena, LIT("Invalid fiber yield kind \"%F\""), FMT_SINT(yield.kind)));
scratch_end(scratch);
} break;
case YIELD_KIND_WAIT:
{
#if 1
void *wait_addr = yield_param->wait.addr;
void *wait_cmp = yield_param->wait.cmp;
u32 wait_size = yield_param->wait.size;
void *wait_addr = yield.wait.addr;
void *wait_cmp = yield.wait.cmp;
u32 wait_size = yield.wait.size;
u64 wait_bin_index = (u64)wait_addr % NUM_WAIT_BINS;
struct wait_bin *bin = &G.wait_bins[wait_bin_index];
while (atomic_i32_fetch_test_set(&bin->lock, 0, 1) != 0) ix_pause();
{
if (!MEMEQ(wait_addr, wait_cmp, wait_size)) {
if (MEMEQ(wait_addr, wait_cmp, wait_size)) {
/* Search addr wait list in bin */
struct wait_list *wait_list = NULL;
for (struct wait_list *tmp = bin->first_wait_list; tmp && !wait_list; tmp = tmp->next_in_bin) {
@ -978,7 +974,7 @@ INTERNAL SYS_THREAD_DEF(worker_entry, worker_ctx_arg)
atomic_i32_fetch_set(&bin->lock, 0);
#else
(UNUSED)job_queue_kind;
ASSERT(false);
//ASSERT(false);
#endif
} break;
@ -993,13 +989,6 @@ INTERNAL SYS_THREAD_DEF(worker_entry, worker_ctx_arg)
}
}
}
/* TODO */
#if 0
{
__profscope(Worker wait);
}
#endif
}
INTERNAL SYS_THREAD_DEF(test_entry, _)
@ -2515,17 +2504,7 @@ void sys_window_cursor_disable_clip(struct sys_window *sys_window)
INTERNAL void win32_mutex_init(struct win32_mutex *m)
{
#if PROFILING_LOCKS
__proflock_ctx(profiling_ctx) = m->profiling_ctx;
#endif
MEMZERO_STRUCT(m);
m->srwlock = (SRWLOCK)SRWLOCK_INIT;
#if PROFILING_LOCKS
if (!profiling_ctx) {
__proflock_alloc(profiling_ctx);
}
m->profiling_ctx = profiling_ctx;
#endif
}
struct sys_mutex *sys_mutex_alloc(void)
@ -2550,7 +2529,6 @@ void sys_mutex_release(struct sys_mutex *mutex)
{
__prof;
struct win32_mutex *m = (struct win32_mutex *)mutex;
ASSERT(atomic_i64_fetch(&m->count) == 0); /* Mutex should be unlocked */
{
struct sys_lock lock = sys_mutex_lock_e(G.mutexes_mutex);
m->next_free = G.first_free_mutex;
@ -2562,13 +2540,22 @@ void sys_mutex_release(struct sys_mutex *mutex)
struct sys_lock sys_mutex_lock_e(struct sys_mutex *mutex)
{
struct win32_mutex *m = (struct win32_mutex *)mutex;
__proflock_before_exclusive_lock(m->profiling_ctx);
AcquireSRWLockExclusive((SRWLOCK *)&m->srwlock);
__proflock_after_exclusive_lock(m->profiling_ctx);
#if RTC
m->owner_tid = (u64)GetCurrentThreadId();
atomic_i64_fetch_add(&m->count, 1);
#endif
{
while (true) {
//LONG expected = 0;
if (InterlockedCompareExchange(&m->state, 0x80000000, 0) == 0) {
break; /* Acquired exclusive */
} else {
/* Set writer pending */
LONG old;
do {
old = m->state;
if (old & 0x40000000) break; /* Already pending */
} while (InterlockedCompareExchange(&m->state, old | 0x40000000, old) != old);
sys_wait((void *)&m->state, (void *)&old, sizeof(old));
}
}
}
struct sys_lock lock = ZI;
lock.exclusive = true;
lock.mutex = mutex;
@ -2578,12 +2565,19 @@ struct sys_lock sys_mutex_lock_e(struct sys_mutex *mutex)
struct sys_lock sys_mutex_lock_s(struct sys_mutex *mutex)
{
struct win32_mutex *m = (struct win32_mutex *)mutex;
__proflock_before_shared_lock(m->profiling_ctx);
AcquireSRWLockShared((SRWLOCK *)&m->srwlock);
__proflock_after_shared_lock(m->profiling_ctx);
#if RTC
atomic_i64_fetch_add(&m->count, 1);
#endif
{
while (true) {
LONG old = m->state;
/* If writer not held or pending */
if ((old & 0xC0000000) == 0) {
if (InterlockedCompareExchange(&m->state, old + 1, old) == old) {
break; /* Acquired shared */
}
} else {
sys_wait((void *)&m->state, &old, sizeof(old));
}
}
}
struct sys_lock lock = ZI;
lock.mutex = mutex;
return lock;
@ -2592,16 +2586,15 @@ struct sys_lock sys_mutex_lock_s(struct sys_mutex *mutex)
void sys_mutex_unlock(struct sys_lock *lock)
{
struct win32_mutex *m = (struct win32_mutex *)lock->mutex;
#if RTC
atomic_i64_fetch_add(&m->count, -1);
m->owner_tid = 0;
#endif
if (lock->exclusive) {
ReleaseSRWLockExclusive((SRWLOCK *)&m->srwlock);
__proflock_after_exclusive_unlock(m->profiling_ctx);
//LONG old = m->state;
InterlockedExchange(&m->state, 0); /* Clear all bits */
sys_wake_all((void *)&m->state);
} else {
ReleaseSRWLockShared((SRWLOCK *)&m->srwlock);
__proflock_after_shared_unlock(m->profiling_ctx);
LONG old = InterlockedDecrement(&m->state);
if (old == 1) {
sys_wake_all((void *)&m->state);
}
}
MEMZERO_STRUCT(lock);
}
@ -2633,14 +2626,11 @@ INTERNAL struct win32_condition_variable *win32_condition_variable_alloc(void)
cv = G.first_free_condition_variable;
G.first_free_condition_variable = cv->next_free;
} else {
cv = arena_push(G.condition_variables_arena, struct win32_condition_variable);
cv = arena_push_no_zero(G.condition_variables_arena, struct win32_condition_variable);
}
sys_mutex_unlock(&lock);
}
MEMZERO_STRUCT(cv);
InitializeConditionVariable(&cv->condition_variable);
return cv;
}
@ -2663,105 +2653,36 @@ void sys_condition_variable_release(struct sys_condition_variable *sys_cv)
{
__prof;
struct win32_condition_variable *cv = (struct win32_condition_variable *)sys_cv;
/* Condition variable must not have any sleepers (signal before releasing) */
ASSERT(atomic_i64_fetch(&cv->num_yielders) == 0);
win32_condition_variable_release(cv);
}
void sys_condition_variable_wait(struct sys_condition_variable *sys_cv, struct sys_lock *lock)
{
struct win32_condition_variable *cv = (struct win32_condition_variable *)sys_cv;
struct win32_mutex *m = (struct win32_mutex *)lock->mutex;
struct sys_mutex *mutex = lock->mutex;
b32 exclusive = lock->exclusive;
#if RTC
atomic_i64_fetch_add(&cv->num_yielders, 1);
u64 old_wake_gen = atomic_u64_fetch(&cv->wake_gen);
u64 wake_gen = old_wake_gen;
{
sys_mutex_unlock(lock);
do {
sys_wait(&cv->wake_gen, &old_wake_gen, sizeof(old_wake_gen));
wake_gen = atomic_u64_fetch(&cv->wake_gen);
} while (wake_gen == old_wake_gen);
sys_wake_all(&cv->wake_gen);
if (exclusive) {
m->owner_tid = 0;
}
atomic_i64_fetch_add(&m->count, -1);
#endif
/* TODO: Correct profiling of internal condition variable sleep / wait mutex state */
if (exclusive) {
__proflock_after_exclusive_unlock(m->profiling_ctx);
*lock = sys_mutex_lock_e(mutex);
} else {
__proflock_after_shared_unlock(m->profiling_ctx);
*lock = sys_mutex_lock_s(mutex);
}
SleepConditionVariableSRW(&cv->condition_variable, (SRWLOCK *)&m->srwlock, INFINITE, exclusive ? 0 : CONDITION_VARIABLE_LOCKMODE_SHARED);
if (exclusive) {
__proflock_before_exclusive_lock(m->profiling_ctx);
__proflock_after_exclusive_lock(m->profiling_ctx);
} else {
__proflock_before_shared_lock(m->profiling_ctx);
__proflock_after_shared_lock(m->profiling_ctx);
}
#if RTC
atomic_i64_fetch_add(&m->count, 1);
if (exclusive) {
m->owner_tid = (u64)GetCurrentThreadId();
}
atomic_i64_fetch_add(&cv->num_yielders, -1);
#endif
}
void sys_condition_variable_wait_time(struct sys_condition_variable *sys_cv, struct sys_lock *lock, f64 seconds)
{
struct win32_condition_variable *cv = (struct win32_condition_variable *)sys_cv;
struct win32_mutex *m = (struct win32_mutex *)lock->mutex;
b32 exclusive = lock->exclusive;
#if RTC
atomic_i64_fetch_add(&cv->num_yielders, 1);
if (exclusive) {
m->owner_tid = 0;
}
atomic_i64_fetch_add(&m->count, -1);
#endif
u32 ms = (u32)math_round_to_int((f32)seconds * 1000.f);
/* TODO: Correct profiling of internal condition variable sleep / wait mutex state */
if (exclusive) {
__proflock_after_exclusive_unlock(m->profiling_ctx);
} else {
__proflock_after_shared_unlock(m->profiling_ctx);
}
SleepConditionVariableSRW(&cv->condition_variable, (SRWLOCK *)&m->srwlock, ms, exclusive ? 0 : CONDITION_VARIABLE_LOCKMODE_SHARED);
if (exclusive) {
__proflock_before_exclusive_lock(m->profiling_ctx);
__proflock_after_exclusive_lock(m->profiling_ctx);
} else {
__proflock_before_shared_lock(m->profiling_ctx);
__proflock_after_shared_lock(m->profiling_ctx);
}
#if RTC
atomic_i64_fetch_add(&m->count, 1);
if (exclusive) {
m->owner_tid = (u64)GetCurrentThreadId();
}
atomic_i64_fetch_add(&cv->num_yielders, -1);
#endif
}
void sys_condition_variable_signal(struct sys_condition_variable *sys_cv, u32 count)
{
struct win32_condition_variable *cv = (struct win32_condition_variable *)sys_cv;
/* Windows will wake all waiters if many single-wakes occur anyway, so we
* might as well wake all ourselves.
* https://devblogs.microsoft.com/oldnewthing/20180201-00/?p=97946 */
if (count <= 24) {
for (u32 i = 0; i < count; ++i) {
WakeConditionVariable(&cv->condition_variable);
}
} else {
WakeAllConditionVariable(&cv->condition_variable);
}
}
void sys_condition_variable_broadcast(struct sys_condition_variable *sys_cv)
{
struct win32_condition_variable *cv = (struct win32_condition_variable *)sys_cv;
WakeAllConditionVariable(&cv->condition_variable);
atomic_u64_fetch_add_u64(&cv->wake_gen, 1);
sys_wake_all(&cv->wake_gen);
}
/* ========================== *