diff --git a/src/common.h b/src/common.h index 7744ae25..bc31e74c 100644 --- a/src/common.h +++ b/src/common.h @@ -417,6 +417,40 @@ struct atomic_ptr { volatile void *_v; }; +/* ========================== * + * Cache-line isolated atomics + * ========================== */ + +struct alignas(64) atomic_i32_padded { + struct atomic_i32 v; + u8 _pad[60]; +}; +STATIC_ASSERT(sizeof(struct atomic_i32_padded) == 64 && alignof(struct atomic_i32_padded) == 64); + +struct alignas(64) atomic_i64_padded { + struct atomic_i64 v; + u8 _pad[56]; +}; +STATIC_ASSERT(sizeof(struct atomic_i64_padded) == 64 && alignof(struct atomic_i64_padded) == 64); + +struct alignas(64) atomic_u32_padded { + struct atomic_u32 v; + u8 _pad[60]; +}; +STATIC_ASSERT(sizeof(struct atomic_u32_padded) == 64 && alignof(struct atomic_u32_padded) == 64); + +struct alignas(64) atomic_u64_padded { + struct atomic_u64 v; + u8 _pad[56]; +}; +STATIC_ASSERT(sizeof(struct atomic_u64_padded) == 64 && alignof(struct atomic_u64_padded) == 64); + +struct alignas(64) atomic_ptr_padded { + struct atomic_ptr v; + u8 _pad[56]; +}; +STATIC_ASSERT(sizeof(struct atomic_ptr_padded) == 64 && alignof(struct atomic_ptr_padded) == 64); + /* ========================== * * Common structs * ========================== */ diff --git a/src/gstat.h b/src/gstat.h index 53e1b352..d46c6dd8 100644 --- a/src/gstat.h +++ b/src/gstat.h @@ -8,25 +8,25 @@ #include "atomic.h" struct _gstats { - struct atomic_u64 GSTAT_SOCK_BYTES_SENT; - struct atomic_u64 GSTAT_SOCK_BYTES_RECEIVED; - struct atomic_u64 GSTAT_MEMORY_COMMITTED; - struct atomic_u64 GSTAT_MEMORY_RESERVED; - struct atomic_u64 GSTAT_NUM_ARENAS; + struct atomic_u64_padded GSTAT_SOCK_BYTES_SENT; + struct atomic_u64_padded GSTAT_SOCK_BYTES_RECEIVED; + struct atomic_u64_padded GSTAT_MEMORY_COMMITTED; + struct atomic_u64_padded GSTAT_MEMORY_RESERVED; + struct atomic_u64_padded GSTAT_NUM_ARENAS; }; extern struct _gstats _g_gstats; -#define gstat_set(name, v) atomic_u64_fetch_set(&_g_gstats.name, (v)) -#define gstat_add(name, v) atomic_u64_fetch_add_u64(&_g_gstats.name, (v)) -#define gstat_sub(name, v) atomic_u64_fetch_add_i64(&_g_gstats.name, -((i64)(v))) -#define gstat_get(name) atomic_u64_fetch(&_g_gstats.name) +#define gstat_set(name, value) atomic_u64_fetch_set(&_g_gstats.name.v, (value)) +#define gstat_add(name, value) atomic_u64_fetch_add_u64(&_g_gstats.name.v, (value)) +#define gstat_sub(name, value) atomic_u64_fetch_add_i64(&_g_gstats.name.v, -((i64)(value))) +#define gstat_get(name) atomic_u64_fetch(&_g_gstats.name.v) #else -#define gstat_set(name, v) -#define gstat_add(name, v) -#define gstat_sub(name, v) +#define gstat_set(name, value) +#define gstat_add(name, value) +#define gstat_sub(name, value) #define gstat_get(name) 0 #endif diff --git a/src/host.c b/src/host.c index fc860c88..dc7779c5 100644 --- a/src/host.c +++ b/src/host.c @@ -205,7 +205,7 @@ struct host *host_alloc(u16 listen_port) void host_release(struct host *host) { - atomic_i32_fetch_set(&host->receiver_thread_shutdown_flag, 1); + atomic_i32_fetch_set(&host->receiver_thread_shutdown_flag.v, 1); sock_wake(host->sock); while (!sys_thread_try_release(host->receiver_thread, 0.001f)) { sock_wake(host->sock); @@ -1072,7 +1072,7 @@ INTERNAL SYS_THREAD_DEF(host_receiver_thread_entry_point, arg) socks.socks = &host->sock; socks.count = 1; - struct atomic_i32 *shutdown = &host->receiver_thread_shutdown_flag; + struct atomic_i32 *shutdown = &host->receiver_thread_shutdown_flag.v; while (!atomic_i32_fetch(shutdown)) { struct sock *sock = sock_wait_for_available_read(socks, F32_INFINITY); struct sock_read_result res; diff --git a/src/host.h b/src/host.h index 0d7d2afe..fac7de24 100644 --- a/src/host.h +++ b/src/host.h @@ -98,7 +98,7 @@ struct host { u64 bytes_received; u64 bytes_sent; - struct atomic_i32 receiver_thread_shutdown_flag; + struct atomic_i32_padded receiver_thread_shutdown_flag; struct sys_thread *receiver_thread; }; diff --git a/src/snc.h b/src/snc.h index 3a673a99..948d3e2f 100644 --- a/src/snc.h +++ b/src/snc.h @@ -56,10 +56,6 @@ void snc_cv_wait(struct snc_cv *cv, struct snc_lock *lock); void snc_cv_wait_time(struct snc_cv *cv, struct snc_lock *l, i64 timeout_ns); void snc_cv_broadcast(struct snc_cv *cv); -/* ========================== * - * Fence - * ========================== */ - /* ========================== * * Counter * ========================== */ @@ -71,7 +67,6 @@ struct alignas(64) snc_counter { STATIC_ASSERT(sizeof(struct snc_counter) == 64); /* Padding validation */ STATIC_ASSERT(alignof(struct snc_counter) == 64); /* Prevent false sharing */ - void snc_counter_add(struct snc_counter *counter, i64 x); void snc_counter_wait(struct snc_counter *counter); diff --git a/src/sprite.c b/src/sprite.c index 98e4a213..ff335fed 100644 --- a/src/sprite.c +++ b/src/sprite.c @@ -67,7 +67,7 @@ struct cache_entry { enum cache_entry_kind kind; struct cache_entry_hash hash; struct atomic_i32 state; - struct atomic_u64 refcount_struct; /* Cast fetched result to `cache_refcount` */ + struct atomic_u64_padded refcount_struct; /* Cast fetched result to `cache_refcount` */ /* Allocated data */ /* NOTE: This data is finalized once entry state = loaded */ @@ -96,7 +96,7 @@ struct cache_bin { }; struct cache { - struct atomic_u64 memory_usage; + struct atomic_u64_padded memory_usage; struct arena *arena; struct cache_bin *bins; struct snc_mutex entry_pool_mutex; @@ -146,12 +146,12 @@ GLOBAL struct { struct load_cmd *first_free_load_cmd; /* Scopes */ - struct atomic_i32 scopes_lock; + struct snc_mutex scopes_mutex; struct arena *scopes_arena; struct sprite_scope *first_free_scope; /* Evictor */ - struct atomic_i32 evictor_cycle; + struct atomic_i32_padded evictor_cycle; struct snc_counter shutdown_counter; b32 evictor_scheduler_shutdown; struct snc_mutex evictor_scheduler_mutex; @@ -380,7 +380,7 @@ INTERNAL void cache_entry_load_texture(struct cache_ref ref, struct sprite_tag t } arena_set_readonly(e->arena); e->memory_usage = e->arena->committed + memory_size; - atomic_u64_fetch_add_u64(&G.cache.memory_usage, e->memory_usage); + atomic_u64_fetch_add_u64(&G.cache.memory_usage.v, e->memory_usage); if (success) { logf_success("Loaded sprite texture [%F] \"%F\" in %F seconds (cache size: %F bytes).", @@ -701,7 +701,7 @@ INTERNAL void cache_entry_load_sheet(struct cache_ref ref, struct sprite_tag tag } arena_set_readonly(e->arena); e->memory_usage = e->arena->committed; - atomic_u64_fetch_add_u64(&G.cache.memory_usage, e->memory_usage); + atomic_u64_fetch_add_u64(&G.cache.memory_usage.v, e->memory_usage); if (success) { logf_success("Loaded sprite sheet [%F] \"%F\" in %F seconds (cache size: %F bytes).", @@ -736,8 +736,8 @@ INTERNAL void cache_entry_load_sheet(struct cache_ref ref, struct sprite_tag tag INTERNAL void refcount_add(struct cache_entry *e, i32 amount) { - i32 evictor_cycle = atomic_i32_fetch(&G.evictor_cycle); - struct atomic_u64 *refcount_atomic = &e->refcount_struct; + i32 evictor_cycle = atomic_i32_fetch(&G.evictor_cycle.v); + struct atomic_u64 *refcount_atomic = &e->refcount_struct.v; u64 old_refcount_uncast = atomic_u64_fetch(refcount_atomic); do { struct cache_refcount new_refcount = *(struct cache_refcount *)&old_refcount_uncast; @@ -806,7 +806,7 @@ struct sprite_scope *sprite_scope_begin(void) struct sprite_scope_cache_ref **bins = NULL; struct sprite_scope_cache_ref *pool = NULL; { - while (atomic_i32_fetch_test_set(&G.scopes_lock, 0, 1) != 0) ix_pause(); + struct snc_lock lock = snc_lock_e(&G.scopes_mutex); { if (G.first_free_scope) { res = G.first_free_scope; @@ -819,7 +819,7 @@ struct sprite_scope *sprite_scope_begin(void) pool = arena_push_array_no_zero(G.scopes_arena, struct sprite_scope_cache_ref, MAX_SCOPE_REFERENCES); } } - atomic_i32_fetch_set(&G.scopes_lock, 0); + snc_unlock(&lock); } MEMZERO_STRUCT(res); MEMZERO(bins, sizeof(*bins) * CACHE_BINS_COUNT); @@ -838,12 +838,12 @@ void sprite_scope_end(struct sprite_scope *scope) } /* Release scope */ - while (atomic_i32_fetch_test_set(&G.scopes_lock, 0, 1) != 0) ix_pause(); + struct snc_lock lock = snc_lock_e(&G.scopes_mutex); { scope->next_free = G.first_free_scope; G.first_free_scope = scope; } - atomic_i32_fetch_set(&G.scopes_lock, 0); + snc_unlock(&lock); } /* ========================== * @@ -1243,10 +1243,10 @@ INTERNAL SYS_JOB_DEF(sprite_evictor_job, _) u64 evict_array_count = 0; struct evict_node *evict_array = arena_push_dry(scratch.arena, struct evict_node); { - i32 cur_cycle = atomic_i32_fetch(&G.evictor_cycle); + i32 cur_cycle = atomic_i32_fetch(&G.evictor_cycle.v); /* Scan for evictable nodes */ - b32 cache_over_budget_threshold = atomic_u64_fetch(&G.cache.memory_usage) > CACHE_MEMORY_BUDGET_THRESHOLD; + b32 cache_over_budget_threshold = atomic_u64_fetch(&G.cache.memory_usage.v) > CACHE_MEMORY_BUDGET_THRESHOLD; if (cache_over_budget_threshold || RESOURCE_RELOADING) { __profn("Evictor scan"); for (u64 i = 0; i < CACHE_BINS_COUNT; ++i) { @@ -1255,7 +1255,7 @@ INTERNAL SYS_JOB_DEF(sprite_evictor_job, _) { struct cache_entry *n = bin->first; while (n) { - u64 refcount_uncast = atomic_u64_fetch(&n->refcount_struct); + u64 refcount_uncast = atomic_u64_fetch(&n->refcount_struct.v); struct cache_refcount refcount = *(struct cache_refcount *)&refcount_uncast; if (refcount.count <= 0) { /* Add node to evict list */ @@ -1303,10 +1303,10 @@ INTERNAL SYS_JOB_DEF(sprite_evictor_job, _) struct cache_bin *bin = en->cache_bin; struct cache_entry *entry = en->cache_entry; i32 last_ref_cycle = en->last_ref_cycle; - b32 cache_over_budget_target = atomic_u64_fetch(&G.cache.memory_usage) > CACHE_MEMORY_BUDGET_TARGET; + b32 cache_over_budget_target = atomic_u64_fetch(&G.cache.memory_usage.v) > CACHE_MEMORY_BUDGET_TARGET; struct snc_lock bin_lock = snc_lock_e(&bin->mutex); { - u64 refcount_uncast = atomic_u64_fetch(&entry->refcount_struct); + u64 refcount_uncast = atomic_u64_fetch(&entry->refcount_struct.v); struct cache_refcount refcount = *(struct cache_refcount *)&refcount_uncast; if (refcount.count > 0 || (last_ref_cycle >= 0 && refcount.last_ref_cycle != en->last_ref_cycle)) { /* Cache node has been referenced since scan, skip node. */ @@ -1325,7 +1325,7 @@ INTERNAL SYS_JOB_DEF(sprite_evictor_job, _) bin->last = prev; } - atomic_u64_fetch_add_i64(&G.cache.memory_usage, -((i64)entry->memory_usage)); + atomic_u64_fetch_add_i64(&G.cache.memory_usage.v, -((i64)entry->memory_usage)); /* Add to evicted list */ en->next_evicted = first_evicted; @@ -1365,7 +1365,7 @@ INTERNAL SYS_JOB_DEF(sprite_evictor_job, _) } } } - atomic_i32_fetch_add(&G.evictor_cycle, 1); + atomic_i32_fetch_add(&G.evictor_cycle.v, 1); scratch_end(scratch); } diff --git a/src/sys_win32.c b/src/sys_win32.c index eb37d7fe..4729841e 100644 --- a/src/sys_win32.c +++ b/src/sys_win32.c @@ -34,15 +34,13 @@ #define THREAD_STACK_SIZE KIBI(64) #define FIBER_STACK_SIZE MEBI(4) -struct win32_mutex { - struct atomic_i32 v; /* Bits 0-30 = shared lock count, bit 30 = is exclusive lock pending, bit 31 = is exclusive locked */ - struct win32_mutex *next_free; -}; +/* Assume scheduler cycle is 20hz at start to be conservative */ +#define DEFAULT_SCHEDULER_CYCLE_PERIOD_NS 50000000 +#define NUM_ROLLING_SCHEDULER_PERIODS 1000 -struct win32_condition_variable { - struct atomic_u64 wake_gen; - struct win32_condition_variable *next_free; -}; +#define FIBER_NAME_PREFIX_CSTR "Fiber [" +#define FIBER_NAME_SUFFIX_CSTR "]" +#define FIBER_NAME_MAX_SIZE 64 struct win32_thread { sys_thread_func *entry_point; @@ -138,16 +136,6 @@ struct alignas(64) wait_bin { STATIC_ASSERT(sizeof(struct wait_bin) == 64); /* Padding validation (increase if necessary) */ STATIC_ASSERT(alignof(struct wait_bin) == 64); /* Avoid false sharing */ - -/* Assume scheduler cycle is 20hz at start to be conservative */ -#define DEFAULT_SCHEDULER_CYCLE_PERIOD_NS 50000000 -#define NUM_ROLLING_SCHEDULER_PERIODS 1000 - - -#define FIBER_NAME_PREFIX_CSTR "Fiber [" -#define FIBER_NAME_SUFFIX_CSTR "]" -#define FIBER_NAME_MAX_SIZE 64 - enum yield_kind { YIELD_KIND_NONE, YIELD_KIND_DONE, @@ -243,10 +231,38 @@ enum job_queue_kind { NUM_JOB_QUEUE_KINDS }; +/* ========================== * + * Ticket mutex + * ========================== */ + +struct tm { + struct atomic_i64_padded ticket; + struct atomic_i64_padded serving; +}; + +INTERNAL void tm_lock(struct tm *tm) +{ + i64 ticket = atomic_i64_fetch_add(&tm->ticket.v, 1); + while (atomic_i64_fetch(&tm->serving.v) != ticket) { + ix_pause(); + } +} + +INTERNAL void tm_unlock(struct tm *tm) +{ + atomic_i64_fetch_add(&tm->serving.v, 1); +} + + + + + + + struct alignas(64) job_queue { enum job_queue_kind kind; - struct atomic_i32 lock; + struct tm lock; struct arena *arena; struct job_info *first; @@ -298,12 +314,12 @@ GLOBAL struct { /* Scheduler */ - struct atomic_i64 current_scheduler_cycle; /* TODO: Prevent false sharing */ - struct atomic_i64 current_scheduler_cycle_period_ns; /* TODO: Prevent false sharing */ + struct atomic_i64_padded current_scheduler_cycle; + struct atomic_i64_padded current_scheduler_cycle_period_ns; /* Wait lists */ - struct atomic_u64 waiter_wake_gen; /* TODO: Prevent false sharing */ - struct atomic_i32 wait_lists_arena_lock; /* TODO: Prevent false sharing */ + struct atomic_u64_padded waiter_wake_gen; + struct tm wait_lists_arena_lock; struct arena *wait_lists_arena; /* Wait tables */ @@ -314,15 +330,15 @@ GLOBAL struct { i16 num_fibers; i16 first_free_fiber_id; struct arena *fiber_names_arena; - struct atomic_i32 fibers_lock; /* TODO: Prevent false sharing */ + struct tm fibers_lock; struct fiber fibers[SYS_MAX_FIBERS]; /* Jobs */ struct job_queue job_queues[NUM_JOB_QUEUE_KINDS]; /* Workers */ - struct atomic_i32 workers_shutdown; - struct atomic_i64 num_jobs_in_queue; /* TODO: Prevent false sharing */ + struct atomic_i32_padded workers_shutdown; + struct atomic_i64_padded num_jobs_in_queue; struct snc_mutex workers_wake_mutex; struct snc_cv workers_wake_cv; @@ -352,7 +368,7 @@ INTERNAL enum sys_priority job_priority_from_queue_kind(enum job_queue_kind queu i64 sys_current_scheduler_period_ns(void) { - return atomic_i64_fetch(&G.current_scheduler_cycle_period_ns); + return atomic_i64_fetch(&G.current_scheduler_cycle_period_ns.v); } @@ -424,7 +440,7 @@ void sys_wake_all(void *addr) { struct arena_temp scratch = scratch_begin_no_conflict(); - u64 wake_gen = atomic_u64_fetch_add_u64(&G.waiter_wake_gen, 1); + u64 wake_gen = atomic_u64_fetch_add_u64(&G.waiter_wake_gen.v, 1); u64 wait_addr_bin_index = (u64)addr % NUM_WAIT_ADDR_BINS; struct wait_bin *wait_addr_bin = &G.wait_addr_bins[wait_addr_bin_index]; @@ -556,7 +572,7 @@ void sys_wake_all(void *addr) struct fiber *waiter = waiters[i]; enum job_queue_kind queue_kind = job_queue_kind_from_priority(waiter->job_priority); struct job_queue *queue = &G.job_queues[queue_kind]; - while (atomic_i32_fetch_test_set(&queue->lock, 0, 1) != 0) ix_pause(); + tm_lock(&queue->lock); { struct job_info *info = NULL; if (queue->first_free) { @@ -580,7 +596,7 @@ void sys_wake_all(void *addr) queue->last = info; atomic_u64_fetch_set(&waiter->wake_gen, 0); } - atomic_i32_fetch_set(&queue->lock, 0); + tm_unlock(&queue->lock); } /* Wake blocking waiters */ @@ -591,7 +607,7 @@ void sys_wake_all(void *addr) if (num_waiters > 0) { struct snc_lock lock = snc_lock_e(&G.workers_wake_mutex); { - atomic_i64_fetch_add(&G.num_jobs_in_queue, num_waiters); + atomic_i64_fetch_add(&G.num_jobs_in_queue.v, num_waiters); snc_cv_broadcast(&G.workers_wake_cv); } snc_unlock(&lock); @@ -617,7 +633,7 @@ INTERNAL struct fiber *fiber_alloc(enum fiber_kind kind) struct fiber *fiber = NULL; char *new_name_cstr = NULL; { - while (atomic_i32_fetch_test_set(&G.fibers_lock, 0, 1) != 0) ix_pause(); + tm_lock(&G.fibers_lock); { fiber_id = G.first_free_fiber_id; if (fiber_id && kind == FIBER_KIND_JOB_WORKER) { @@ -632,7 +648,7 @@ INTERNAL struct fiber *fiber_alloc(enum fiber_kind kind) new_name_cstr = arena_push_array(G.fiber_names_arena, char, FIBER_NAME_MAX_SIZE); } } - atomic_i32_fetch_set(&G.fibers_lock, 0); + tm_unlock(&G.fibers_lock); } if (new_name_cstr != NULL) { fiber->id = fiber_id; @@ -694,12 +710,12 @@ INTERNAL struct fiber *fiber_alloc(enum fiber_kind kind) INTERNAL void fiber_release(struct fiber *fiber, i16 fiber_id) { - while (atomic_i32_fetch_test_set(&G.fibers_lock, 0, 1) != 0) ix_pause(); + tm_lock(&G.fibers_lock); { fiber->parent_id = G.first_free_fiber_id; G.first_free_fiber_id = fiber_id; } - atomic_i32_fetch_set(&G.fibers_lock, 0); + tm_unlock(&G.fibers_lock); } FORCE_INLINE struct fiber *fiber_from_id(i16 id) @@ -730,7 +746,7 @@ void sys_run(i32 count, sys_job_func *func, void *sig, enum sys_priority priorit priority = clamp_i32(priority, fiber->job_priority, SYS_PRIORITY_BACKGROUND); /* A job cannot create a job with a higher priority than itself */ enum job_queue_kind queue_kind = job_queue_kind_from_priority(priority); struct job_queue *queue = &G.job_queues[queue_kind]; - while (atomic_i32_fetch_test_set(&queue->lock, 0, 1) != 0) ix_pause(); + tm_lock(&queue->lock); { struct job_info *info = NULL; if (queue->first_free) { @@ -751,13 +767,13 @@ void sys_run(i32 count, sys_job_func *func, void *sig, enum sys_priority priorit } queue->last = info; } - atomic_i32_fetch_set(&queue->lock, 0); + tm_unlock(&queue->lock); } /* Wake workers */ /* TODO: Only wake necessary amount of workers */ struct snc_lock lock = snc_lock_e(&G.workers_wake_mutex); { - atomic_i64_fetch_add(&G.num_jobs_in_queue, count); + atomic_i64_fetch_add(&G.num_jobs_in_queue.v, count); snc_cv_broadcast(&G.workers_wake_cv); } snc_unlock(&lock); @@ -881,7 +897,7 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg) for (u32 queue_index = 0; queue_index < countof(queues) && !job_func; ++queue_index) { struct job_queue *queue = queues[queue_index]; if (queue) { - while (atomic_i32_fetch_test_set(&queue->lock, 0, 1) != 0) ix_pause(); + tm_lock(&queue->lock); { struct job_info *info = queue->first; job_priority = (enum sys_priority)queue->kind; @@ -893,7 +909,7 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg) job_id = info->num_dispatched++; if (job_id < info->count) { /* Pick job */ - atomic_i64_fetch_add(&G.num_jobs_in_queue, -1); + atomic_i64_fetch_add(&G.num_jobs_in_queue.v, -1); job_func = info->func; job_sig = info->sig; job_counter = info->counter; @@ -904,7 +920,7 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg) } } else { /* This job is to be resumed from a yield */ - atomic_i64_fetch_add(&G.num_jobs_in_queue, -1); + atomic_i64_fetch_add(&G.num_jobs_in_queue.v, -1); job_fiber_id = info->fiber_id; job_id = info->num_dispatched; job_func = info->func; @@ -923,7 +939,7 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg) info = next; } } - atomic_i32_fetch_set(&queue->lock, 0); + tm_unlock(&queue->lock); } } } @@ -973,8 +989,8 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg) i64 wait_timeout_ns = yield.wait.timeout_ns; i64 wait_time = 0; if (wait_timeout_ns > 0 && wait_timeout_ns < I64_MAX) { - u64 current_scheduler_cycle = atomic_i64_fetch(&G.current_scheduler_cycle); - i64 current_scheduler_cycle_period_ns = atomic_i64_fetch(&G.current_scheduler_cycle_period_ns); + u64 current_scheduler_cycle = atomic_i64_fetch(&G.current_scheduler_cycle.v); + i64 current_scheduler_cycle_period_ns = atomic_i64_fetch(&G.current_scheduler_cycle_period_ns.v); wait_time = current_scheduler_cycle + max_i64((i64)((f64)wait_timeout_ns / (f64)current_scheduler_cycle_period_ns), 1); } @@ -998,7 +1014,7 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg) } } if (wait_time != 0 && !cancel_wait) { - cancel_wait = wait_time <= atomic_i64_fetch(&G.current_scheduler_cycle); + cancel_wait = wait_time <= atomic_i64_fetch(&G.current_scheduler_cycle.v); } if (!cancel_wait) { if (wait_addr != 0) { @@ -1015,11 +1031,11 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg) wait_addr_list = wait_addr_bin->first_free_wait_list; wait_addr_bin->first_free_wait_list = wait_addr_list->next_in_bin; } else { - while (atomic_i32_fetch_test_set(&G.wait_lists_arena_lock, 0, 1) != 0) ix_pause(); + tm_lock(&G.wait_lists_arena_lock); { wait_addr_list = arena_push_no_zero(G.wait_lists_arena, struct wait_list); } - atomic_i32_fetch_set(&G.wait_lists_arena_lock, 0); + tm_unlock(&G.wait_lists_arena_lock); } MEMZERO_STRUCT(wait_addr_list); wait_addr_list->value = wait_addr; @@ -1056,11 +1072,11 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg) wait_time_list = wait_time_bin->first_free_wait_list; wait_time_bin->first_free_wait_list = wait_time_list->next_in_bin; } else { - while (atomic_i32_fetch_test_set(&G.wait_lists_arena_lock, 0, 1) != 0) ix_pause(); + tm_lock(&G.wait_lists_arena_lock); { wait_time_list = arena_push_no_zero(G.wait_lists_arena, struct wait_list); } - atomic_i32_fetch_set(&G.wait_lists_arena_lock, 0); + tm_unlock(&G.wait_lists_arena_lock); } MEMZERO_STRUCT(wait_time_list); wait_time_list->value = wait_time; @@ -1109,11 +1125,11 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg) /* Wait */ struct snc_lock wake_lock = snc_lock_s(&G.workers_wake_mutex); { - shutdown = atomic_i32_fetch(&G.workers_shutdown); - while (atomic_i64_fetch(&G.num_jobs_in_queue) <= 0 && !shutdown) { + shutdown = atomic_i32_fetch(&G.workers_shutdown.v); + while (atomic_i64_fetch(&G.num_jobs_in_queue.v) <= 0 && !shutdown) { __profnc("Wait for job", RGB32_F(0.75, 0.75, 0)); snc_cv_wait(&G.workers_wake_cv, &wake_lock); - shutdown = atomic_i32_fetch(&G.workers_shutdown); + shutdown = atomic_i32_fetch(&G.workers_shutdown.v); } } snc_unlock(&wake_lock); @@ -1126,492 +1142,6 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg) INTERNAL SYS_THREAD_DEF(job_scheduler_entry, _) { -#if 0 - timeBeginPeriod(1); - - (UNUSED)_; - { - i32 priority = THREAD_PRIORITY_TIME_CRITICAL; - b32 success = SetThreadPriority(GetCurrentThread(), priority); - (UNUSED)success; - ASSERT(success); - } - - struct arena_temp scratch = scratch_begin_no_conflict(); - - /* Create ring buffer of scheduler cycles initialized to default value */ - i32 periods_index = 0; - i64 periods[NUM_ROLLING_SCHEDULER_PERIODS] = ZI; - for (i32 i = 0; i < (i32)countof(periods); ++i) { - periods[i] = DEFAULT_SCHEDULER_CYCLE_PERIOD_NS; - } - - i64 last_cycle_ns = 0; - while (atomic_i64_fetch(&G.workers_wake_gen) >= 0) { - __profn("Job scheduler cycle"); - { - __profn("Job scheduler sleep"); - Sleep(1); - } - - i64 now_ns = sys_time_ns(); - if (last_cycle_ns != 0) { - i64 new_period_ns = now_ns - last_cycle_ns; - periods[periods_index++] = new_period_ns; - if (periods_index == countof(periods)) { - periods_index = 0; - } - /* Calculate mean period */ - f64 periods_sum_ns = 0; - for (i32 i = 0; i < (i32)countof(periods); ++i) { - periods_sum_ns += (f64)periods[i]; - } - f64 mean_ns = periods_sum_ns / (f64)countof(periods); - atomic_i64_fetch_set(&G.current_scheduler_cycle_period_ns, math_round_to_int64(mean_ns)); - } - last_cycle_ns = now_ns; - - u64 wake_gen = atomic_u64_fetch_add_u64(&G.waiter_wake_gen, 1); - i64 current_cycle = atomic_i64_fetch_add(&G.current_scheduler_cycle, 1) + 1; - { - __profn("Job scheduler run"); - struct arena_temp temp = arena_temp_begin(scratch.arena); - u64 wait_time_bin_index = (u64)current_cycle % NUM_WAIT_TIME_BINS; - struct wait_bin *wait_time_bin = &G.wait_time_bins[wait_time_bin_index]; - struct wait_list *wait_time_list = NULL; - - /* Build list of waiters to resume */ - i32 num_waiters = 0; - struct fiber **waiters = NULL; - { - while (atomic_i32_fetch_test_set(&wait_time_bin->lock, 0, 1) != 0) ix_pause(); - { - /* Search for wait time list */ - for (struct wait_list *tmp = wait_time_bin->first_wait_list; tmp && !wait_time_list; tmp = tmp->next_in_bin) { - if (tmp->value == (u64)current_cycle) { - wait_time_list = tmp; - } - } - - if (wait_time_list) { - /* Set waiter wake status & build waiters list */ - waiters = arena_push_array_no_zero(temp.arena, struct fiber *, wait_time_list->num_waiters); - for (struct fiber *waiter = fiber_from_id(wait_time_list->first_waiter); waiter; waiter = fiber_from_id(waiter->next_time_waiter)) { - if (atomic_u64_fetch_test_set(&waiter->wake_gen, 0, wake_gen) == 0) { - waiters[num_waiters] = waiter; - ++num_waiters; - } - - } - } - } - atomic_i32_fetch_set(&wait_time_bin->lock, 0); - } - - /* Update wait lists */ - for (i32 i = 0; i < num_waiters; ++i) { - struct fiber *waiter = waiters[i]; - u64 wait_addr = waiter->wait_addr; - u64 wait_addr_bin_index = wait_addr % NUM_WAIT_ADDR_BINS; - struct wait_bin *wait_addr_bin = &G.wait_addr_bins[wait_addr_bin_index]; - - if (wait_addr != 0) while (atomic_i32_fetch_test_set(&wait_addr_bin->lock, 0, 1) != 0) ix_pause(); - { - /* Search for wait addr list */ - struct wait_list *wait_addr_list = NULL; - if (wait_addr != 0) { - for (struct wait_list *tmp = wait_addr_bin->first_wait_list; tmp && !wait_addr_list; tmp = tmp->next_in_bin) { - if (tmp->value == (u64)wait_addr) { - wait_addr_list = tmp; - } - } - } - - while (atomic_i32_fetch_test_set(&wait_time_bin->lock, 0, 1) != 0) ix_pause(); - { - /* Remove from addr list */ - if (wait_addr_list) { - if (--wait_addr_list->num_waiters == 0) { - /* Free addr list */ - struct wait_list *prev = wait_addr_list->prev_in_bin; - struct wait_list *next = wait_addr_list->next_in_bin; - if (prev) { - prev->next_in_bin = next; - } else { - wait_addr_bin->first_wait_list = next; - } - if (next) { - next->next_in_bin = prev; - } else { - wait_addr_bin->last_wait_list = prev; - } - wait_addr_list->next_in_bin = wait_addr_bin->first_free_wait_list; - wait_addr_bin->first_free_wait_list = wait_addr_list; - } else { - i16 prev_id = waiter->prev_addr_waiter; - i16 next_id = waiter->next_addr_waiter; - if (prev_id) { - fiber_from_id(prev_id)->next_addr_waiter = next_id; - } else { - wait_addr_list->first_waiter = next_id; - } - if (next_id) { - fiber_from_id(next_id)->prev_addr_waiter = prev_id; - } else { - wait_addr_list->last_waiter = prev_id; - } - } - waiter->wait_addr = 0; - waiter->prev_addr_waiter = 0; - waiter->next_addr_waiter = 0; - } - /* Remove from time list */ - { - if (--wait_time_list->num_waiters == 0) { - /* Free time list */ - struct wait_list *prev = wait_time_list->prev_in_bin; - struct wait_list *next = wait_time_list->next_in_bin; - if (prev) { - prev->next_in_bin = next; - } else { - wait_time_bin->first_wait_list = next; - } - if (next) { - next->next_in_bin = prev; - } else { - wait_time_bin->last_wait_list = prev; - } - wait_time_list->next_in_bin = wait_time_bin->first_free_wait_list; - wait_time_bin->first_free_wait_list = wait_time_list; - } else { - i16 prev_id = waiter->prev_time_waiter; - i16 next_id = waiter->next_time_waiter; - if (prev_id) { - fiber_from_id(prev_id)->next_time_waiter = next_id; - } else { - wait_time_list->first_waiter = next_id; - } - if (next_id) { - fiber_from_id(next_id)->prev_time_waiter = prev_id; - } else { - wait_time_list->last_waiter = prev_id; - } - } - waiter->wait_time = 0; - waiter->prev_time_waiter = 0; - waiter->next_time_waiter = 0; - } - } - atomic_i32_fetch_set(&wait_time_bin->lock, 0); - } - if (wait_addr != 0) atomic_i32_fetch_set(&wait_addr_bin->lock, 0); - } - - /* Resume waiters */ - /* TODO: Batch submit waiters based on queue kind rather than one at a time */ - for (i32 i = 0; i < num_waiters; ++i) { - struct fiber *waiter = waiters[i]; - enum job_queue_kind queue_kind = job_queue_kind_from_priority(waiter->job_priority); - struct job_queue *queue = &G.job_queues[queue_kind]; - while (atomic_i32_fetch_test_set(&queue->lock, 0, 1) != 0) ix_pause(); - { - struct job_info *info = NULL; - if (queue->first_free) { - info = queue->first_free; - queue->first_free = info->next; - } else { - info = arena_push_no_zero(queue->arena, struct job_info); - } - MEMZERO_STRUCT(info); - info->count = 1; - info->num_dispatched = waiter->job_id; - info->func = waiter->job_func; - info->sig = waiter->job_sig; - info->counter = waiter->job_counter; - info->fiber_id = waiter->id; - if (queue->last) { - queue->last->next = info; - } else { - queue->first = info; - } - queue->last = info; - atomic_u64_fetch_set(&waiter->wake_gen, 0); - } - atomic_i32_fetch_set(&queue->lock, 0); - } - - /* Wake workers */ - /* TODO: Only wake necessary amount of workers */ - if (num_waiters > 0) { - struct snc_lock lock = snc_lock_e(&G.workers_wake_mutex); - { - atomic_i64_fetch_add(&G.num_jobs_in_queue, num_waiters); - if (atomic_i64_fetch(&G.workers_wake_gen) >= 0) { - atomic_i64_fetch_add(&G.workers_wake_gen, 1); - snc_cv_broadcast(&G.workers_wake_cv); - } - } - snc_unlock(&lock); - } - arena_temp_end(temp); - } - } - - scratch_end(scratch); -#elif 0 - (UNUSED)_; - { - i32 priority = THREAD_PRIORITY_TIME_CRITICAL; - b32 success = SetThreadPriority(GetCurrentThread(), priority); - (UNUSED)success; - ASSERT(success); - } - - struct arena_temp scratch = scratch_begin_no_conflict(); - - /* Create high resolution timer */ - HANDLE timer = CreateWaitableTimerExW(NULL, NULL, CREATE_WAITABLE_TIMER_HIGH_RESOLUTION, TIMER_ALL_ACCESS); - if (!timer) { - sys_panic(LIT("Failed to create high resolution timer")); - } - - /* Set high resolution timer period */ - { - LARGE_INTEGER due = ZI; - due.QuadPart = -1; - //i32 interval = 5; - //i32 interval = 1; - i32 interval = 1; - b32 success = false; - while (!success && interval <= 50) { - success = SetWaitableTimerEx(timer, &due, interval, NULL, NULL, NULL, 0); - ++interval; - } - if (!success) { - sys_panic(LIT("Failed to set scheduler timing period")); - } - } - - /* Create ring buffer of scheduler cycles initialized to default value */ - i32 periods_index = 0; - i64 periods[NUM_ROLLING_SCHEDULER_PERIODS] = ZI; - for (i32 i = 0; i < (i32)countof(periods); ++i) { - periods[i] = DEFAULT_SCHEDULER_CYCLE_PERIOD_NS; - } - - i64 last_cycle_ns = 0; - while (atomic_i64_fetch(&G.workers_wake_gen) >= 0) { - __profn("Job scheduler cycle"); - { - __profn("Job scheduler wait"); - WaitForSingleObject(timer, INFINITE); - } - - i64 now_ns = sys_time_ns(); - if (last_cycle_ns != 0) { - i64 new_period_ns = now_ns - last_cycle_ns; - periods[periods_index++] = new_period_ns; - if (periods_index == countof(periods)) { - periods_index = 0; - } - /* Calculate mean period */ - f64 periods_sum_ns = 0; - for (i32 i = 0; i < (i32)countof(periods); ++i) { - periods_sum_ns += (f64)periods[i]; - } - f64 mean_ns = periods_sum_ns / (f64)countof(periods); - atomic_i64_fetch_set(&G.current_scheduler_cycle_period_ns, math_round_to_int64(mean_ns)); - } - last_cycle_ns = now_ns; - - u64 wake_gen = atomic_u64_fetch_add_u64(&G.waiter_wake_gen, 1); - i64 current_cycle = atomic_i64_fetch_add(&G.current_scheduler_cycle, 1) + 1; - { - __profn("Job scheduler run"); - struct arena_temp temp = arena_temp_begin(scratch.arena); - u64 wait_time_bin_index = (u64)current_cycle % NUM_WAIT_TIME_BINS; - struct wait_bin *wait_time_bin = &G.wait_time_bins[wait_time_bin_index]; - struct wait_list *wait_time_list = NULL; - - /* Build list of waiters to resume */ - i32 num_waiters = 0; - struct fiber **waiters = NULL; - { - while (atomic_i32_fetch_test_set(&wait_time_bin->lock, 0, 1) != 0) ix_pause(); - { - /* Search for wait time list */ - for (struct wait_list *tmp = wait_time_bin->first_wait_list; tmp && !wait_time_list; tmp = tmp->next_in_bin) { - if (tmp->value == (u64)current_cycle) { - wait_time_list = tmp; - } - } - - if (wait_time_list) { - /* Set waiter wake status & build waiters list */ - waiters = arena_push_array_no_zero(temp.arena, struct fiber *, wait_time_list->num_waiters); - for (struct fiber *waiter = fiber_from_id(wait_time_list->first_waiter); waiter; waiter = fiber_from_id(waiter->next_time_waiter)) { - if (atomic_u64_fetch_test_set(&waiter->wake_gen, 0, wake_gen) == 0) { - waiters[num_waiters] = waiter; - ++num_waiters; - } - - } - } - } - atomic_i32_fetch_set(&wait_time_bin->lock, 0); - } - - /* Update wait lists */ - for (i32 i = 0; i < num_waiters; ++i) { - struct fiber *waiter = waiters[i]; - u64 wait_addr = waiter->wait_addr; - u64 wait_addr_bin_index = wait_addr % NUM_WAIT_ADDR_BINS; - struct wait_bin *wait_addr_bin = &G.wait_addr_bins[wait_addr_bin_index]; - - if (wait_addr != 0) while (atomic_i32_fetch_test_set(&wait_addr_bin->lock, 0, 1) != 0) ix_pause(); - { - /* Search for wait addr list */ - struct wait_list *wait_addr_list = NULL; - if (wait_addr != 0) { - for (struct wait_list *tmp = wait_addr_bin->first_wait_list; tmp && !wait_addr_list; tmp = tmp->next_in_bin) { - if (tmp->value == (u64)wait_addr) { - wait_addr_list = tmp; - } - } - } - - while (atomic_i32_fetch_test_set(&wait_time_bin->lock, 0, 1) != 0) ix_pause(); - { - /* Remove from addr list */ - if (wait_addr_list) { - if (--wait_addr_list->num_waiters == 0) { - /* Free addr list */ - struct wait_list *prev = wait_addr_list->prev_in_bin; - struct wait_list *next = wait_addr_list->next_in_bin; - if (prev) { - prev->next_in_bin = next; - } else { - wait_addr_bin->first_wait_list = next; - } - if (next) { - next->next_in_bin = prev; - } else { - wait_addr_bin->last_wait_list = prev; - } - wait_addr_list->next_in_bin = wait_addr_bin->first_free_wait_list; - wait_addr_bin->first_free_wait_list = wait_addr_list; - } else { - i16 prev_id = waiter->prev_addr_waiter; - i16 next_id = waiter->next_addr_waiter; - if (prev_id) { - fiber_from_id(prev_id)->next_addr_waiter = next_id; - } else { - wait_addr_list->first_waiter = next_id; - } - if (next_id) { - fiber_from_id(next_id)->prev_addr_waiter = prev_id; - } else { - wait_addr_list->last_waiter = prev_id; - } - } - waiter->wait_addr = 0; - waiter->prev_addr_waiter = 0; - waiter->next_addr_waiter = 0; - } - /* Remove from time list */ - { - if (--wait_time_list->num_waiters == 0) { - /* Free time list */ - struct wait_list *prev = wait_time_list->prev_in_bin; - struct wait_list *next = wait_time_list->next_in_bin; - if (prev) { - prev->next_in_bin = next; - } else { - wait_time_bin->first_wait_list = next; - } - if (next) { - next->next_in_bin = prev; - } else { - wait_time_bin->last_wait_list = prev; - } - wait_time_list->next_in_bin = wait_time_bin->first_free_wait_list; - wait_time_bin->first_free_wait_list = wait_time_list; - } else { - i16 prev_id = waiter->prev_time_waiter; - i16 next_id = waiter->next_time_waiter; - if (prev_id) { - fiber_from_id(prev_id)->next_time_waiter = next_id; - } else { - wait_time_list->first_waiter = next_id; - } - if (next_id) { - fiber_from_id(next_id)->prev_time_waiter = prev_id; - } else { - wait_time_list->last_waiter = prev_id; - } - } - waiter->wait_time = 0; - waiter->prev_time_waiter = 0; - waiter->next_time_waiter = 0; - } - } - atomic_i32_fetch_set(&wait_time_bin->lock, 0); - } - if (wait_addr != 0) atomic_i32_fetch_set(&wait_addr_bin->lock, 0); - } - - /* Resume waiters */ - /* TODO: Batch submit waiters based on queue kind rather than one at a time */ - for (i32 i = 0; i < num_waiters; ++i) { - struct fiber *waiter = waiters[i]; - enum job_queue_kind queue_kind = job_queue_kind_from_priority(waiter->job_priority); - struct job_queue *queue = &G.job_queues[queue_kind]; - while (atomic_i32_fetch_test_set(&queue->lock, 0, 1) != 0) ix_pause(); - { - struct job_info *info = NULL; - if (queue->first_free) { - info = queue->first_free; - queue->first_free = info->next; - } else { - info = arena_push_no_zero(queue->arena, struct job_info); - } - MEMZERO_STRUCT(info); - info->count = 1; - info->num_dispatched = waiter->job_id; - info->func = waiter->job_func; - info->sig = waiter->job_sig; - info->counter = waiter->job_counter; - info->fiber_id = waiter->id; - if (queue->last) { - queue->last->next = info; - } else { - queue->first = info; - } - queue->last = info; - atomic_u64_fetch_set(&waiter->wake_gen, 0); - } - atomic_i32_fetch_set(&queue->lock, 0); - } - - /* Wake workers */ - /* TODO: Only wake necessary amount of workers */ - if (num_waiters > 0) { - struct snc_lock lock = snc_lock_e(&G.workers_wake_mutex); - { - atomic_i64_fetch_add(&G.num_jobs_in_queue, num_waiters); - if (atomic_i64_fetch(&G.workers_wake_gen) >= 0) { - atomic_i64_fetch_add(&G.workers_wake_gen, 1); - snc_cv_broadcast(&G.workers_wake_cv); - } - } - snc_unlock(&lock); - } - arena_temp_end(temp); - } - } - - scratch_end(scratch); -#else (UNUSED)_; { i32 priority = THREAD_PRIORITY_TIME_CRITICAL; @@ -1639,7 +1169,7 @@ INTERNAL SYS_THREAD_DEF(job_scheduler_entry, _) } i64 last_cycle_ns = 0; - while (!atomic_i32_fetch(&G.workers_shutdown)) { + while (!atomic_i32_fetch(&G.workers_shutdown.v)) { __profn("Job scheduler cycle"); { __profn("Job scheduler wait"); @@ -1666,12 +1196,12 @@ INTERNAL SYS_THREAD_DEF(job_scheduler_entry, _) periods_sum_ns += (f64)periods[i]; } f64 mean_ns = periods_sum_ns / (f64)countof(periods); - atomic_i64_fetch_set(&G.current_scheduler_cycle_period_ns, math_round_to_int64(mean_ns)); + atomic_i64_fetch_set(&G.current_scheduler_cycle_period_ns.v, math_round_to_int64(mean_ns)); } last_cycle_ns = now_ns; - u64 wake_gen = atomic_u64_fetch_add_u64(&G.waiter_wake_gen, 1); - i64 current_cycle = atomic_i64_fetch_add(&G.current_scheduler_cycle, 1) + 1; + u64 wake_gen = atomic_u64_fetch_add_u64(&G.waiter_wake_gen.v, 1); + i64 current_cycle = atomic_i64_fetch_add(&G.current_scheduler_cycle.v, 1) + 1; { __profn("Job scheduler run"); struct arena_temp temp = arena_temp_begin(scratch.arena); @@ -1812,7 +1342,7 @@ INTERNAL SYS_THREAD_DEF(job_scheduler_entry, _) struct fiber *waiter = waiters[i]; enum job_queue_kind queue_kind = job_queue_kind_from_priority(waiter->job_priority); struct job_queue *queue = &G.job_queues[queue_kind]; - while (atomic_i32_fetch_test_set(&queue->lock, 0, 1) != 0) ix_pause(); + tm_lock(&queue->lock); { struct job_info *info = NULL; if (queue->first_free) { @@ -1836,7 +1366,7 @@ INTERNAL SYS_THREAD_DEF(job_scheduler_entry, _) queue->last = info; atomic_u64_fetch_set(&waiter->wake_gen, 0); } - atomic_i32_fetch_set(&queue->lock, 0); + tm_unlock(&queue->lock); } /* Wake workers */ @@ -1844,7 +1374,7 @@ INTERNAL SYS_THREAD_DEF(job_scheduler_entry, _) if (num_waiters > 0) { struct snc_lock lock = snc_lock_e(&G.workers_wake_mutex); { - atomic_i64_fetch_add(&G.num_jobs_in_queue, num_waiters); + atomic_i64_fetch_add(&G.num_jobs_in_queue.v, num_waiters); snc_cv_broadcast(&G.workers_wake_cv); } snc_unlock(&lock); @@ -1854,7 +1384,6 @@ INTERNAL SYS_THREAD_DEF(job_scheduler_entry, _) } scratch_end(scratch); -#endif } /* ========================== * @@ -1867,7 +1396,7 @@ INTERNAL SYS_THREAD_DEF(test_entry, _) (UNUSED)_; /* Start scheduler */ - atomic_i64_fetch_set(&G.current_scheduler_cycle_period_ns, DEFAULT_SCHEDULER_CYCLE_PERIOD_NS); + atomic_i64_fetch_set(&G.current_scheduler_cycle_period_ns.v, DEFAULT_SCHEDULER_CYCLE_PERIOD_NS); struct sys_thread *scheduler_thread = sys_thread_alloc(job_scheduler_entry, NULL, LIT("Scheduler thread"), PROF_THREAD_GROUP_SCHEDULER); /* Start workers */ @@ -3871,7 +3400,7 @@ int CALLBACK wWinMain(_In_ HINSTANCE instance, _In_opt_ HINSTANCE prev_instance, /* Shutdown test thread */ { struct snc_lock lock = snc_lock_e(&G.workers_wake_mutex); - atomic_i32_fetch_set(&G.workers_shutdown, 1); + atomic_i32_fetch_set(&G.workers_shutdown.v, 1); snc_cv_broadcast(&G.workers_wake_cv); snc_unlock(&lock); }