Merge addr & time wake logic into shared helper functions

This commit is contained in:
jacob 2025-07-10 23:16:51 -05:00
parent 7e81231639
commit 13e1860656

View File

@ -50,7 +50,7 @@
#define MAX_EXIT_FUNCS 1024
/* Arbitrary threshold for determining when to fall back from a looped WakeByAddressSingle to WakeByAddressAll */
#define WAKE_ALL_THRESHOLD 8
#define WAKE_ALL_THRESHOLD 24
struct ticket_mutex {
struct atomic_i64_padded ticket;
@ -164,9 +164,7 @@ struct alignas(64) fiber {
/* ==================================================== */
i16 id; /* 02 bytes */
i16 parent_id; /* 02 bytes */
u8 _pad0[4]; /* 04 bytes (padding) */
/* ==================================================== */
struct atomic_u64 wake_gen; /* 08 bytes */
struct atomic_i32 wake_lock; /* 04 bytes */
/* ==================================================== */
u64 wait_addr; /* 08 bytes */
/* ==================================================== */
@ -177,6 +175,8 @@ struct alignas(64) fiber {
i16 next_time_waiter; /* 02 bytes */
i16 prev_time_waiter; /* 02 bytes */
/* ==================================================== */
u8 _pad0[8]; /* 08 bytes (padding) */
/* ==================================================== */
u8 _pad1[8]; /* 08 bytes (padding) */
/* ==================================================== */
@ -425,64 +425,44 @@ void sys_wait(void *addr, void *cmp, u32 size, i64 timeout_ns)
}
}
void sys_wake(void *addr, i32 count)
/* REQUIRED: Caller must have acquired `wake_lock` for each fiber in array */
INTERNAL void wake_fibers_locked(i32 num_fibers, struct fiber **fibers)
{
struct arena_temp scratch = scratch_begin_no_conflict();
/* Update wait lists */
for (i32 i = 0; i < num_fibers; ++i) {
struct fiber *fiber = fibers[i];
u64 wait_addr = fiber->wait_addr;
u64 wait_time = fiber->wait_time;
u64 wake_gen = atomic_u64_fetch_add_u64(&G.waiter_wake_gen.v, 1);
u64 wait_addr_bin_index = (u64)addr % NUM_WAIT_ADDR_BINS;
struct wait_bin *wait_addr_bin = &G.wait_addr_bins[wait_addr_bin_index];
struct wait_list *wait_addr_list = 0;
/* Get list of waiters */
i32 num_waiters = 0;
struct fiber **waiters = 0;
{
tm_lock(&wait_addr_bin->lock);
{
/* Search for wait addr list */
/* Lock & search wait bins */
/* TODO: Cache these in parameters since caller has one of them already calculated */
struct wait_bin *wait_addr_bin = 0;
struct wait_bin *wait_time_bin = 0;
struct wait_list *wait_addr_list = 0;
struct wait_list *wait_time_list = 0;
if (wait_addr != 0) {
wait_addr_bin = &G.wait_addr_bins[wait_addr % NUM_WAIT_ADDR_BINS];
tm_lock(&wait_addr_bin->lock);
for (struct wait_list *tmp = wait_addr_bin->first_wait_list; tmp && !wait_addr_list; tmp = tmp->next_in_bin) {
if (tmp->value == (u64)addr) {
if (tmp->value == (u64)wait_addr) {
wait_addr_list = tmp;
}
}
if (wait_addr_list) {
/* Build waiters array */
waiters = arena_push_array_no_zero(scratch.arena, struct fiber *, wait_addr_list->num_waiters);
for (struct fiber *waiter = fiber_from_id(wait_addr_list->first_waiter); waiter && num_waiters < count; waiter = fiber_from_id(waiter->next_addr_waiter)) {
if (atomic_u64_fetch_test_set(&waiter->wake_gen, 0, wake_gen) == 0) {
waiters[num_waiters] = waiter;
++num_waiters;
}
}
if (wait_time != 0) {
wait_time_bin = &G.wait_time_bins[wait_time % NUM_WAIT_TIME_BINS];
tm_lock(&wait_time_bin->lock);
for (struct wait_list *tmp = wait_time_bin->first_wait_list; tmp && !wait_time_list; tmp = tmp->next_in_bin) {
if (tmp->value == (u64)wait_time) {
wait_time_list = tmp;
}
}
}
tm_unlock(&wait_addr_bin->lock);
}
for (i32 i = 0; i < num_waiters; ++i) {
struct fiber *waiter = waiters[i];
u64 wait_time = waiter->wait_time;
u64 wait_time_bin_index = wait_time % NUM_WAIT_TIME_BINS;
struct wait_bin *wait_time_bin = &G.wait_time_bins[wait_time_bin_index];
if (wait_time != 0) tm_lock(&wait_time_bin->lock);
{
/* Search for wait time list */
struct wait_list *wait_time_list = 0;
if (wait_time != 0) {
for (struct wait_list *tmp = wait_time_bin->first_wait_list; tmp && !wait_time_list; tmp = tmp->next_in_bin) {
if (tmp->value == (u64)wait_time) {
wait_time_list = tmp;
}
}
}
/* Remove from addr list */
{
if (wait_addr_list) {
if (--wait_addr_list->num_waiters == 0) {
/* Free wait addr list */
/* Free addr list */
struct wait_list *prev = wait_addr_list->prev_in_bin;
struct wait_list *next = wait_addr_list->next_in_bin;
if (prev) {
@ -491,15 +471,15 @@ void sys_wake(void *addr, i32 count)
wait_addr_bin->first_wait_list = next;
}
if (next) {
next->next_in_bin = prev;
next->prev_in_bin = prev;
} else {
wait_addr_bin->last_wait_list = prev;
}
wait_addr_list->next_in_bin = wait_addr_bin->first_free_wait_list;
wait_addr_bin->first_free_wait_list = wait_addr_list;
} else {
i16 prev_id = waiter->prev_addr_waiter;
i16 next_id = waiter->next_addr_waiter;
i16 prev_id = fiber->prev_addr_waiter;
i16 next_id = fiber->next_addr_waiter;
if (prev_id) {
fiber_from_id(prev_id)->next_addr_waiter = next_id;
} else {
@ -511,14 +491,14 @@ void sys_wake(void *addr, i32 count)
wait_addr_list->last_waiter = prev_id;
}
}
waiter->wait_addr = 0;
waiter->prev_addr_waiter = 0;
waiter->next_addr_waiter = 0;
fiber->wait_addr = 0;
fiber->prev_addr_waiter = 0;
fiber->next_addr_waiter = 0;
}
/* Remove from time list */
if (wait_time_list) {
if (--wait_time_list->num_waiters == 0) {
/* Free wait time list */
/* Free time list */
struct wait_list *prev = wait_time_list->prev_in_bin;
struct wait_list *next = wait_time_list->next_in_bin;
if (prev) {
@ -527,15 +507,15 @@ void sys_wake(void *addr, i32 count)
wait_time_bin->first_wait_list = next;
}
if (next) {
next->next_in_bin = prev;
next->prev_in_bin = prev;
} else {
wait_time_bin->last_wait_list = prev;
}
wait_time_list->next_in_bin = wait_time_bin->first_free_wait_list;
wait_time_bin->first_free_wait_list = wait_time_list;
} else {
i16 prev_id = waiter->prev_time_waiter;
i16 next_id = waiter->next_time_waiter;
i16 prev_id = fiber->prev_time_waiter;
i16 next_id = fiber->next_time_waiter;
if (prev_id) {
fiber_from_id(prev_id)->next_time_waiter = next_id;
} else {
@ -547,19 +527,21 @@ void sys_wake(void *addr, i32 count)
wait_time_list->last_waiter = prev_id;
}
}
waiter->wait_time = 0;
waiter->prev_time_waiter = 0;
waiter->next_time_waiter = 0;
fiber->wait_time = 0;
fiber->prev_time_waiter = 0;
fiber->next_time_waiter = 0;
}
}
if (wait_time != 0) tm_unlock(&wait_time_bin->lock);
/* Unlock wait bins */
if (wait_time_bin != 0) tm_unlock(&wait_time_bin->lock);
if (wait_addr_bin != 0) tm_unlock(&wait_addr_bin->lock);
}
/* Resume waiters */
/* Resume jobs */
/* TODO: Batch submit waiters based on queue kind rather than one at a time */
for (i32 i = 0; i < num_waiters; ++i) {
struct fiber *waiter = waiters[i];
enum job_queue_kind queue_kind = job_queue_kind_from_priority(waiter->job_priority);
for (i32 i = 0; i < num_fibers; ++i) {
struct fiber *fiber = fibers[i];
enum job_queue_kind queue_kind = job_queue_kind_from_priority(fiber->job_priority);
struct job_queue *queue = &G.job_queues[queue_kind];
tm_lock(&queue->lock);
{
@ -572,23 +554,71 @@ void sys_wake(void *addr, i32 count)
}
MEMZERO_STRUCT(info);
info->count = 1;
info->num_dispatched = waiter->job_id;
info->func = waiter->job_func;
info->sig = waiter->job_sig;
info->counter = waiter->job_counter;
info->fiber_id = waiter->id;
info->num_dispatched = fiber->job_id;
info->func = fiber->job_func;
info->sig = fiber->job_sig;
info->counter = fiber->job_counter;
info->fiber_id = fiber->id;
if (queue->last) {
queue->last->next = info;
} else {
queue->first = info;
}
queue->last = info;
atomic_u64_fetch_set(&waiter->wake_gen, 0);
atomic_i32_fetch_set(&fiber->wake_lock, 0);
}
tm_unlock(&queue->lock);
}
/* Wake blocking waiters */
/* Wake workers */
if (num_fibers > 0) {
struct snc_lock lock = snc_lock_e(&G.workers_wake_mutex);
{
atomic_i64_fetch_add(&G.num_jobs_in_queue.v, num_fibers);
snc_cv_signal(&G.workers_wake_cv, num_fibers);
}
snc_unlock(&lock);
}
}
INTERNAL void wake_address(void *addr, i32 count)
{
struct arena_temp scratch = scratch_begin_no_conflict();
u64 wait_addr_bin_index = (u64)addr % NUM_WAIT_ADDR_BINS;
struct wait_bin *wait_addr_bin = &G.wait_addr_bins[wait_addr_bin_index];
struct wait_list *wait_addr_list = 0;
/* Get list of waiting fibers */
i32 num_fibers = 0;
struct fiber **fibers = 0;
{
tm_lock(&wait_addr_bin->lock);
{
/* Search for wait addr list */
for (struct wait_list *tmp = wait_addr_bin->first_wait_list; tmp && !wait_addr_list; tmp = tmp->next_in_bin) {
if (tmp->value == (u64)addr) {
wait_addr_list = tmp;
}
}
/* Lock & build fibers array */
if (wait_addr_list) {
fibers = arena_push_array_no_zero(scratch.arena, struct fiber *, wait_addr_list->num_waiters);
for (struct fiber *fiber = fiber_from_id(wait_addr_list->first_waiter); fiber && num_fibers < count; fiber = fiber_from_id(fiber->next_addr_waiter)) {
if (atomic_i32_fetch_test_set(&fiber->wake_lock, 0, 1) == 0) {
fibers[num_fibers] = fiber;
++num_fibers;
}
}
}
}
tm_unlock(&wait_addr_bin->lock);
}
wake_fibers_locked(num_fibers, fibers);
/* Wake win32 blocking thread waiters */
if (count >= WAKE_ALL_THRESHOLD) {
WakeByAddressAll(addr);
} else {
@ -597,20 +627,55 @@ void sys_wake(void *addr, i32 count)
}
}
/* Wake workers */
/* TODO: Only wake necessary amount of workers */
if (num_waiters > 0) {
struct snc_lock lock = snc_lock_e(&G.workers_wake_mutex);
scratch_end(scratch);
}
INTERNAL void wake_time(u64 time)
{
struct arena_temp scratch = scratch_begin_no_conflict();
u64 wait_time_bin_index = (u64)time % NUM_WAIT_TIME_BINS;
struct wait_bin *wait_time_bin = &G.wait_time_bins[wait_time_bin_index];
struct wait_list *wait_time_list = 0;
/* Build list of waiters to resume */
i32 num_fibers = 0;
struct fiber **fibers = 0;
{
tm_lock(&wait_time_bin->lock);
{
atomic_i64_fetch_add(&G.num_jobs_in_queue.v, num_waiters);
snc_cv_signal(&G.workers_wake_cv, num_waiters);
/* Search for wait time list */
for (struct wait_list *tmp = wait_time_bin->first_wait_list; tmp && !wait_time_list; tmp = tmp->next_in_bin) {
if (tmp->value == (u64)time) {
wait_time_list = tmp;
}
}
if (wait_time_list) {
/* Set waiter wake status & build fibers list */
fibers = arena_push_array_no_zero(scratch.arena, struct fiber *, wait_time_list->num_waiters);
for (struct fiber *fiber = fiber_from_id(wait_time_list->first_waiter); fiber; fiber = fiber_from_id(fiber->next_time_waiter)) {
if (atomic_i32_fetch_test_set(&fiber->wake_lock, 0, 1) == 0) {
fibers[num_fibers] = fiber;
++num_fibers;
}
}
}
}
snc_unlock(&lock);
tm_unlock(&wait_time_bin->lock);
}
wake_fibers_locked(num_fibers, fibers);
scratch_end(scratch);
}
/* Public wake entry point: wakes up to `count` fibers waiting on `addr`
 * (delegates to wake_address, which also signals any Win32 WaitOnAddress blockers). */
void sys_wake(void *addr, i32 count)
{
wake_address(addr, count);
}
/* ========================== *
* Fibers
* ========================== */
@ -765,7 +830,6 @@ void sys_run(i32 count, sys_job_func *func, void *sig, enum sys_priority priorit
tm_unlock(&queue->lock);
}
/* Wake workers */
/* TODO: Only wake necessary amount of workers */
struct snc_lock lock = snc_lock_e(&G.workers_wake_mutex);
{
atomic_i64_fetch_add(&G.num_jobs_in_queue.v, count);
@ -1150,8 +1214,6 @@ INTERNAL SYS_THREAD_DEF(job_scheduler_entry, _)
ASSERT(success);
}
struct arena_temp scratch = scratch_begin_no_conflict();
/* Create high resolution timer */
HANDLE timer = CreateWaitableTimerExW(0, 0, CREATE_WAITABLE_TIMER_HIGH_RESOLUTION, TIMER_ALL_ACCESS);
if (!timer) {
@ -1180,14 +1242,17 @@ INTERNAL SYS_THREAD_DEF(job_scheduler_entry, _)
WaitForSingleObject(timer, INFINITE);
}
/* Calculate mean period */
i64 now_ns = sys_time_ns();
if (last_cycle_ns != 0) {
i64 new_period_ns = now_ns - last_cycle_ns;
periods[periods_index++] = new_period_ns;
i64 period_ns = last_cycle_ns == 0 ? DEFAULT_SCHEDULER_CYCLE_PERIOD_NS : now_ns - last_cycle_ns;
last_cycle_ns = now_ns;
/* Calculate mean period */
{
periods[periods_index++] = period_ns;
if (periods_index == countof(periods)) {
periods_index = 0;
}
/* Calculate mean period */
f64 periods_sum_ns = 0;
for (i32 i = 0; i < (i32)countof(periods); ++i) {
periods_sum_ns += (f64)periods[i];
@ -1195,192 +1260,13 @@ INTERNAL SYS_THREAD_DEF(job_scheduler_entry, _)
f64 mean_ns = periods_sum_ns / (f64)countof(periods);
atomic_i64_fetch_set(&G.current_scheduler_cycle_period_ns.v, math_round_to_int64(mean_ns));
}
last_cycle_ns = now_ns;
u64 wake_gen = atomic_u64_fetch_add_u64(&G.waiter_wake_gen.v, 1);
i64 current_cycle = atomic_i64_fetch_add(&G.current_scheduler_cycle.v, 1) + 1;
{
__profn("Job scheduler run");
struct arena_temp temp = arena_temp_begin(scratch.arena);
u64 wait_time_bin_index = (u64)current_cycle % NUM_WAIT_TIME_BINS;
struct wait_bin *wait_time_bin = &G.wait_time_bins[wait_time_bin_index];
struct wait_list *wait_time_list = 0;
/* Build list of waiters to resume */
i32 num_waiters = 0;
struct fiber **waiters = 0;
{
tm_lock(&wait_time_bin->lock);
{
/* Search for wait time list */
for (struct wait_list *tmp = wait_time_bin->first_wait_list; tmp && !wait_time_list; tmp = tmp->next_in_bin) {
if (tmp->value == (u64)current_cycle) {
wait_time_list = tmp;
}
}
if (wait_time_list) {
/* Set waiter wake status & build waiters list */
waiters = arena_push_array_no_zero(temp.arena, struct fiber *, wait_time_list->num_waiters);
for (struct fiber *waiter = fiber_from_id(wait_time_list->first_waiter); waiter; waiter = fiber_from_id(waiter->next_time_waiter)) {
if (atomic_u64_fetch_test_set(&waiter->wake_gen, 0, wake_gen) == 0) {
waiters[num_waiters] = waiter;
++num_waiters;
}
}
}
}
tm_unlock(&wait_time_bin->lock);
}
/* Update wait lists */
for (i32 i = 0; i < num_waiters; ++i) {
struct fiber *waiter = waiters[i];
u64 wait_addr = waiter->wait_addr;
u64 wait_addr_bin_index = wait_addr % NUM_WAIT_ADDR_BINS;
struct wait_bin *wait_addr_bin = &G.wait_addr_bins[wait_addr_bin_index];
if (wait_addr != 0) tm_lock(&wait_addr_bin->lock);
{
/* Search for wait addr list */
struct wait_list *wait_addr_list = 0;
if (wait_addr != 0) {
for (struct wait_list *tmp = wait_addr_bin->first_wait_list; tmp && !wait_addr_list; tmp = tmp->next_in_bin) {
if (tmp->value == (u64)wait_addr) {
wait_addr_list = tmp;
}
}
}
tm_lock(&wait_time_bin->lock);
{
/* Remove from addr list */
if (wait_addr_list) {
if (--wait_addr_list->num_waiters == 0) {
/* Free addr list */
struct wait_list *prev = wait_addr_list->prev_in_bin;
struct wait_list *next = wait_addr_list->next_in_bin;
if (prev) {
prev->next_in_bin = next;
} else {
wait_addr_bin->first_wait_list = next;
}
if (next) {
next->next_in_bin = prev;
} else {
wait_addr_bin->last_wait_list = prev;
}
wait_addr_list->next_in_bin = wait_addr_bin->first_free_wait_list;
wait_addr_bin->first_free_wait_list = wait_addr_list;
} else {
i16 prev_id = waiter->prev_addr_waiter;
i16 next_id = waiter->next_addr_waiter;
if (prev_id) {
fiber_from_id(prev_id)->next_addr_waiter = next_id;
} else {
wait_addr_list->first_waiter = next_id;
}
if (next_id) {
fiber_from_id(next_id)->prev_addr_waiter = prev_id;
} else {
wait_addr_list->last_waiter = prev_id;
}
}
waiter->wait_addr = 0;
waiter->prev_addr_waiter = 0;
waiter->next_addr_waiter = 0;
}
/* Remove from time list */
{
if (--wait_time_list->num_waiters == 0) {
/* Free time list */
struct wait_list *prev = wait_time_list->prev_in_bin;
struct wait_list *next = wait_time_list->next_in_bin;
if (prev) {
prev->next_in_bin = next;
} else {
wait_time_bin->first_wait_list = next;
}
if (next) {
next->next_in_bin = prev;
} else {
wait_time_bin->last_wait_list = prev;
}
wait_time_list->next_in_bin = wait_time_bin->first_free_wait_list;
wait_time_bin->first_free_wait_list = wait_time_list;
} else {
i16 prev_id = waiter->prev_time_waiter;
i16 next_id = waiter->next_time_waiter;
if (prev_id) {
fiber_from_id(prev_id)->next_time_waiter = next_id;
} else {
wait_time_list->first_waiter = next_id;
}
if (next_id) {
fiber_from_id(next_id)->prev_time_waiter = prev_id;
} else {
wait_time_list->last_waiter = prev_id;
}
}
waiter->wait_time = 0;
waiter->prev_time_waiter = 0;
waiter->next_time_waiter = 0;
}
}
tm_unlock(&wait_time_bin->lock);
}
if (wait_addr != 0) tm_unlock(&wait_addr_bin->lock);
}
/* Resume waiters */
/* TODO: Batch submit waiters based on queue kind rather than one at a time */
for (i32 i = 0; i < num_waiters; ++i) {
struct fiber *waiter = waiters[i];
enum job_queue_kind queue_kind = job_queue_kind_from_priority(waiter->job_priority);
struct job_queue *queue = &G.job_queues[queue_kind];
tm_lock(&queue->lock);
{
struct job_info *info = 0;
if (queue->first_free) {
info = queue->first_free;
queue->first_free = info->next;
} else {
info = arena_push_no_zero(queue->arena, struct job_info);
}
MEMZERO_STRUCT(info);
info->count = 1;
info->num_dispatched = waiter->job_id;
info->func = waiter->job_func;
info->sig = waiter->job_sig;
info->counter = waiter->job_counter;
info->fiber_id = waiter->id;
if (queue->last) {
queue->last->next = info;
} else {
queue->first = info;
}
queue->last = info;
atomic_u64_fetch_set(&waiter->wake_gen, 0);
}
tm_unlock(&queue->lock);
}
/* Wake workers */
/* TODO: Only wake necessary amount of workers */
if (num_waiters > 0) {
struct snc_lock lock = snc_lock_e(&G.workers_wake_mutex);
{
atomic_i64_fetch_add(&G.num_jobs_in_queue.v, num_waiters);
snc_cv_signal(&G.workers_wake_cv, num_waiters);
}
snc_unlock(&lock);
}
arena_temp_end(temp);
i64 current_cycle = atomic_i64_fetch_add(&G.current_scheduler_cycle.v, 1) + 1;
wake_time((u64)current_cycle);
}
}
scratch_end(scratch);
}
/* ========================== *