add scheduler thread for waking yields with timeout
This commit is contained in:
parent
e878a2c96f
commit
cc51fe29a7
@ -131,14 +131,11 @@ void snc_cv_wait_time(struct snc_cv *cv, struct snc_lock *l, i64 timeout_ns)
|
|||||||
u64 old_wake_gen = atomic_u64_fetch(&cv->wake_gen);
|
u64 old_wake_gen = atomic_u64_fetch(&cv->wake_gen);
|
||||||
struct snc_mutex *mutex = l->mutex;
|
struct snc_mutex *mutex = l->mutex;
|
||||||
b32 exclusive = l->exclusive;
|
b32 exclusive = l->exclusive;
|
||||||
u64 wake_gen = old_wake_gen;
|
|
||||||
{
|
{
|
||||||
snc_unlock(l);
|
snc_unlock(l);
|
||||||
do {
|
{
|
||||||
sys_wait(&cv->wake_gen, &old_wake_gen, sizeof(old_wake_gen), timeout_ns);
|
sys_wait(&cv->wake_gen, &old_wake_gen, sizeof(old_wake_gen), timeout_ns);
|
||||||
wake_gen = atomic_u64_fetch(&cv->wake_gen);
|
}
|
||||||
} while (wake_gen == old_wake_gen);
|
|
||||||
sys_wake_all(&cv->wake_gen);
|
|
||||||
if (exclusive) {
|
if (exclusive) {
|
||||||
*l = snc_lock_e(mutex);
|
*l = snc_lock_e(mutex);
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
68
src/sprite.c
68
src/sprite.c
@ -22,7 +22,7 @@ STATIC_ASSERT(CACHE_MEMORY_BUDGET_THRESHOLD >= CACHE_MEMORY_BUDGET_TARGET);
|
|||||||
|
|
||||||
#define MAX_SCOPE_REFERENCES 1024
|
#define MAX_SCOPE_REFERENCES 1024
|
||||||
|
|
||||||
/* How long between evictor thread scans */
|
/* How long between evictor cycles */
|
||||||
#define EVICTOR_CYCLE_INTERVAL_NS NS_FROM_SECONDS(0.500)
|
#define EVICTOR_CYCLE_INTERVAL_NS NS_FROM_SECONDS(0.500)
|
||||||
|
|
||||||
/* How many cycles a cache entry spends unused until it's considered evictable */
|
/* How many cycles a cache entry spends unused until it's considered evictable */
|
||||||
@ -150,12 +150,11 @@ GLOBAL struct {
|
|||||||
struct arena *scopes_arena;
|
struct arena *scopes_arena;
|
||||||
struct sprite_scope *first_free_scope;
|
struct sprite_scope *first_free_scope;
|
||||||
|
|
||||||
/* Evictor thread */
|
/* Evictor */
|
||||||
struct atomic_i32 evictor_cycle;
|
struct atomic_i32 evictor_cycle;
|
||||||
b32 evictor_scheduler_shutdown;
|
b32 evictor_scheduler_shutdown;
|
||||||
struct snc_mutex evictor_scheduler_mutex;
|
struct snc_mutex evictor_scheduler_mutex;
|
||||||
struct snc_cv evictor_scheduler_shutdown_cv;
|
struct snc_cv evictor_scheduler_shutdown_cv;
|
||||||
struct sys_thread *evictor_scheduler_thread;
|
|
||||||
} G = ZI, DEBUG_ALIAS(G, G_sprite);
|
} G = ZI, DEBUG_ALIAS(G, G_sprite);
|
||||||
|
|
||||||
/* ========================== *
|
/* ========================== *
|
||||||
@ -202,7 +201,7 @@ INTERNAL struct image_rgba generate_purple_black_image(struct arena *arena, u32
|
|||||||
|
|
||||||
INTERNAL APP_EXIT_CALLBACK_FUNC_DEF(sprite_shutdown);
|
INTERNAL APP_EXIT_CALLBACK_FUNC_DEF(sprite_shutdown);
|
||||||
INTERNAL SYS_JOB_DEF(sprite_load_job, arg);
|
INTERNAL SYS_JOB_DEF(sprite_load_job, arg);
|
||||||
INTERNAL SYS_THREAD_DEF(sprite_evictor_scheduler_thread_entry_point, arg);
|
INTERNAL SYS_JOB_DEF(sprite_evictor_job, _);
|
||||||
|
|
||||||
#if RESOURCE_RELOADING
|
#if RESOURCE_RELOADING
|
||||||
INTERNAL RESOURCE_WATCH_CALLBACK_FUNC_DEF(sprite_resource_watch_callback, info);
|
INTERNAL RESOURCE_WATCH_CALLBACK_FUNC_DEF(sprite_resource_watch_callback, info);
|
||||||
@ -249,7 +248,7 @@ struct sprite_startup_receipt sprite_startup(struct gp_startup_receipt *gp_sr,
|
|||||||
|
|
||||||
G.scopes_arena = arena_alloc(GIBI(64));
|
G.scopes_arena = arena_alloc(GIBI(64));
|
||||||
|
|
||||||
G.evictor_scheduler_thread = sys_thread_alloc(sprite_evictor_scheduler_thread_entry_point, NULL, LIT("Sprite evictor scheduler"), PROF_THREAD_GROUP_EVICTORS);
|
sys_run(1, sprite_evictor_job, NULL, SYS_PRIORITY_BACKGROUND, NULL);
|
||||||
|
|
||||||
app_register_exit_callback(&sprite_shutdown);
|
app_register_exit_callback(&sprite_shutdown);
|
||||||
resource_register_watch_callback(&sprite_resource_watch_callback);
|
resource_register_watch_callback(&sprite_resource_watch_callback);
|
||||||
@ -267,7 +266,6 @@ INTERNAL APP_EXIT_CALLBACK_FUNC_DEF(sprite_shutdown)
|
|||||||
snc_cv_broadcast(&G.evictor_scheduler_shutdown_cv);
|
snc_cv_broadcast(&G.evictor_scheduler_shutdown_cv);
|
||||||
snc_unlock(&lock);
|
snc_unlock(&lock);
|
||||||
}
|
}
|
||||||
sys_thread_wait_release(G.evictor_scheduler_thread);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* ========================== *
|
/* ========================== *
|
||||||
@ -1220,12 +1218,25 @@ INTERNAL SORT_COMPARE_FUNC_DEF(evict_sort, arg_a, arg_b, udata)
|
|||||||
return (b_cycle > a_cycle) - (a_cycle > b_cycle);
|
return (b_cycle > a_cycle) - (a_cycle > b_cycle);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/* NOTE:
|
||||||
|
* A cache node is safe from eviction as long as:
|
||||||
|
* - Its bin mutex is locked
|
||||||
|
* - Any references are held to the node (its refcount > 0)
|
||||||
|
*
|
||||||
|
* An attempt to evict a cache node will occur when:
|
||||||
|
* - Its refcount = 0 and
|
||||||
|
* - The cache is over its memory budget and the node's last reference is longer ago than the grace period
|
||||||
|
* - Resource reloading is enabled and the node is out of date due to a change to its original resource file
|
||||||
|
*/
|
||||||
INTERNAL SYS_JOB_DEF(sprite_evictor_job, _)
|
INTERNAL SYS_JOB_DEF(sprite_evictor_job, _)
|
||||||
{
|
{
|
||||||
(UNUSED)_;
|
(UNUSED)_;
|
||||||
__prof;
|
b32 shutdown = false;
|
||||||
struct arena_temp scratch = scratch_begin_no_conflict();
|
while (!shutdown) {
|
||||||
{
|
{
|
||||||
|
__profn("Sprite evictor cycle");
|
||||||
|
struct arena_temp scratch = scratch_begin_no_conflict();
|
||||||
u64 evict_array_count = 0;
|
u64 evict_array_count = 0;
|
||||||
struct evict_node *evict_array = arena_push_dry(scratch.arena, struct evict_node);
|
struct evict_node *evict_array = arena_push_dry(scratch.arena, struct evict_node);
|
||||||
{
|
{
|
||||||
@ -1355,34 +1366,17 @@ INTERNAL SYS_JOB_DEF(sprite_evictor_job, _)
|
|||||||
scratch_end(scratch);
|
scratch_end(scratch);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
/* Evictor sleep */
|
||||||
|
{
|
||||||
/* NOTE:
|
__profn("Sprite evictor wait");
|
||||||
* A cache node is safe from eviction as long as:
|
struct snc_lock lock = snc_lock_e(&G.evictor_scheduler_mutex);
|
||||||
* - Its bin mutex is locked
|
{
|
||||||
* - Any references are held to the node (its refcount > 0)
|
if (!G.evictor_scheduler_shutdown) {
|
||||||
*
|
snc_cv_wait_time(&G.evictor_scheduler_shutdown_cv, &lock, EVICTOR_CYCLE_INTERVAL_NS);
|
||||||
* An attempt to evict a cache node will occur when:
|
}
|
||||||
* - Its refcount = 0 and
|
shutdown = G.evictor_scheduler_shutdown;
|
||||||
* - The cache is over its memory budget and the node's last reference is longer ago than the grace period
|
}
|
||||||
* - Resource reloading is enabled and the node is out of date due to a change to its original resource file
|
snc_unlock(&lock);
|
||||||
*/
|
}
|
||||||
INTERNAL SYS_THREAD_DEF(sprite_evictor_scheduler_thread_entry_point, arg)
|
|
||||||
{
|
|
||||||
(UNUSED)arg;
|
|
||||||
struct snc_lock evictor_lock = snc_lock_e(&G.evictor_scheduler_mutex);
|
|
||||||
while (!G.evictor_scheduler_shutdown) {
|
|
||||||
struct snc_counter counter = ZI;
|
|
||||||
sys_run(1, sprite_evictor_job, NULL, SYS_PRIORITY_BACKGROUND, &counter);
|
|
||||||
snc_counter_wait(&counter);
|
|
||||||
/* FIXME: Enable this */
|
|
||||||
#if 0
|
|
||||||
snc_cv_wait_time(G.evictor_scheduler_shutdown_cv, &evictor_lock, SECONDS_FROM_NS(EVICTOR_CYCLE_INTERVAL_NS));
|
|
||||||
#else
|
|
||||||
snc_unlock(&evictor_lock);
|
|
||||||
sys_sleep(SECONDS_FROM_NS(EVICTOR_CYCLE_INTERVAL_NS));
|
|
||||||
evictor_lock = snc_lock_e(&G.evictor_scheduler_mutex);
|
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
snc_unlock(&evictor_lock);
|
|
||||||
}
|
}
|
||||||
|
|||||||
641
src/sys_win32.c
641
src/sys_win32.c
@ -102,6 +102,14 @@ struct win32_window {
|
|||||||
|
|
||||||
|
|
||||||
#define NUM_WAIT_ADDR_BINS 65536
|
#define NUM_WAIT_ADDR_BINS 65536
|
||||||
|
#define NUM_WAIT_TIME_BINS 1024
|
||||||
|
|
||||||
|
/* Defines the resolution of the scheduler interval.
|
||||||
|
*
|
||||||
|
* NOTE: This is not the actual rate that the scheduler runs at, just the
|
||||||
|
* minimum amount of time that it can refer to. Smaller values mean that the
|
||||||
|
* scheduler has to process a greater number of wait lists upon waking up. */
|
||||||
|
#define SCHEDULER_MIN_INTERVAL_NS (KIBI(256)) /* ~256 microseconds */
|
||||||
|
|
||||||
struct alignas(64) wait_list {
|
struct alignas(64) wait_list {
|
||||||
/* =================================================== */
|
/* =================================================== */
|
||||||
@ -176,6 +184,8 @@ struct alignas(64) fiber {
|
|||||||
i16 parent_id; /* 02 bytes */
|
i16 parent_id; /* 02 bytes */
|
||||||
u8 _pad0[4]; /* 04 bytes (padding) */
|
u8 _pad0[4]; /* 04 bytes (padding) */
|
||||||
/* ==================================================== */
|
/* ==================================================== */
|
||||||
|
struct atomic_u64 wake_gen; /* 08 bytes */
|
||||||
|
/* ==================================================== */
|
||||||
u64 wait_addr; /* 08 bytes */
|
u64 wait_addr; /* 08 bytes */
|
||||||
/* ==================================================== */
|
/* ==================================================== */
|
||||||
u64 wait_time; /* 08 bytes */
|
u64 wait_time; /* 08 bytes */
|
||||||
@ -187,8 +197,6 @@ struct alignas(64) fiber {
|
|||||||
/* ==================================================== */
|
/* ==================================================== */
|
||||||
u8 _pad1[8]; /* 08 bytes (padding) */
|
u8 _pad1[8]; /* 08 bytes (padding) */
|
||||||
/* ==================================================== */
|
/* ==================================================== */
|
||||||
u8 _pad2[8]; /* 08 bytes (padding) */
|
|
||||||
/* ==================================================== */
|
|
||||||
|
|
||||||
/* ==================================================== */
|
/* ==================================================== */
|
||||||
/* ==================== Cache line ==================== */
|
/* ==================== Cache line ==================== */
|
||||||
@ -208,7 +216,7 @@ struct alignas(64) fiber {
|
|||||||
/* ==================================================== */
|
/* ==================================================== */
|
||||||
struct yield_param *yield_param; /* 08 bytes */
|
struct yield_param *yield_param; /* 08 bytes */
|
||||||
/* ==================================================== */
|
/* ==================================================== */
|
||||||
u8 _pad3[8]; /* 08 bytes (padding) */
|
u8 _pad2[8]; /* 08 bytes (padding) */
|
||||||
|
|
||||||
};
|
};
|
||||||
STATIC_ASSERT(sizeof(struct fiber) == 128); /* Padding validation (increase if necessary) */
|
STATIC_ASSERT(sizeof(struct fiber) == 128); /* Padding validation (increase if necessary) */
|
||||||
@ -294,12 +302,21 @@ GLOBAL struct {
|
|||||||
struct arena *windows_arena;
|
struct arena *windows_arena;
|
||||||
struct win32_window *first_free_window;
|
struct win32_window *first_free_window;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/* Scheduler */
|
||||||
|
struct atomic_i64 current_scheduler_interval; /* TODO: Prevent false sharing */
|
||||||
|
|
||||||
/* Wait lists */
|
/* Wait lists */
|
||||||
|
struct atomic_u64 waiter_wake_gen; /* TODO: Prevent false sharing */
|
||||||
struct atomic_i32 wait_lists_arena_lock; /* TODO: Prevent false sharing */
|
struct atomic_i32 wait_lists_arena_lock; /* TODO: Prevent false sharing */
|
||||||
struct arena *wait_lists_arena;
|
struct arena *wait_lists_arena;
|
||||||
|
|
||||||
/* Wait table */
|
/* Wait tables */
|
||||||
struct wait_bin wait_addr_bins[NUM_WAIT_ADDR_BINS];
|
struct wait_bin wait_addr_bins[NUM_WAIT_ADDR_BINS];
|
||||||
|
struct wait_bin wait_time_bins[NUM_WAIT_TIME_BINS];
|
||||||
|
|
||||||
/* Fibers */
|
/* Fibers */
|
||||||
i16 num_fibers;
|
i16 num_fibers;
|
||||||
@ -378,6 +395,21 @@ void sys_wait(void *addr, void *cmp, u32 size, i64 timeout_ns)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
void sys_wake_single(void *addr)
|
void sys_wake_single(void *addr)
|
||||||
{
|
{
|
||||||
/* FIXME: Real wake single */
|
/* FIXME: Real wake single */
|
||||||
@ -386,46 +418,142 @@ void sys_wake_single(void *addr)
|
|||||||
|
|
||||||
void sys_wake_all(void *addr)
|
void sys_wake_all(void *addr)
|
||||||
{
|
{
|
||||||
u64 wait_bin_index = (u64)addr % NUM_WAIT_ADDR_BINS;
|
|
||||||
struct wait_bin *bin = &G.wait_addr_bins[wait_bin_index];
|
|
||||||
|
|
||||||
i32 num_waiters = 0;
|
|
||||||
while (atomic_i32_fetch_test_set(&bin->lock, 0, 1) != 0) ix_pause();
|
|
||||||
{
|
|
||||||
struct wait_list *wait_list = NULL;
|
|
||||||
for (struct wait_list *tmp = bin->first_wait_list; tmp && !wait_list; tmp = tmp->next_in_bin) {
|
|
||||||
if (tmp->value == (u64)addr) {
|
|
||||||
wait_list = tmp;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (wait_list && wait_list->num_waiters > 0) {
|
|
||||||
num_waiters = wait_list->num_waiters;
|
|
||||||
struct arena_temp scratch = scratch_begin_no_conflict();
|
struct arena_temp scratch = scratch_begin_no_conflict();
|
||||||
|
|
||||||
|
u64 wake_gen = atomic_u64_fetch_add_u64(&G.waiter_wake_gen, 1);
|
||||||
|
|
||||||
|
u64 wait_addr_bin_index = (u64)addr % NUM_WAIT_ADDR_BINS;
|
||||||
|
struct wait_bin *wait_addr_bin = &G.wait_addr_bins[wait_addr_bin_index];
|
||||||
|
struct wait_list *wait_addr_list = NULL;
|
||||||
|
|
||||||
|
/* Get list of waiters */
|
||||||
|
i32 num_waiters = 0;
|
||||||
|
struct fiber **waiters = NULL;
|
||||||
{
|
{
|
||||||
/* Separate waiters by queue kind */
|
while (atomic_i32_fetch_test_set(&wait_addr_bin->lock, 0, 1) != 0) ix_pause();
|
||||||
i32 queue_waiter_counts[NUM_JOB_QUEUE_KINDS] = ZI;
|
{
|
||||||
struct fiber **queue_waiter_arrays[NUM_JOB_QUEUE_KINDS] = ZI;
|
/* Search for wait addr list */
|
||||||
for (i32 i = 0; i < (i32)countof(queue_waiter_arrays); ++i) {
|
for (struct wait_list *tmp = wait_addr_bin->first_wait_list; tmp && !wait_addr_list; tmp = tmp->next_in_bin) {
|
||||||
/* NOTE: Each array is conservatively sized as the number of all waiters in the list */
|
if (tmp->value == (u64)addr) {
|
||||||
queue_waiter_arrays[i] = arena_push_array_no_zero(scratch.arena, struct fiber *, num_waiters);
|
wait_addr_list = tmp;
|
||||||
}
|
}
|
||||||
for (struct fiber *waiter = fiber_from_id(wait_list->first_waiter); waiter; waiter = fiber_from_id(waiter->next_addr_waiter)) {
|
}
|
||||||
|
|
||||||
|
if (wait_addr_list) {
|
||||||
|
/* Build waiters array */
|
||||||
|
waiters = arena_push_array_no_zero(scratch.arena, struct fiber *, wait_addr_list->num_waiters);
|
||||||
|
for (struct fiber *waiter = fiber_from_id(wait_addr_list->first_waiter); waiter; waiter = fiber_from_id(waiter->next_addr_waiter)) {
|
||||||
|
if (atomic_u64_fetch_test_set(&waiter->wake_gen, 0, wake_gen) == 0) {
|
||||||
|
waiters[num_waiters] = waiter;
|
||||||
|
++num_waiters;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
atomic_i32_fetch_set(&wait_addr_bin->lock, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (i32 i = 0; i < num_waiters; ++i) {
|
||||||
|
struct fiber *waiter = waiters[i];
|
||||||
|
u64 wait_time = waiter->wait_time;
|
||||||
|
u64 wait_time_bin_index = wait_time % NUM_WAIT_TIME_BINS;
|
||||||
|
struct wait_bin *wait_time_bin = &G.wait_time_bins[wait_time_bin_index];
|
||||||
|
|
||||||
|
if (wait_time != 0) while (atomic_i32_fetch_test_set(&wait_time_bin->lock, 0, 1) != 0) ix_pause();
|
||||||
|
{
|
||||||
|
/* Search for wait time list */
|
||||||
|
struct wait_list *wait_time_list = NULL;
|
||||||
|
if (wait_time != 0) {
|
||||||
|
for (struct wait_list *tmp = wait_time_bin->first_wait_list; tmp && !wait_time_list; tmp = tmp->next_in_bin) {
|
||||||
|
if (tmp->value == (u64)wait_time) {
|
||||||
|
wait_time_list = tmp;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/* Remove from addr list */
|
||||||
|
{
|
||||||
|
if (--wait_addr_list->num_waiters == 0) {
|
||||||
|
/* Free wait addr list */
|
||||||
|
struct wait_list *prev = wait_addr_list->prev_in_bin;
|
||||||
|
struct wait_list *next = wait_addr_list->next_in_bin;
|
||||||
|
if (prev) {
|
||||||
|
prev->next_in_bin = next;
|
||||||
|
} else {
|
||||||
|
wait_addr_bin->first_wait_list = next;
|
||||||
|
}
|
||||||
|
if (next) {
|
||||||
|
next->next_in_bin = prev;
|
||||||
|
} else {
|
||||||
|
wait_addr_bin->last_wait_list = prev;
|
||||||
|
}
|
||||||
|
wait_addr_list->next_in_bin = wait_addr_bin->first_free_wait_list;
|
||||||
|
wait_addr_bin->first_free_wait_list = wait_addr_list;
|
||||||
|
} else {
|
||||||
|
i16 prev_id = waiter->prev_addr_waiter;
|
||||||
|
i16 next_id = waiter->next_addr_waiter;
|
||||||
|
if (prev_id) {
|
||||||
|
fiber_from_id(prev_id)->next_addr_waiter = next_id;
|
||||||
|
} else {
|
||||||
|
wait_addr_list->first_waiter = next_id;
|
||||||
|
}
|
||||||
|
if (next_id) {
|
||||||
|
fiber_from_id(next_id)->prev_addr_waiter = prev_id;
|
||||||
|
} else {
|
||||||
|
wait_addr_list->last_waiter = prev_id;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
waiter->wait_addr = 0;
|
||||||
|
waiter->prev_addr_waiter = 0;
|
||||||
|
waiter->next_addr_waiter = 0;
|
||||||
|
}
|
||||||
|
/* Remove from time list */
|
||||||
|
if (wait_time_list) {
|
||||||
|
if (--wait_time_list->num_waiters == 0) {
|
||||||
|
/* Free wait time list */
|
||||||
|
struct wait_list *prev = wait_time_list->prev_in_bin;
|
||||||
|
struct wait_list *next = wait_time_list->next_in_bin;
|
||||||
|
if (prev) {
|
||||||
|
prev->next_in_bin = next;
|
||||||
|
} else {
|
||||||
|
wait_time_bin->first_wait_list = next;
|
||||||
|
}
|
||||||
|
if (next) {
|
||||||
|
next->next_in_bin = prev;
|
||||||
|
} else {
|
||||||
|
wait_time_bin->last_wait_list = prev;
|
||||||
|
}
|
||||||
|
wait_time_list->next_in_bin = wait_time_bin->first_free_wait_list;
|
||||||
|
wait_time_bin->first_free_wait_list = wait_time_list;
|
||||||
|
} else {
|
||||||
|
i16 prev_id = waiter->prev_time_waiter;
|
||||||
|
i16 next_id = waiter->next_time_waiter;
|
||||||
|
if (prev_id) {
|
||||||
|
fiber_from_id(prev_id)->next_time_waiter = next_id;
|
||||||
|
} else {
|
||||||
|
wait_time_list->first_waiter = next_id;
|
||||||
|
}
|
||||||
|
if (next_id) {
|
||||||
|
fiber_from_id(next_id)->prev_time_waiter = prev_id;
|
||||||
|
} else {
|
||||||
|
wait_time_list->last_waiter = prev_id;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
waiter->wait_time = 0;
|
||||||
|
waiter->prev_time_waiter = 0;
|
||||||
|
waiter->next_time_waiter = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (wait_time != 0) atomic_i32_fetch_set(&wait_time_bin->lock, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Resume waiters */
|
||||||
|
/* TODO: Batch submit waiters based on queue kind rather than one at a time */
|
||||||
|
for (i32 i = 0; i < num_waiters; ++i) {
|
||||||
|
struct fiber *waiter = waiters[i];
|
||||||
enum job_queue_kind queue_kind = job_queue_kind_from_priority(waiter->job_priority);
|
enum job_queue_kind queue_kind = job_queue_kind_from_priority(waiter->job_priority);
|
||||||
i32 index = queue_waiter_counts[queue_kind]++;
|
|
||||||
struct fiber **array = queue_waiter_arrays[queue_kind];
|
|
||||||
array[index] = waiter;
|
|
||||||
}
|
|
||||||
/* Push jobs */
|
|
||||||
for (i32 queue_kind = 0; queue_kind < (i32)countof(queue_waiter_counts); ++queue_kind) {
|
|
||||||
i32 queue_num_waiters = queue_waiter_counts[queue_kind];
|
|
||||||
if (queue_num_waiters > 0) {
|
|
||||||
struct job_queue *queue = &G.job_queues[queue_kind];
|
struct job_queue *queue = &G.job_queues[queue_kind];
|
||||||
struct fiber **queue_waiters = queue_waiter_arrays[queue_kind];
|
|
||||||
while (atomic_i32_fetch_test_set(&queue->lock, 0, 1) != 0) ix_pause();
|
while (atomic_i32_fetch_test_set(&queue->lock, 0, 1) != 0) ix_pause();
|
||||||
{
|
{
|
||||||
/* TODO: More efficient batch job list allocation */
|
|
||||||
for (i32 i = 0; i < queue_num_waiters; ++i) {
|
|
||||||
struct fiber *waiter = queue_waiters[i];
|
|
||||||
struct job_info *info = NULL;
|
struct job_info *info = NULL;
|
||||||
if (queue->first_free) {
|
if (queue->first_free) {
|
||||||
info = queue->first_free;
|
info = queue->first_free;
|
||||||
@ -446,33 +574,10 @@ void sys_wake_all(void *addr)
|
|||||||
queue->first = info;
|
queue->first = info;
|
||||||
}
|
}
|
||||||
queue->last = info;
|
queue->last = info;
|
||||||
}
|
atomic_u64_fetch_set(&waiter->wake_gen, 0);
|
||||||
}
|
}
|
||||||
atomic_i32_fetch_set(&queue->lock, 0);
|
atomic_i32_fetch_set(&queue->lock, 0);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
/* Free wait list */
|
|
||||||
{
|
|
||||||
struct wait_list *prev = wait_list->prev_in_bin;
|
|
||||||
struct wait_list *next = wait_list->next_in_bin;
|
|
||||||
if (prev) {
|
|
||||||
prev->next_in_bin = next;
|
|
||||||
} else {
|
|
||||||
bin->first_wait_list = next;
|
|
||||||
}
|
|
||||||
if (next) {
|
|
||||||
next->prev_in_bin = prev;
|
|
||||||
} else {
|
|
||||||
bin->last_wait_list = prev;
|
|
||||||
}
|
|
||||||
wait_list->next_in_bin = bin->first_free_wait_list;
|
|
||||||
bin->first_free_wait_list = wait_list;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
scratch_end(scratch);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
atomic_i32_fetch_set(&bin->lock, 0);
|
|
||||||
|
|
||||||
/* Wake blocking waiters */
|
/* Wake blocking waiters */
|
||||||
WakeByAddressAll(addr);
|
WakeByAddressAll(addr);
|
||||||
@ -489,6 +594,8 @@ void sys_wake_all(void *addr)
|
|||||||
}
|
}
|
||||||
snc_unlock(&lock);
|
snc_unlock(&lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
scratch_end(scratch);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* ========================== *
|
/* ========================== *
|
||||||
@ -567,8 +674,8 @@ INTERNAL struct fiber *fiber_alloc(enum fiber_kind kind)
|
|||||||
fiber->addr = ConvertThreadToFiber((void *)(i64)fiber_id);
|
fiber->addr = ConvertThreadToFiber((void *)(i64)fiber_id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
fiber->wait_addr = 0;
|
MEMZERO_STRUCT(&fiber->wait_addr);
|
||||||
fiber->wait_time = 0;
|
MEMZERO_STRUCT(&fiber->wait_time);
|
||||||
fiber->prev_addr_waiter = 0;
|
fiber->prev_addr_waiter = 0;
|
||||||
fiber->next_addr_waiter = 0;
|
fiber->next_addr_waiter = 0;
|
||||||
fiber->prev_time_waiter = 0;
|
fiber->prev_time_waiter = 0;
|
||||||
@ -760,12 +867,8 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
|
|||||||
}
|
}
|
||||||
|
|
||||||
struct fiber *job_fiber = NULL;
|
struct fiber *job_fiber = NULL;
|
||||||
|
i64 last_seen_wake_gen = 0;
|
||||||
struct snc_lock wake_lock = snc_lock_s(&G.workers_wake_mutex);
|
|
||||||
i64 last_seen_wake_gen = atomic_i64_fetch(&G.workers_wake_gen);
|
|
||||||
while (last_seen_wake_gen >= 0) {
|
while (last_seen_wake_gen >= 0) {
|
||||||
snc_unlock(&wake_lock);
|
|
||||||
|
|
||||||
/* Pull job from queue */
|
/* Pull job from queue */
|
||||||
b32 queues_empty = true;
|
b32 queues_empty = true;
|
||||||
enum sys_priority job_priority = 0;
|
enum sys_priority job_priority = 0;
|
||||||
@ -842,8 +945,8 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
|
|||||||
if (job_func) {
|
if (job_func) {
|
||||||
if (!job_fiber) {
|
if (!job_fiber) {
|
||||||
job_fiber = fiber_alloc(FIBER_KIND_JOB_WORKER);
|
job_fiber = fiber_alloc(FIBER_KIND_JOB_WORKER);
|
||||||
job_fiber_id = job_fiber->id;
|
|
||||||
}
|
}
|
||||||
|
job_fiber_id = job_fiber->id;
|
||||||
{
|
{
|
||||||
__profnc("Run fiber", RGB32_F(0.25, 0.75, 0));
|
__profnc("Run fiber", RGB32_F(0.25, 0.75, 0));
|
||||||
__profvalue(job_fiber->id);
|
__profvalue(job_fiber->id);
|
||||||
@ -872,11 +975,136 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
|
|||||||
volatile void *wait_addr = yield.wait.addr;
|
volatile void *wait_addr = yield.wait.addr;
|
||||||
void *wait_cmp = yield.wait.cmp;
|
void *wait_cmp = yield.wait.cmp;
|
||||||
u32 wait_size = yield.wait.size;
|
u32 wait_size = yield.wait.size;
|
||||||
|
i64 wait_timeout_ns = yield.wait.timeout_ns;
|
||||||
|
i64 wait_time = 0;
|
||||||
|
if (wait_timeout_ns > 0 && wait_timeout_ns < I64_MAX) {
|
||||||
|
wait_time = atomic_i64_fetch(&G.current_scheduler_interval) + (wait_timeout_ns / SCHEDULER_MIN_INTERVAL_NS);
|
||||||
|
}
|
||||||
|
|
||||||
u64 wait_bin_index = (u64)wait_addr % NUM_WAIT_ADDR_BINS;
|
u64 wait_addr_bin_index = (u64)wait_addr % NUM_WAIT_ADDR_BINS;
|
||||||
struct wait_bin *bin = &G.wait_addr_bins[wait_bin_index];
|
u64 wait_time_bin_index = (u64)wait_time % NUM_WAIT_TIME_BINS;
|
||||||
|
struct wait_bin *wait_addr_bin = &G.wait_addr_bins[wait_addr_bin_index];
|
||||||
|
struct wait_bin *wait_time_bin = &G.wait_time_bins[wait_time_bin_index];
|
||||||
|
|
||||||
while (atomic_i32_fetch_test_set(&bin->lock, 0, 1) != 0) ix_pause();
|
if (wait_addr != 0) while (atomic_i32_fetch_test_set(&wait_addr_bin->lock, 0, 1) != 0) ix_pause();
|
||||||
|
{
|
||||||
|
if (wait_time != 0) while (atomic_i32_fetch_test_set(&wait_time_bin->lock, 0, 1) != 0) ix_pause();
|
||||||
|
{
|
||||||
|
b32 cancel_wait = true;
|
||||||
|
if (wait_addr != 0) {
|
||||||
|
switch (wait_size) {
|
||||||
|
case 1: cancel_wait = (u8)_InterlockedCompareExchange8(wait_addr, 0, 0) != *(u8 *)wait_cmp; break;
|
||||||
|
case 2: cancel_wait = (u16)_InterlockedCompareExchange16(wait_addr, 0, 0) != *(u16 *)wait_cmp; break;
|
||||||
|
case 4: cancel_wait = (u32)_InterlockedCompareExchange(wait_addr, 0, 0) != *(u32 *)wait_cmp; break;
|
||||||
|
case 8: cancel_wait = (u64)_InterlockedCompareExchange64(wait_addr, 0, 0) != *(u64 *)wait_cmp; break;
|
||||||
|
default: cancel_wait = true; ASSERT(false); break; /* Invalid wait size */
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (wait_time != 0 && !cancel_wait) {
|
||||||
|
cancel_wait = atomic_i64_fetch(&G.current_scheduler_interval) > wait_time;
|
||||||
|
}
|
||||||
|
if (!cancel_wait) {
|
||||||
|
if (wait_addr != 0) {
|
||||||
|
/* Search for wait addr list in bin */
|
||||||
|
struct wait_list *wait_addr_list = NULL;
|
||||||
|
for (struct wait_list *tmp = wait_addr_bin->first_wait_list; tmp && !wait_addr_list; tmp = tmp->next_in_bin) {
|
||||||
|
if (tmp->value == (u64)wait_addr) {
|
||||||
|
wait_addr_list = tmp;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/* Allocate new wait addr list */
|
||||||
|
if (!wait_addr_list) {
|
||||||
|
if (wait_addr_bin->first_free_wait_list) {
|
||||||
|
wait_addr_list = wait_addr_bin->first_free_wait_list;
|
||||||
|
wait_addr_bin->first_free_wait_list = wait_addr_list->next_in_bin;
|
||||||
|
} else {
|
||||||
|
while (atomic_i32_fetch_test_set(&G.wait_lists_arena_lock, 0, 1) != 0) ix_pause();
|
||||||
|
{
|
||||||
|
wait_addr_list = arena_push_no_zero(G.wait_lists_arena, struct wait_list);
|
||||||
|
}
|
||||||
|
atomic_i32_fetch_set(&G.wait_lists_arena_lock, 0);
|
||||||
|
}
|
||||||
|
MEMZERO_STRUCT(wait_addr_list);
|
||||||
|
wait_addr_list->value = wait_addr;
|
||||||
|
if (wait_addr_bin->last_wait_list) {
|
||||||
|
wait_addr_bin->last_wait_list->next_in_bin = wait_addr_list;
|
||||||
|
wait_addr_list->prev_in_bin = wait_addr_bin->last_wait_list;
|
||||||
|
} else {
|
||||||
|
wait_addr_bin->first_wait_list = wait_addr_list;
|
||||||
|
}
|
||||||
|
wait_addr_bin->last_wait_list = wait_addr_list;
|
||||||
|
}
|
||||||
|
/* Insert fiber into wait addr list */
|
||||||
|
job_fiber->wait_addr = wait_addr;
|
||||||
|
if (wait_addr_list->last_waiter) {
|
||||||
|
fiber_from_id(wait_addr_list->last_waiter)->next_addr_waiter = job_fiber_id;
|
||||||
|
job_fiber->prev_addr_waiter = wait_addr_list->last_waiter;
|
||||||
|
} else {
|
||||||
|
wait_addr_list->first_waiter = job_fiber_id;
|
||||||
|
}
|
||||||
|
wait_addr_list->last_waiter = job_fiber_id;
|
||||||
|
++wait_addr_list->num_waiters;
|
||||||
|
}
|
||||||
|
if (wait_time != 0) {
|
||||||
|
/* Search for wait time list in bin */
|
||||||
|
struct wait_list *wait_time_list = NULL;
|
||||||
|
for (struct wait_list *tmp = wait_time_bin->first_wait_list; tmp && !wait_time_list; tmp = tmp->next_in_bin) {
|
||||||
|
if (tmp->value == (u64)wait_time) {
|
||||||
|
wait_time_list = tmp;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/* Allocate new wait time list */
|
||||||
|
if (!wait_time_list) {
|
||||||
|
if (wait_time_bin->first_free_wait_list) {
|
||||||
|
wait_time_list = wait_time_bin->first_free_wait_list;
|
||||||
|
wait_time_bin->first_free_wait_list = wait_time_list->next_in_bin;
|
||||||
|
} else {
|
||||||
|
while (atomic_i32_fetch_test_set(&G.wait_lists_arena_lock, 0, 1) != 0) ix_pause();
|
||||||
|
{
|
||||||
|
wait_time_list = arena_push_no_zero(G.wait_lists_arena, struct wait_list);
|
||||||
|
}
|
||||||
|
atomic_i32_fetch_set(&G.wait_lists_arena_lock, 0);
|
||||||
|
}
|
||||||
|
MEMZERO_STRUCT(wait_time_list);
|
||||||
|
wait_time_list->value = wait_time;
|
||||||
|
if (wait_time_bin->last_wait_list) {
|
||||||
|
wait_time_bin->last_wait_list->next_in_bin = wait_time_list;
|
||||||
|
wait_time_list->prev_in_bin = wait_time_bin->last_wait_list;
|
||||||
|
} else {
|
||||||
|
wait_time_bin->first_wait_list = wait_time_list;
|
||||||
|
}
|
||||||
|
wait_time_bin->last_wait_list = wait_time_list;
|
||||||
|
}
|
||||||
|
/* Insert fiber into wait time list */
|
||||||
|
job_fiber->wait_time = wait_time;
|
||||||
|
if (wait_time_list->last_waiter) {
|
||||||
|
fiber_from_id(wait_time_list->last_waiter)->next_time_waiter = job_fiber_id;
|
||||||
|
job_fiber->prev_time_waiter = wait_time_list->last_waiter;
|
||||||
|
} else {
|
||||||
|
wait_time_list->first_waiter = job_fiber_id;
|
||||||
|
}
|
||||||
|
wait_time_list->last_waiter = job_fiber_id;
|
||||||
|
++wait_time_list->num_waiters;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Pop worker's job fiber */
|
||||||
|
job_fiber = NULL;
|
||||||
|
done = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (wait_time != 0) atomic_i32_fetch_set(&wait_time_bin->lock, 0);
|
||||||
|
}
|
||||||
|
if (wait_addr != 0) atomic_i32_fetch_set(&wait_addr_bin->lock, 0);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
while (atomic_i32_fetch_test_set(&wait_addr_bin->lock, 0, 1) != 0) ix_pause();
|
||||||
{
|
{
|
||||||
/* Load and compare values now that bin is locked */
|
/* Load and compare values now that bin is locked */
|
||||||
b32 cancel_wait;
|
b32 cancel_wait;
|
||||||
@ -889,53 +1117,54 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
|
|||||||
}
|
}
|
||||||
if (!cancel_wait) {
|
if (!cancel_wait) {
|
||||||
/* Search addr wait list in bin */
|
/* Search addr wait list in bin */
|
||||||
struct wait_list *wait_list = NULL;
|
struct wait_list *wait_addr_list = NULL;
|
||||||
for (struct wait_list *tmp = bin->first_wait_list; tmp && !wait_list; tmp = tmp->next_in_bin) {
|
for (struct wait_list *tmp = wait_addr_bin->first_wait_list; tmp && !wait_addr_list; tmp = tmp->next_in_bin) {
|
||||||
if (tmp->value == (u64)wait_addr) {
|
if (tmp->value == (u64)wait_addr) {
|
||||||
wait_list = tmp;
|
wait_addr_list = tmp;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Allocate new wait list */
|
/* Allocate new wait list */
|
||||||
if (!wait_list) {
|
if (!wait_addr_list) {
|
||||||
if (bin->first_free_wait_list) {
|
if (wait_addr_bin->first_free_wait_list) {
|
||||||
wait_list = bin->first_free_wait_list;
|
wait_addr_list = wait_addr_bin->first_free_wait_list;
|
||||||
bin->first_free_wait_list = wait_list->next_in_bin;
|
wait_addr_bin->first_free_wait_list = wait_addr_list->next_in_bin;
|
||||||
} else {
|
} else {
|
||||||
while (atomic_i32_fetch_test_set(&G.wait_lists_arena_lock, 0, 1) != 0) ix_pause();
|
while (atomic_i32_fetch_test_set(&G.wait_lists_arena_lock, 0, 1) != 0) ix_pause();
|
||||||
{
|
{
|
||||||
wait_list = arena_push_no_zero(G.wait_lists_arena, struct wait_list);
|
wait_addr_list = arena_push_no_zero(G.wait_lists_arena, struct wait_list);
|
||||||
}
|
}
|
||||||
atomic_i32_fetch_set(&G.wait_lists_arena_lock, 0);
|
atomic_i32_fetch_set(&G.wait_lists_arena_lock, 0);
|
||||||
}
|
}
|
||||||
MEMZERO_STRUCT(wait_list);
|
MEMZERO_STRUCT(wait_addr_list);
|
||||||
wait_list->value = wait_addr;
|
wait_addr_list->value = wait_addr;
|
||||||
if (bin->last_wait_list) {
|
if (wait_addr_bin->last_wait_list) {
|
||||||
bin->last_wait_list->next_in_bin = wait_list;
|
wait_addr_bin->last_wait_list->next_in_bin = wait_addr_list;
|
||||||
wait_list->prev_in_bin = bin->last_wait_list;
|
wait_addr_list->prev_in_bin = wait_addr_bin->last_wait_list;
|
||||||
} else {
|
} else {
|
||||||
bin->first_wait_list = wait_list;
|
wait_addr_bin->first_wait_list = wait_addr_list;
|
||||||
}
|
}
|
||||||
bin->last_wait_list = wait_list;
|
wait_addr_bin->last_wait_list = wait_addr_list;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Insert fiber into wait list */
|
/* Insert fiber into wait list */
|
||||||
job_fiber->wait_addr = (u64)wait_addr;
|
job_fiber->wait_addr = wait_addr;
|
||||||
if (wait_list->last_waiter) {
|
if (wait_addr_list->last_waiter) {
|
||||||
fiber_from_id(wait_list->last_waiter)->next_addr_waiter = job_fiber_id;
|
fiber_from_id(wait_addr_list->last_waiter)->next_addr_waiter = job_fiber_id;
|
||||||
job_fiber->prev_addr_waiter = wait_list->last_waiter;
|
job_fiber->prev_addr_waiter = wait_addr_list->last_waiter;
|
||||||
} else {
|
} else {
|
||||||
wait_list->first_waiter = job_fiber_id;
|
wait_addr_list->first_waiter = job_fiber_id;
|
||||||
}
|
}
|
||||||
wait_list->last_waiter = job_fiber_id;
|
wait_addr_list->last_waiter = job_fiber_id;
|
||||||
++wait_list->num_waiters;
|
++wait_addr_list->num_waiters;
|
||||||
|
|
||||||
/* Pop worker's job fiber */
|
/* Pop worker's job fiber */
|
||||||
job_fiber = NULL;
|
job_fiber = NULL;
|
||||||
done = true;
|
done = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
atomic_i32_fetch_set(&bin->lock, 0);
|
atomic_i32_fetch_set(&wait_addr_bin->lock, 0);
|
||||||
|
#endif
|
||||||
} break;
|
} break;
|
||||||
|
|
||||||
case YIELD_KIND_DONE:
|
case YIELD_KIND_DONE:
|
||||||
@ -950,7 +1179,9 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
wake_lock = snc_lock_s(&G.workers_wake_mutex);
|
/* Wait */
|
||||||
|
struct snc_lock wake_lock = snc_lock_s(&G.workers_wake_mutex);
|
||||||
|
{
|
||||||
if (queues_empty) {
|
if (queues_empty) {
|
||||||
i64 new_wake_gen = atomic_i64_fetch(&G.workers_wake_gen);
|
i64 new_wake_gen = atomic_i64_fetch(&G.workers_wake_gen);
|
||||||
while (new_wake_gen == last_seen_wake_gen) {
|
while (new_wake_gen == last_seen_wake_gen) {
|
||||||
@ -964,6 +1195,7 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
snc_unlock(&wake_lock);
|
snc_unlock(&wake_lock);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* ========================== *
|
/* ========================== *
|
||||||
@ -972,18 +1204,222 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
|
|||||||
|
|
||||||
INTERNAL SYS_THREAD_DEF(job_scheduler_entry, _)
|
INTERNAL SYS_THREAD_DEF(job_scheduler_entry, _)
|
||||||
{
|
{
|
||||||
|
(UNUSED)_;
|
||||||
|
{
|
||||||
|
i32 priority = THREAD_PRIORITY_TIME_CRITICAL;
|
||||||
|
b32 success = SetThreadPriority(GetCurrentThread(), priority);
|
||||||
|
(UNUSED)success;
|
||||||
|
ASSERT(success);
|
||||||
|
}
|
||||||
|
|
||||||
|
struct arena_temp scratch = scratch_begin_no_conflict();
|
||||||
|
|
||||||
HANDLE timer = CreateWaitableTimerExW(NULL, NULL, CREATE_WAITABLE_TIMER_HIGH_RESOLUTION, TIMER_ALL_ACCESS);
|
HANDLE timer = CreateWaitableTimerExW(NULL, NULL, CREATE_WAITABLE_TIMER_HIGH_RESOLUTION, TIMER_ALL_ACCESS);
|
||||||
if (!timer) {
|
if (!timer) {
|
||||||
sys_panic(LIT("Failed to create high resolution timer"));
|
sys_panic(LIT("Failed to create high resolution timer"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
i64 last_interval = sys_time_ns() / SCHEDULER_MIN_INTERVAL_NS;
|
||||||
while (atomic_i64_fetch(&G.workers_wake_gen) >= 0) {
|
while (atomic_i64_fetch(&G.workers_wake_gen) >= 0) {
|
||||||
__profn("Scheduler");
|
{
|
||||||
|
__profn("Job scheduler wait");
|
||||||
LARGE_INTEGER due = ZI;
|
LARGE_INTEGER due = ZI;
|
||||||
due.QuadPart = -1000;
|
due.QuadPart = -(SCHEDULER_MIN_INTERVAL_NS / 100);
|
||||||
SetWaitableTimerEx(timer, &due, 0, NULL, NULL, NULL, 0);
|
SetWaitableTimerEx(timer, &due, 0, NULL, NULL, NULL, 0);
|
||||||
//SetWaitableTimerEx(timer, &due, 5000, NULL, NULL, NULL, 0);
|
|
||||||
WaitForSingleObject(timer, INFINITE);
|
WaitForSingleObject(timer, INFINITE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
u64 wake_gen = atomic_u64_fetch_add_u64(&G.waiter_wake_gen, 1);
|
||||||
|
i64 new_interval = sys_time_ns() / SCHEDULER_MIN_INTERVAL_NS;
|
||||||
|
atomic_i64_fetch_set(&G.current_scheduler_interval, new_interval);
|
||||||
|
{
|
||||||
|
__profn("Job scheduler run");
|
||||||
|
struct arena_temp temp = arena_temp_begin(scratch.arena);
|
||||||
|
for (i64 interval = last_interval; interval < new_interval; ++interval) {
|
||||||
|
u64 wait_time_bin_index = (u64)interval % NUM_WAIT_TIME_BINS;
|
||||||
|
struct wait_bin *wait_time_bin = &G.wait_time_bins[wait_time_bin_index];
|
||||||
|
struct wait_list *wait_time_list = NULL;
|
||||||
|
|
||||||
|
/* Build list of waiters to resume */
|
||||||
|
i32 num_waiters = 0;
|
||||||
|
struct fiber **waiters = NULL;
|
||||||
|
{
|
||||||
|
while (atomic_i32_fetch_test_set(&wait_time_bin->lock, 0, 1) != 0) ix_pause();
|
||||||
|
{
|
||||||
|
/* Search for wait time list */
|
||||||
|
for (struct wait_list *tmp = wait_time_bin->first_wait_list; tmp && !wait_time_list; tmp = tmp->next_in_bin) {
|
||||||
|
if (tmp->value == (u64)interval) {
|
||||||
|
wait_time_list = tmp;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (wait_time_list) {
|
||||||
|
/* Set waiter wake status & build waiters list */
|
||||||
|
waiters = arena_push_array_no_zero(temp.arena, struct fiber *, wait_time_list->num_waiters);
|
||||||
|
for (struct fiber *waiter = fiber_from_id(wait_time_list->first_waiter); waiter; waiter = fiber_from_id(waiter->next_time_waiter)) {
|
||||||
|
if (atomic_u64_fetch_test_set(&waiter->wake_gen, 0, wake_gen) == 0) {
|
||||||
|
waiters[num_waiters] = waiter;
|
||||||
|
++num_waiters;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
atomic_i32_fetch_set(&wait_time_bin->lock, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Update wait lists */
|
||||||
|
for (i32 i = 0; i < num_waiters; ++i) {
|
||||||
|
struct fiber *waiter = waiters[i];
|
||||||
|
u64 wait_addr = waiter->wait_addr;
|
||||||
|
u64 wait_addr_bin_index = wait_addr % NUM_WAIT_ADDR_BINS;
|
||||||
|
struct wait_bin *wait_addr_bin = &G.wait_addr_bins[wait_addr_bin_index];
|
||||||
|
|
||||||
|
if (wait_addr != 0) while (atomic_i32_fetch_test_set(&wait_addr_bin->lock, 0, 1) != 0) ix_pause();
|
||||||
|
{
|
||||||
|
/* Search for wait addr list */
|
||||||
|
struct wait_list *wait_addr_list = NULL;
|
||||||
|
if (wait_addr != 0) {
|
||||||
|
for (struct wait_list *tmp = wait_addr_bin->first_wait_list; tmp && !wait_addr_list; tmp = tmp->next_in_bin) {
|
||||||
|
if (tmp->value == (u64)wait_addr) {
|
||||||
|
wait_addr_list = tmp;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
while (atomic_i32_fetch_test_set(&wait_time_bin->lock, 0, 1) != 0) ix_pause();
|
||||||
|
{
|
||||||
|
/* Remove from addr list */
|
||||||
|
if (wait_addr_list) {
|
||||||
|
if (--wait_addr_list->num_waiters == 0) {
|
||||||
|
/* Free addr list */
|
||||||
|
struct wait_list *prev = wait_addr_list->prev_in_bin;
|
||||||
|
struct wait_list *next = wait_addr_list->next_in_bin;
|
||||||
|
if (prev) {
|
||||||
|
prev->next_in_bin = next;
|
||||||
|
} else {
|
||||||
|
wait_addr_bin->first_wait_list = next;
|
||||||
|
}
|
||||||
|
if (next) {
|
||||||
|
next->next_in_bin = prev;
|
||||||
|
} else {
|
||||||
|
wait_addr_bin->last_wait_list = prev;
|
||||||
|
}
|
||||||
|
wait_addr_list->next_in_bin = wait_addr_bin->first_free_wait_list;
|
||||||
|
wait_addr_bin->first_free_wait_list = wait_addr_list;
|
||||||
|
} else {
|
||||||
|
i16 prev_id = waiter->prev_addr_waiter;
|
||||||
|
i16 next_id = waiter->next_addr_waiter;
|
||||||
|
if (prev_id) {
|
||||||
|
fiber_from_id(prev_id)->next_addr_waiter = next_id;
|
||||||
|
} else {
|
||||||
|
wait_addr_list->first_waiter = next_id;
|
||||||
|
}
|
||||||
|
if (next_id) {
|
||||||
|
fiber_from_id(next_id)->prev_addr_waiter = prev_id;
|
||||||
|
} else {
|
||||||
|
wait_addr_list->last_waiter = prev_id;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
waiter->wait_addr = 0;
|
||||||
|
waiter->prev_addr_waiter = 0;
|
||||||
|
waiter->next_addr_waiter = 0;
|
||||||
|
}
|
||||||
|
/* Remove from time list */
|
||||||
|
{
|
||||||
|
if (--wait_time_list->num_waiters == 0) {
|
||||||
|
/* Free time list */
|
||||||
|
struct wait_list *prev = wait_time_list->prev_in_bin;
|
||||||
|
struct wait_list *next = wait_time_list->next_in_bin;
|
||||||
|
if (prev) {
|
||||||
|
prev->next_in_bin = next;
|
||||||
|
} else {
|
||||||
|
wait_time_bin->first_wait_list = next;
|
||||||
|
}
|
||||||
|
if (next) {
|
||||||
|
next->next_in_bin = prev;
|
||||||
|
} else {
|
||||||
|
wait_time_bin->last_wait_list = prev;
|
||||||
|
}
|
||||||
|
wait_time_list->next_in_bin = wait_time_bin->first_free_wait_list;
|
||||||
|
wait_time_bin->first_free_wait_list = wait_time_list;
|
||||||
|
} else {
|
||||||
|
i16 prev_id = waiter->prev_time_waiter;
|
||||||
|
i16 next_id = waiter->next_time_waiter;
|
||||||
|
if (prev_id) {
|
||||||
|
fiber_from_id(prev_id)->next_time_waiter = next_id;
|
||||||
|
} else {
|
||||||
|
wait_time_list->first_waiter = next_id;
|
||||||
|
}
|
||||||
|
if (next_id) {
|
||||||
|
fiber_from_id(next_id)->prev_time_waiter = prev_id;
|
||||||
|
} else {
|
||||||
|
wait_time_list->last_waiter = prev_id;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
waiter->wait_time = 0;
|
||||||
|
waiter->prev_time_waiter = 0;
|
||||||
|
waiter->next_time_waiter = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
atomic_i32_fetch_set(&wait_time_bin->lock, 0);
|
||||||
|
}
|
||||||
|
if (wait_addr != 0) atomic_i32_fetch_set(&wait_addr_bin->lock, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Resume waiters */
|
||||||
|
/* TODO: Batch submit waiters based on queue kind rather than one at a time */
|
||||||
|
for (i32 i = 0; i < num_waiters; ++i) {
|
||||||
|
struct fiber *waiter = waiters[i];
|
||||||
|
enum job_queue_kind queue_kind = job_queue_kind_from_priority(waiter->job_priority);
|
||||||
|
struct job_queue *queue = &G.job_queues[queue_kind];
|
||||||
|
while (atomic_i32_fetch_test_set(&queue->lock, 0, 1) != 0) ix_pause();
|
||||||
|
{
|
||||||
|
struct job_info *info = NULL;
|
||||||
|
if (queue->first_free) {
|
||||||
|
info = queue->first_free;
|
||||||
|
queue->first_free = info->next;
|
||||||
|
} else {
|
||||||
|
info = arena_push_no_zero(queue->arena, struct job_info);
|
||||||
|
}
|
||||||
|
MEMZERO_STRUCT(info);
|
||||||
|
info->count = 1;
|
||||||
|
info->num_dispatched = waiter->job_id;
|
||||||
|
info->func = waiter->job_func;
|
||||||
|
info->sig = waiter->job_sig;
|
||||||
|
info->counter = waiter->job_counter;
|
||||||
|
info->fiber_id = waiter->id;
|
||||||
|
if (queue->last) {
|
||||||
|
queue->last->next = info;
|
||||||
|
} else {
|
||||||
|
queue->first = info;
|
||||||
|
}
|
||||||
|
queue->last = info;
|
||||||
|
atomic_u64_fetch_set(&waiter->wake_gen, 0);
|
||||||
|
}
|
||||||
|
atomic_i32_fetch_set(&queue->lock, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Wake workers */
|
||||||
|
/* TODO: Only wake necessary amount of workers */
|
||||||
|
if (num_waiters > 0) {
|
||||||
|
struct snc_lock lock = snc_lock_e(&G.workers_wake_mutex);
|
||||||
|
{
|
||||||
|
if (atomic_i64_fetch(&G.workers_wake_gen) >= 0) {
|
||||||
|
atomic_i64_fetch_add(&G.workers_wake_gen, 1);
|
||||||
|
snc_cv_broadcast(&G.workers_wake_cv);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
snc_unlock(&lock);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
arena_temp_end(temp);
|
||||||
|
}
|
||||||
|
last_interval = new_interval;
|
||||||
|
}
|
||||||
|
|
||||||
|
scratch_end(scratch);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* ========================== *
|
/* ========================== *
|
||||||
@ -995,6 +1431,10 @@ INTERNAL SYS_THREAD_DEF(test_entry, _)
|
|||||||
struct arena_temp scratch = scratch_begin_no_conflict();
|
struct arena_temp scratch = scratch_begin_no_conflict();
|
||||||
(UNUSED)_;
|
(UNUSED)_;
|
||||||
|
|
||||||
|
/* Start scheduler */
|
||||||
|
struct sys_thread *scheduler_thread = sys_thread_alloc(job_scheduler_entry, NULL, LIT("Scheduler thread"), PROF_THREAD_GROUP_SCHEDULER);
|
||||||
|
while (atomic_i64_fetch(&G.current_scheduler_interval) == 0) ix_pause();
|
||||||
|
|
||||||
/* Start workers */
|
/* Start workers */
|
||||||
G.num_worker_threads = 6;
|
G.num_worker_threads = 6;
|
||||||
G.worker_threads_arena = arena_alloc(GIBI(64));
|
G.worker_threads_arena = arena_alloc(GIBI(64));
|
||||||
@ -1007,9 +1447,6 @@ INTERNAL SYS_THREAD_DEF(test_entry, _)
|
|||||||
G.worker_threads[i] = sys_thread_alloc(job_worker_entry, ctx, name, PROF_THREAD_GROUP_WORKERS + i);
|
G.worker_threads[i] = sys_thread_alloc(job_worker_entry, ctx, name, PROF_THREAD_GROUP_WORKERS + i);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Start scheduler */
|
|
||||||
struct sys_thread *scheduler_thread = sys_thread_alloc(job_scheduler_entry, NULL, LIT("Scheduler thread"), PROF_THREAD_GROUP_SCHEDULER);
|
|
||||||
|
|
||||||
/* Wait on workers */
|
/* Wait on workers */
|
||||||
for (i32 i = 0; i < G.num_worker_threads; ++i) {
|
for (i32 i = 0; i < G.num_worker_threads; ++i) {
|
||||||
struct sys_thread *worker_thread = G.worker_threads[i];
|
struct sys_thread *worker_thread = G.worker_threads[i];
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user