diff --git a/src/sys_win32.c b/src/sys_win32.c index e8940f7c..5c9b0de8 100644 --- a/src/sys_win32.c +++ b/src/sys_win32.c @@ -50,7 +50,7 @@ #define MAX_EXIT_FUNCS 1024 /* Arbitrary threshold for determining when to fall back from a looped WakeByAddressSingle to WakeByAddressAll */ -#define WAKE_ALL_THRESHOLD 8 +#define WAKE_ALL_THRESHOLD 24 struct ticket_mutex { struct atomic_i64_padded ticket; @@ -164,9 +164,7 @@ struct alignas(64) fiber { /* ==================================================== */ i16 id; /* 02 bytes */ i16 parent_id; /* 02 bytes */ - u8 _pad0[4]; /* 04 bytes (padding) */ - /* ==================================================== */ - struct atomic_u64 wake_gen; /* 08 bytes */ + struct atomic_i32 wake_lock; /* 04 bytes */ /* ==================================================== */ u64 wait_addr; /* 08 bytes */ /* ==================================================== */ @@ -177,6 +175,8 @@ struct alignas(64) fiber { i16 next_time_waiter; /* 02 bytes */ i16 prev_time_waiter; /* 02 bytes */ /* ==================================================== */ + u8 _pad0[8]; /* 08 bytes (padding) */ + /* ==================================================== */ u8 _pad1[8]; /* 08 bytes (padding) */ /* ==================================================== */ @@ -425,64 +425,44 @@ void sys_wait(void *addr, void *cmp, u32 size, i64 timeout_ns) } } -void sys_wake(void *addr, i32 count) +/* REQUIRED: Caller must have acquired `wake_lock` for each fiber in array */ +INTERNAL void wake_fibers_locked(i32 num_fibers, struct fiber **fibers) { - struct arena_temp scratch = scratch_begin_no_conflict(); + /* Update wait lists */ + for (i32 i = 0; i < num_fibers; ++i) { + struct fiber *fiber = fibers[i]; + u64 wait_addr = fiber->wait_addr; + u64 wait_time = fiber->wait_time; - u64 wake_gen = atomic_u64_fetch_add_u64(&G.waiter_wake_gen.v, 1); - - u64 wait_addr_bin_index = (u64)addr % NUM_WAIT_ADDR_BINS; - struct wait_bin *wait_addr_bin = &G.wait_addr_bins[wait_addr_bin_index]; - struct wait_list *wait_addr_list = 0; - - /* Get list of waiters */ - i32 num_waiters = 0; - struct fiber **waiters = 0; - { - tm_lock(&wait_addr_bin->lock); - { - /* Search for wait addr list */ + /* Lock & search wait bins */ + /* TODO: Cache these in parameters since caller has one of them already calculated */ + struct wait_bin *wait_addr_bin = 0; + struct wait_bin *wait_time_bin = 0; + struct wait_list *wait_addr_list = 0; + struct wait_list *wait_time_list = 0; + if (wait_addr != 0) { + wait_addr_bin = &G.wait_addr_bins[wait_addr % NUM_WAIT_ADDR_BINS]; + tm_lock(&wait_addr_bin->lock); for (struct wait_list *tmp = wait_addr_bin->first_wait_list; tmp && !wait_addr_list; tmp = tmp->next_in_bin) { - if (tmp->value == (u64)addr) { + if (tmp->value == (u64)wait_addr) { wait_addr_list = tmp; } } - - if (wait_addr_list) { - /* Build waiters array */ - waiters = arena_push_array_no_zero(scratch.arena, struct fiber *, wait_addr_list->num_waiters); - for (struct fiber *waiter = fiber_from_id(wait_addr_list->first_waiter); waiter && num_waiters < count; waiter = fiber_from_id(waiter->next_addr_waiter)) { - if (atomic_u64_fetch_test_set(&waiter->wake_gen, 0, wake_gen) == 0) { - waiters[num_waiters] = waiter; - ++num_waiters; - } + } + if (wait_time != 0) { + wait_time_bin = &G.wait_time_bins[wait_time % NUM_WAIT_TIME_BINS]; + tm_lock(&wait_time_bin->lock); + for (struct wait_list *tmp = wait_time_bin->first_wait_list; tmp && !wait_time_list; tmp = tmp->next_in_bin) { + if (tmp->value == (u64)wait_time) { + wait_time_list = tmp; } } } - tm_unlock(&wait_addr_bin->lock); - } - - for (i32 i = 0; i < num_waiters; ++i) { - struct fiber *waiter = waiters[i]; - u64 wait_time = waiter->wait_time; - u64 wait_time_bin_index = wait_time % NUM_WAIT_TIME_BINS; - struct wait_bin *wait_time_bin = &G.wait_time_bins[wait_time_bin_index]; - - if (wait_time != 0) tm_lock(&wait_time_bin->lock); { - /* Search for wait time list */ - struct wait_list *wait_time_list = 0; - if (wait_time != 0) { - for (struct wait_list *tmp = wait_time_bin->first_wait_list; tmp && !wait_time_list; tmp = tmp->next_in_bin) { - if (tmp->value == (u64)wait_time) { - wait_time_list = tmp; - } - } - } /* Remove from addr list */ - { + if (wait_addr_list) { if (--wait_addr_list->num_waiters == 0) { - /* Free wait addr list */ + /* Free addr list */ struct wait_list *prev = wait_addr_list->prev_in_bin; struct wait_list *next = wait_addr_list->next_in_bin; if (prev) { @@ -491,15 +471,15 @@ void sys_wake(void *addr, i32 count) wait_addr_bin->first_wait_list = next; } if (next) { - next->next_in_bin = prev; + next->prev_in_bin = prev; } else { wait_addr_bin->last_wait_list = prev; } wait_addr_list->next_in_bin = wait_addr_bin->first_free_wait_list; wait_addr_bin->first_free_wait_list = wait_addr_list; } else { - i16 prev_id = waiter->prev_addr_waiter; - i16 next_id = waiter->next_addr_waiter; + i16 prev_id = fiber->prev_addr_waiter; + i16 next_id = fiber->next_addr_waiter; if (prev_id) { fiber_from_id(prev_id)->next_addr_waiter = next_id; } else { @@ -511,14 +491,14 @@ void sys_wake(void *addr, i32 count) wait_addr_list->last_waiter = prev_id; } } - waiter->wait_addr = 0; - waiter->prev_addr_waiter = 0; - waiter->next_addr_waiter = 0; + fiber->wait_addr = 0; + fiber->prev_addr_waiter = 0; + fiber->next_addr_waiter = 0; } /* Remove from time list */ if (wait_time_list) { if (--wait_time_list->num_waiters == 0) { - /* Free wait time list */ + /* Free time list */ struct wait_list *prev = wait_time_list->prev_in_bin; struct wait_list *next = wait_time_list->next_in_bin; if (prev) { @@ -527,15 +507,15 @@ void sys_wake(void *addr, i32 count) wait_time_bin->first_wait_list = next; } if (next) { - next->next_in_bin = prev; + next->prev_in_bin = prev; } else { wait_time_bin->last_wait_list = prev; } wait_time_list->next_in_bin = wait_time_bin->first_free_wait_list; wait_time_bin->first_free_wait_list = wait_time_list; } else { - i16 prev_id = waiter->prev_time_waiter; - i16 next_id = waiter->next_time_waiter; + i16 prev_id = fiber->prev_time_waiter; + i16 next_id = fiber->next_time_waiter; if (prev_id) { fiber_from_id(prev_id)->next_time_waiter = next_id; } else { @@ -547,19 +527,21 @@ void sys_wake(void *addr, i32 count) wait_time_list->last_waiter = prev_id; } } - waiter->wait_time = 0; - waiter->prev_time_waiter = 0; - waiter->next_time_waiter = 0; + fiber->wait_time = 0; + fiber->prev_time_waiter = 0; + fiber->next_time_waiter = 0; } } - if (wait_time != 0) tm_unlock(&wait_time_bin->lock); + /* Unlock wait bins */ + if (wait_time_bin != 0) tm_unlock(&wait_time_bin->lock); + if (wait_addr_bin != 0) tm_unlock(&wait_addr_bin->lock); } - /* Resume waiters */ + /* Resume jobs */ /* TODO: Batch submit waiters based on queue kind rather than one at a time */ - for (i32 i = 0; i < num_waiters; ++i) { - struct fiber *waiter = waiters[i]; - enum job_queue_kind queue_kind = job_queue_kind_from_priority(waiter->job_priority); + for (i32 i = 0; i < num_fibers; ++i) { + struct fiber *fiber = fibers[i]; + enum job_queue_kind queue_kind = job_queue_kind_from_priority(fiber->job_priority); struct job_queue *queue = &G.job_queues[queue_kind]; tm_lock(&queue->lock); { @@ -572,23 +554,71 @@ void sys_wake(void *addr, i32 count) } MEMZERO_STRUCT(info); info->count = 1; - info->num_dispatched = waiter->job_id; - info->func = waiter->job_func; - info->sig = waiter->job_sig; - info->counter = waiter->job_counter; - info->fiber_id = waiter->id; + info->num_dispatched = fiber->job_id; + info->func = fiber->job_func; + info->sig = fiber->job_sig; + info->counter = fiber->job_counter; + info->fiber_id = fiber->id; if (queue->last) { queue->last->next = info; } else { queue->first = info; } queue->last = info; - atomic_u64_fetch_set(&waiter->wake_gen, 0); + atomic_i32_fetch_set(&fiber->wake_lock, 0); } tm_unlock(&queue->lock); } - /* Wake blocking waiters */ + /* Wake workers */ + if (num_fibers > 0) { + struct snc_lock lock = snc_lock_e(&G.workers_wake_mutex); + { + atomic_i64_fetch_add(&G.num_jobs_in_queue.v, num_fibers); + snc_cv_signal(&G.workers_wake_cv, num_fibers); + } + snc_unlock(&lock); + } +} + +INTERNAL void wake_address(void *addr, i32 count) +{ + struct arena_temp scratch = scratch_begin_no_conflict(); + + u64 wait_addr_bin_index = (u64)addr % NUM_WAIT_ADDR_BINS; + struct wait_bin *wait_addr_bin = &G.wait_addr_bins[wait_addr_bin_index]; + struct wait_list *wait_addr_list = 0; + + /* Get list of waiting fibers */ + i32 num_fibers = 0; + struct fiber **fibers = 0; + { + tm_lock(&wait_addr_bin->lock); + { + /* Search for wait addr list */ + for (struct wait_list *tmp = wait_addr_bin->first_wait_list; tmp && !wait_addr_list; tmp = tmp->next_in_bin) { + if (tmp->value == (u64)addr) { + wait_addr_list = tmp; + } + } + + /* Lock & build fibers array */ + if (wait_addr_list) { + fibers = arena_push_array_no_zero(scratch.arena, struct fiber *, wait_addr_list->num_waiters); + for (struct fiber *fiber = fiber_from_id(wait_addr_list->first_waiter); fiber && num_fibers < count; fiber = fiber_from_id(fiber->next_addr_waiter)) { + if (atomic_i32_fetch_test_set(&fiber->wake_lock, 0, 1) == 0) { + fibers[num_fibers] = fiber; + ++num_fibers; + } + } + } + } + tm_unlock(&wait_addr_bin->lock); + } + + wake_fibers_locked(num_fibers, fibers); + + /* Wake win32 blocking thread waiters */ if (count >= WAKE_ALL_THRESHOLD) { WakeByAddressAll(addr); } else { @@ -597,20 +627,55 @@ void sys_wake(void *addr, i32 count) } } - /* Wake workers */ - /* TODO: Only wake necessary amount of workers */ - if (num_waiters > 0) { - struct snc_lock lock = snc_lock_e(&G.workers_wake_mutex); + scratch_end(scratch); +} + +INTERNAL void wake_time(u64 time) +{ + struct arena_temp scratch = scratch_begin_no_conflict(); + + u64 wait_time_bin_index = (u64)time % NUM_WAIT_TIME_BINS; + struct wait_bin *wait_time_bin = &G.wait_time_bins[wait_time_bin_index]; + struct wait_list *wait_time_list = 0; + + /* Build list of waiters to resume */ + i32 num_fibers = 0; + struct fiber **fibers = 0; + { + tm_lock(&wait_time_bin->lock); { - atomic_i64_fetch_add(&G.num_jobs_in_queue.v, num_waiters); - snc_cv_signal(&G.workers_wake_cv, num_waiters); + /* Search for wait time list */ + for (struct wait_list *tmp = wait_time_bin->first_wait_list; tmp && !wait_time_list; tmp = tmp->next_in_bin) { + if (tmp->value == (u64)time) { + wait_time_list = tmp; + } + } + + if (wait_time_list) { + /* Set waiter wake status & build fibers list */ + fibers = arena_push_array_no_zero(scratch.arena, struct fiber *, wait_time_list->num_waiters); + for (struct fiber *fiber = fiber_from_id(wait_time_list->first_waiter); fiber; fiber = fiber_from_id(fiber->next_time_waiter)) { + if (atomic_i32_fetch_test_set(&fiber->wake_lock, 0, 1) == 0) { + fibers[num_fibers] = fiber; + ++num_fibers; + } + + } + } } - snc_unlock(&lock); + tm_unlock(&wait_time_bin->lock); } + wake_fibers_locked(num_fibers, fibers); + scratch_end(scratch); } +void sys_wake(void *addr, i32 count) +{ + wake_address(addr, count); +} + /* ========================== * * Fibers * ========================== */ @@ -765,7 +830,6 @@ void sys_run(i32 count, sys_job_func *func, void *sig, enum sys_priority priorit tm_unlock(&queue->lock); } /* Wake workers */ - /* TODO: Only wake necessary amount of workers */ struct snc_lock lock = snc_lock_e(&G.workers_wake_mutex); { atomic_i64_fetch_add(&G.num_jobs_in_queue.v, count); @@ -1150,8 +1214,6 @@ INTERNAL SYS_THREAD_DEF(job_scheduler_entry, _) ASSERT(success); } - struct arena_temp scratch = scratch_begin_no_conflict(); - /* Create high resolution timer */ HANDLE timer = CreateWaitableTimerExW(0, 0, CREATE_WAITABLE_TIMER_HIGH_RESOLUTION, TIMER_ALL_ACCESS); if (!timer) { @@ -1180,14 +1242,17 @@ INTERNAL SYS_THREAD_DEF(job_scheduler_entry, _) WaitForSingleObject(timer, INFINITE); } + /* Calculate mean period */ i64 now_ns = sys_time_ns(); - if (last_cycle_ns != 0) { - i64 new_period_ns = now_ns - last_cycle_ns; - periods[periods_index++] = new_period_ns; + i64 period_ns = last_cycle_ns == 0 ? DEFAULT_SCHEDULER_CYCLE_PERIOD_NS : now_ns - last_cycle_ns; + last_cycle_ns = now_ns; + + /* Calculate mean period */ + { + periods[periods_index++] = period_ns; if (periods_index == countof(periods)) { periods_index = 0; } - /* Calculate mean period */ f64 periods_sum_ns = 0; for (i32 i = 0; i < (i32)countof(periods); ++i) { periods_sum_ns += (f64)periods[i]; @@ -1195,192 +1260,13 @@ INTERNAL SYS_THREAD_DEF(job_scheduler_entry, _) f64 mean_ns = periods_sum_ns / (f64)countof(periods); atomic_i64_fetch_set(&G.current_scheduler_cycle_period_ns.v, math_round_to_int64(mean_ns)); } - last_cycle_ns = now_ns; - u64 wake_gen = atomic_u64_fetch_add_u64(&G.waiter_wake_gen.v, 1); - i64 current_cycle = atomic_i64_fetch_add(&G.current_scheduler_cycle.v, 1) + 1; { __profn("Job scheduler run"); - struct arena_temp temp = arena_temp_begin(scratch.arena); - u64 wait_time_bin_index = (u64)current_cycle % NUM_WAIT_TIME_BINS; - struct wait_bin *wait_time_bin = &G.wait_time_bins[wait_time_bin_index]; - struct wait_list *wait_time_list = 0; - - /* Build list of waiters to resume */ - i32 num_waiters = 0; - struct fiber **waiters = 0; - { - tm_lock(&wait_time_bin->lock); - { - /* Search for wait time list */ - for (struct wait_list *tmp = wait_time_bin->first_wait_list; tmp && !wait_time_list; tmp = tmp->next_in_bin) { - if (tmp->value == (u64)current_cycle) { - wait_time_list = tmp; - } - } - - if (wait_time_list) { - /* Set waiter wake status & build waiters list */ - waiters = arena_push_array_no_zero(temp.arena, struct fiber *, wait_time_list->num_waiters); - for (struct fiber *waiter = fiber_from_id(wait_time_list->first_waiter); waiter; waiter = fiber_from_id(waiter->next_time_waiter)) { - if (atomic_u64_fetch_test_set(&waiter->wake_gen, 0, wake_gen) == 0) { - waiters[num_waiters] = waiter; - ++num_waiters; - } - - } - } - } - tm_unlock(&wait_time_bin->lock); - } - - /* Update wait lists */ - for (i32 i = 0; i < num_waiters; ++i) { - struct fiber *waiter = waiters[i]; - u64 wait_addr = waiter->wait_addr; - u64 wait_addr_bin_index = wait_addr % NUM_WAIT_ADDR_BINS; - struct wait_bin *wait_addr_bin = &G.wait_addr_bins[wait_addr_bin_index]; - - if (wait_addr != 0) tm_lock(&wait_addr_bin->lock); - { - /* Search for wait addr list */ - struct wait_list *wait_addr_list = 0; - if (wait_addr != 0) { - for (struct wait_list *tmp = wait_addr_bin->first_wait_list; tmp && !wait_addr_list; tmp = tmp->next_in_bin) { - if (tmp->value == (u64)wait_addr) { - wait_addr_list = tmp; - } - } - } - - tm_lock(&wait_time_bin->lock); - { - /* Remove from addr list */ - if (wait_addr_list) { - if (--wait_addr_list->num_waiters == 0) { - /* Free addr list */ - struct wait_list *prev = wait_addr_list->prev_in_bin; - struct wait_list *next = wait_addr_list->next_in_bin; - if (prev) { - prev->next_in_bin = next; - } else { - wait_addr_bin->first_wait_list = next; - } - if (next) { - next->next_in_bin = prev; - } else { - wait_addr_bin->last_wait_list = prev; - } - wait_addr_list->next_in_bin = wait_addr_bin->first_free_wait_list; - wait_addr_bin->first_free_wait_list = wait_addr_list; - } else { - i16 prev_id = waiter->prev_addr_waiter; - i16 next_id = waiter->next_addr_waiter; - if (prev_id) { - fiber_from_id(prev_id)->next_addr_waiter = next_id; - } else { - wait_addr_list->first_waiter = next_id; - } - if (next_id) { - fiber_from_id(next_id)->prev_addr_waiter = prev_id; - } else { - wait_addr_list->last_waiter = prev_id; - } - } - waiter->wait_addr = 0; - waiter->prev_addr_waiter = 0; - waiter->next_addr_waiter = 0; - } - /* Remove from time list */ - { - if (--wait_time_list->num_waiters == 0) { - /* Free time list */ - struct wait_list *prev = wait_time_list->prev_in_bin; - struct wait_list *next = wait_time_list->next_in_bin; - if (prev) { - prev->next_in_bin = next; - } else { - wait_time_bin->first_wait_list = next; - } - if (next) { - next->next_in_bin = prev; - } else { - wait_time_bin->last_wait_list = prev; - } - wait_time_list->next_in_bin = wait_time_bin->first_free_wait_list; - wait_time_bin->first_free_wait_list = wait_time_list; - } else { - i16 prev_id = waiter->prev_time_waiter; - i16 next_id = waiter->next_time_waiter; - if (prev_id) { - fiber_from_id(prev_id)->next_time_waiter = next_id; - } else { - wait_time_list->first_waiter = next_id; - } - if (next_id) { - fiber_from_id(next_id)->prev_time_waiter = prev_id; - } else { - wait_time_list->last_waiter = prev_id; - } - } - waiter->wait_time = 0; - waiter->prev_time_waiter = 0; - waiter->next_time_waiter = 0; - } - } - tm_unlock(&wait_time_bin->lock); - } - if (wait_addr != 0) tm_unlock(&wait_addr_bin->lock); - } - - /* Resume waiters */ - /* TODO: Batch submit waiters based on queue kind rather than one at a time */ - for (i32 i = 0; i < num_waiters; ++i) { - struct fiber *waiter = waiters[i]; - enum job_queue_kind queue_kind = job_queue_kind_from_priority(waiter->job_priority); - struct job_queue *queue = &G.job_queues[queue_kind]; - tm_lock(&queue->lock); - { - struct job_info *info = 0; - if (queue->first_free) { - info = queue->first_free; - queue->first_free = info->next; - } else { - info = arena_push_no_zero(queue->arena, struct job_info); - } - MEMZERO_STRUCT(info); - info->count = 1; - info->num_dispatched = waiter->job_id; - info->func = waiter->job_func; - info->sig = waiter->job_sig; - info->counter = waiter->job_counter; - info->fiber_id = waiter->id; - if (queue->last) { - queue->last->next = info; - } else { - queue->first = info; - } - queue->last = info; - atomic_u64_fetch_set(&waiter->wake_gen, 0); - } - tm_unlock(&queue->lock); - } - - /* Wake workers */ - /* TODO: Only wake necessary amount of workers */ - if (num_waiters > 0) { - struct snc_lock lock = snc_lock_e(&G.workers_wake_mutex); - { - atomic_i64_fetch_add(&G.num_jobs_in_queue.v, num_waiters); - snc_cv_signal(&G.workers_wake_cv, num_waiters); - } - snc_unlock(&lock); - } - arena_temp_end(temp); + i64 current_cycle = atomic_i64_fetch_add(&G.current_scheduler_cycle.v, 1) + 1; + wake_time((u64)current_cycle); } } - - scratch_end(scratch); } /* ========================== *