Add support for blockable counter waiters (top-level fibers block on the counter via WaitOnAddress instead of yielding)

This commit is contained in:
jacob 2025-07-05 18:31:10 -05:00
parent 402f8a12c9
commit ba3a2454db

View File

@ -26,6 +26,7 @@
#pragma comment(lib, "winmm")
#pragma comment(lib, "dwmapi")
#pragma comment(lib, "bcrypt")
#pragma comment(lib, "synchronization")
#define SYS_WINDOW_EVENT_LISTENERS_MAX 512
#define WINDOW_CLASS_NAME L"power_play_window_class"
@ -51,7 +52,7 @@ struct win32_condition_variable {
CONDITION_VARIABLE condition_variable;
struct win32_condition_variable *next_free;
#if RTC
struct atomic_i64 num_waiters;
struct atomic_i64 num_yielding_waiters;
#endif
};
@ -144,17 +145,18 @@ struct alignas(64) counter {
struct counter *next_free; /* 8 bytes */
/* =================================================== */
struct atomic_i32 waiters_lock; /* 4 bytes */
i32 num_waiters; /* 4 bytes */
u8 _pad0[4]; /* 4 bytes (padding) */
/* =================================================== */
i32 num_blocking_waiters; /* 4 bytes */
i32 num_yielding_waiters; /* 4 bytes */
/* =================================================== */
struct waiter *first_waiter; /* 8 bytes */
/* =================================================== */
struct waiter *last_waiter; /* 8 bytes */
/* =================================================== */
struct atomic_i64 wake_gen; /* 8 bytes */
/* =================================================== */
u8 _pad1[8]; /* 8 bytes (padding) */
/* =================================================== */
u8 _pad2[8]; /* 8 bytes (padding) */
/* =================================================== */
u8 _pad3[8]; /* 8 bytes (padding) */
};
STATIC_ASSERT(sizeof(struct counter) == 64); /* Assume counter fits in one cache line (increase if necessary) */
STATIC_ASSERT(alignof(struct counter) == 64); /* Avoid false sharing */
@ -352,7 +354,8 @@ GLOBAL struct {
* Counters
* ========================== */
INTERNAL void yield(enum yield_kind kind, void *arg);
INTERNAL void yield(struct fiber *fiber, struct fiber *parent_fiber, enum yield_kind kind, void *arg);
INTERNAL struct fiber *fiber_from_id(i32 id);
INTERNAL struct counter *counter_alloc(void)
{
@ -395,6 +398,13 @@ INTERNAL void counter_add(struct counter *counter, i64 amount)
{
while (atomic_i32_fetch_test_set(&counter->waiters_lock, 0, 1) != 0) ix_pause();
{
/* Wake blocking waiters */
atomic_i64_fetch_add(&counter->wake_gen, 1);
if (counter->num_blocking_waiters > 0) {
WakeByAddressAll(&counter->wake_gen);
}
/* Wake yielding waiters (fibers) */
first_waiter = counter->first_waiter;
last_waiter = counter->last_waiter;
if (first_waiter) {
@ -404,7 +414,8 @@ INTERNAL void counter_add(struct counter *counter, i64 amount)
i32 queue_counts[NUM_JOB_QUEUE_KINDS] = ZI;
struct waiter **queue_waiter_arrays[NUM_JOB_QUEUE_KINDS] = ZI;
for (i32 i = 0; i < (i32)countof(queue_waiter_arrays); ++i) {
queue_waiter_arrays[i] = arena_push_array_no_zero(scratch.arena, struct waiter *, counter->num_waiters);
/* NOTE: Each array is conservatively sized as the number of all yielding waiters in the counter (blocking waiters are woken separately and never enqueued here) */
queue_waiter_arrays[i] = arena_push_array_no_zero(scratch.arena, struct waiter *, counter->num_yielding_waiters);
}
for (struct waiter *waiter = first_waiter; waiter; waiter = waiter->next) {
enum job_queue_kind queue_kind = waiter->job_queue_kind;
@ -451,13 +462,13 @@ INTERNAL void counter_add(struct counter *counter, i64 amount)
}
/* Reset waiters list */
counter->num_waiters = 0;
counter->num_yielding_waiters = 0;
counter->first_waiter = NULL;
counter->last_waiter = NULL;
}
scratch_end(scratch);
}
counter->num_waiters = 0;
counter->num_yielding_waiters = 0;
}
atomic_i32_fetch_set(&counter->waiters_lock, 0);
}
@ -477,16 +488,49 @@ INTERNAL void counter_add(struct counter *counter, i64 amount)
INTERNAL void counter_wait(struct counter *counter)
{
__prof;
#if 0
/* TODO: Spin with configurable count */
if (atomic_i64_fetch(&counter->v) > 0) {
/* Yield */
yield(YIELD_KIND_WAIT, counter);
}
struct fiber *fiber = fiber_from_id(sys_current_fiber_id());
i32 parent_fiber_id = fiber->parent_id;
if (parent_fiber_id > 0) {
#if 0
/* Yield if job fiber */
yield(fiber, fiber_from_id(parent_fiber_id), YIELD_KIND_WAIT, counter);
#else
while (atomic_i64_fetch(&counter->v) != 0) {
ix_pause();
}
#endif
} else {
/* Top-level fibers should block since they can't yield */
i64 wake_gen = 0;
{
while (atomic_i32_fetch_test_set(&counter->waiters_lock, 0, 1) != 0) ix_pause();
++counter->num_blocking_waiters;
wake_gen = atomic_i64_fetch(&counter->wake_gen);
atomic_i32_fetch_set(&counter->waiters_lock, 0);
}
while (atomic_i64_fetch(&counter->wake_gen) <= wake_gen) {
WaitOnAddress(&counter->wake_gen, &wake_gen, sizeof(wake_gen), INFINITE);
}
}
}
#if 0
if (atomic_i64_fetch(&counter->v) > 0) {
/* Top-level fibers should block since they can't yield */
i64 wake_gen = 0;
{
while (atomic_i32_fetch_test_set(&counter->waiters_lock, 0, 1) != 0) ix_pause();
++counter->num_blocking_waiters;
wake_gen = atomic_i64_fetch(&counter->wake_gen);
atomic_i32_fetch_set(&counter->waiters_lock, 0);
}
while (atomic_i64_fetch(&counter->wake_gen) <= wake_gen) {
WaitOnAddress(&counter->wake_gen, &wake_gen, sizeof(wake_gen), INFINITE);
}
}
#endif
}
@ -608,14 +652,13 @@ i32 sys_current_fiber_id(void)
return (i32)(i64)GetFiberData();
}
INTERNAL void yield(enum yield_kind kind, void *arg)
INTERNAL void yield(struct fiber *fiber, struct fiber *parent_fiber, enum yield_kind kind, void *arg)
{
struct fiber *fiber = fiber_from_id(sys_current_fiber_id());
i32 parent_fiber_id = fiber->parent_id;
if (parent_fiber_id <= 0) {
ASSERT(fiber->id == sys_current_fiber_id());
ASSERT(parent_fiber->id == fiber->parent_id);
if (parent_fiber->id <= 0) {
sys_panic(LIT("A top level fiber tried to yield"));
}
struct fiber *parent_fiber = fiber_from_id(parent_fiber_id);
{
__prof_fiber_leave;
fiber->yield_kind = kind;
@ -627,7 +670,12 @@ INTERNAL void yield(enum yield_kind kind, void *arg)
void sys_yield(void)
{
yield(YIELD_KIND_COOPERATIVE, NULL);
struct fiber *fiber = fiber_from_id(sys_current_fiber_id());
i32 parent_id = fiber->parent_id;
if (parent_id > 0) { /* Top level fibers should not yield */
struct fiber *parent_fiber = fiber_from_id(parent_id);
yield(fiber, parent_fiber, YIELD_KIND_COOPERATIVE, NULL);
}
}
void sys_run(i32 count, sys_job_func *func, void *sig, enum sys_priority priority, struct sys_counter *counter)
@ -836,7 +884,7 @@ INTERNAL SYS_THREAD_DEF(worker_entry, worker_ctx_arg)
wait_counter->first_waiter = waiter;
}
wait_counter->last_waiter = waiter;
++wait_counter->num_waiters;
++wait_counter->num_yielding_waiters;
/* Pop worker's job fiber */
job_fiber = NULL;
@ -2529,7 +2577,7 @@ void sys_condition_variable_release(struct sys_condition_variable *sys_cv)
__prof;
struct win32_condition_variable *cv = (struct win32_condition_variable *)sys_cv;
/* Condition variable must not have any sleepers (signal before releasing) */
ASSERT(atomic_i64_fetch(&cv->num_waiters) == 0);
ASSERT(atomic_i64_fetch(&cv->num_yielding_waiters) == 0);
win32_condition_variable_release(cv);
}
@ -2539,7 +2587,7 @@ void sys_condition_variable_wait(struct sys_condition_variable *sys_cv, struct s
struct win32_mutex *m = (struct win32_mutex *)lock->mutex;
b32 exclusive = lock->exclusive;
#if RTC
atomic_i64_fetch_add(&cv->num_waiters, 1);
atomic_i64_fetch_add(&cv->num_yielding_waiters, 1);
if (exclusive) {
m->owner_tid = 0;
}
@ -2566,7 +2614,7 @@ void sys_condition_variable_wait(struct sys_condition_variable *sys_cv, struct s
if (exclusive) {
m->owner_tid = (u64)GetCurrentThreadId();
}
atomic_i64_fetch_add(&cv->num_waiters, -1);
atomic_i64_fetch_add(&cv->num_yielding_waiters, -1);
#endif
}
@ -2576,7 +2624,7 @@ void sys_condition_variable_wait_time(struct sys_condition_variable *sys_cv, str
struct win32_mutex *m = (struct win32_mutex *)lock->mutex;
b32 exclusive = lock->exclusive;
#if RTC
atomic_i64_fetch_add(&cv->num_waiters, 1);
atomic_i64_fetch_add(&cv->num_yielding_waiters, 1);
if (exclusive) {
m->owner_tid = 0;
}
@ -2604,7 +2652,7 @@ void sys_condition_variable_wait_time(struct sys_condition_variable *sys_cv, str
if (exclusive) {
m->owner_tid = (u64)GetCurrentThreadId();
}
atomic_i64_fetch_add(&cv->num_waiters, -1);
atomic_i64_fetch_add(&cv->num_yielding_waiters, -1);
#endif
}