diff --git a/src/arena.c b/src/arena.c
index 9ed6da6d..a277cd0d 100644
--- a/src/arena.c
+++ b/src/arena.c
@@ -8,7 +8,7 @@
 /* NOTE: Application will exit if arena fails to reserve or commit initial memory. */
 struct arena *arena_alloc(u64 reserve)
 {
-	//__prof;
+	__prof;
 
 	reserve += ARENA_HEADER_SIZE;
 	/* Round up to nearest block size */
diff --git a/src/playback_wasapi.c b/src/playback_wasapi.c
index 15450d30..08c4ce6a 100644
--- a/src/playback_wasapi.c
+++ b/src/playback_wasapi.c
@@ -250,13 +250,13 @@ INTERNAL SYS_THREAD_DEF(playback_scheduler_entry, _)
 	/* TODO: Signal counter that running job wiats on, rather than scheduling job manually */
 	while (!atomic_i32_fetch(&G.shutdown)) {
 		{
-			__profn("Wait for audio event");
+			__profn("Wasapi wait");
 			WaitForSingleObject(G.event, INFINITE);
 		}
 		{
 			__profn("Run mix job & wait");
 			struct snc_counter counter = ZI;
-			sys_run(1, playback_mix_job, NULL, SYS_PRIORITY_HIGH, &counter);
+			sys_run(1, playback_mix_job, NULL, SYS_PRIORITY_CRITICAL, &counter);
 			snc_counter_wait(&counter);
 		}
 	}
diff --git a/src/snc.c b/src/snc.c
index c6f1b912..d0026457 100644
--- a/src/snc.c
+++ b/src/snc.c
@@ -45,7 +45,7 @@ struct snc_lock snc_lock_spin_e(struct snc_mutex *m, i32 spin)
 		if (spin_cnt < spin) {
 			ix_pause();
 		} else {
-			sys_wait(&m->v, &v, 4, F32_INFINITY);
+			sys_wait(&m->v, &v, 4, I64_MAX);
 			spin_cnt = 0;
 		}
 	}
@@ -82,7 +82,7 @@ struct snc_lock snc_lock_spin_s(struct snc_mutex *m, i32 spin)
 		if (spin_cnt < spin) {
 			ix_pause();
 		} else {
-			sys_wait(&m->v, &v, 4, F32_INFINITY);
+			sys_wait(&m->v, &v, 4, I64_MAX);
 			spin_cnt = 0;
 		}
 	}
@@ -122,6 +122,11 @@ void snc_unlock(struct snc_lock *l)
  * ========================== */
 
 void snc_cv_wait(struct snc_cv *cv, struct snc_lock *l)
+{
+	snc_cv_wait_time(cv, l, I64_MAX);
+}
+
+void snc_cv_wait_time(struct snc_cv *cv, struct snc_lock *l, i64 timeout_ns)
 {
 	u64 old_wake_gen = atomic_u64_fetch(&cv->wake_gen);
 	struct snc_mutex *mutex = l->mutex;
@@ -130,14 +135,14 @@ void snc_cv_wait(struct snc_cv *cv, struct snc_lock *l)
 	{
 		snc_unlock(l);
 		do {
-			sys_wait(&cv->wake_gen, &old_wake_gen, sizeof(old_wake_gen), F32_INFINITY);
+			sys_wait(&cv->wake_gen, &old_wake_gen, sizeof(old_wake_gen), timeout_ns);
 			wake_gen = atomic_u64_fetch(&cv->wake_gen);
 		} while (wake_gen == old_wake_gen);
 		sys_wake_all(&cv->wake_gen);
 		if (exclusive) {
-			*l= snc_lock_e(mutex);
+			*l = snc_lock_e(mutex);
 		} else {
-			*l= snc_lock_s(mutex);
+			*l = snc_lock_s(mutex);
 		}
 	}
 }
@@ -165,7 +170,7 @@ void snc_counter_wait(struct snc_counter *counter)
 {
 	i64 v = atomic_i64_fetch(&counter->v);
 	while (v > 0) {
-		sys_wait(&counter->v, &v, sizeof(v), F32_INFINITY);
+		sys_wait(&counter->v, &v, sizeof(v), I64_MAX);
 		v = atomic_i64_fetch(&counter->v);
 	}
 }
diff --git a/src/snc.h b/src/snc.h
index fa737d6b..a08f413f 100644
--- a/src/snc.h
+++ b/src/snc.h
@@ -11,6 +11,7 @@ struct snc_lock {
 };
 
 struct snc_mutex {
+	/* Bit 31: exclusive lock held, bit 30: pending exclusive lock, bits 0-29: shared locks count */
 	struct atomic_u32 v;
 
 #if RTC
@@ -41,6 +42,7 @@ struct snc_cv {
 };
 
 void snc_cv_wait(struct snc_cv *cv, struct snc_lock *lock);
+void snc_cv_wait_time(struct snc_cv *cv, struct snc_lock *l, i64 timeout_ns);
 void snc_cv_broadcast(struct snc_cv *cv);
 
 /* ========================== *
diff --git a/src/sys.h b/src/sys.h
index 6757daf3..f9107df4 100644
--- a/src/sys.h
+++ b/src/sys.h
@@ -449,7 +449,7 @@ b32 sys_run_command(struct string cmd);
 
 /* Futex-like wait & wake */
-void sys_wait(void *addr, void *cmp, u32 size, f32 timeout_seconds);
+void sys_wait(void *addr, void *cmp, u32 size, i64 timeout_ns);
 void sys_wake_single(void *addr);
 void sys_wake_all(void *addr);
 
@@ -467,9 +467,10 @@ i16 sys_current_fiber_id(void);
 
 enum sys_priority {
 	SYS_PRIORITY_INHERIT = -1,
-	SYS_PRIORITY_HIGH = 0,
-	SYS_PRIORITY_NORMAL = 1,
-	SYS_PRIORITY_BACKGROUND = 2,
+	SYS_PRIORITY_CRITICAL = 0,
+	SYS_PRIORITY_HIGH = 1,
+	SYS_PRIORITY_NORMAL = 2,
+	SYS_PRIORITY_BACKGROUND = 3,
 	NUM_SYS_PRIORITIES
 };
diff --git a/src/sys_win32.c b/src/sys_win32.c
index 1806e0e7..dc51d36f 100644
--- a/src/sys_win32.c
+++ b/src/sys_win32.c
@@ -101,73 +101,47 @@ struct win32_window {
 };
 
-#define NUM_WAIT_ADDR_BINS 4096
+#define NUM_WAIT_ADDR_BINS 65536
 
 struct alignas(64) wait_list {
 	/* =================================================== */
-	void *addr;                             /* 8 bytes */
+	u64 value;                              /* 08 bytes */
 	/* =================================================== */
-	struct yielder *first_yielder;          /* 8 bytes */
+	i16 first_waiter;                       /* 02 bytes */
+	i16 last_waiter;                        /* 02 bytes */
+	i32 num_waiters;                        /* 04 bytes */
 	/* =================================================== */
-	struct yielder *last_yielder;           /* 8 bytes */
+	struct wait_list *next_in_bin;          /* 08 bytes */
 	/* =================================================== */
+	struct wait_list *prev_in_bin;          /* 08 bytes */
+	/* =================================================== */
-	i32 num_yielders;                       /* 4 bytes */
-	u8 _pad0[4];                            /* 4 bytes (padding */
+	u8 _pad0[8];                            /* 08 bytes (padding) */
 	/* =================================================== */
-	struct wait_list *next_in_bin;          /* 8 bytes */
+	u8 _pad1[8];                            /* 08 bytes (padding) */
 	/* =================================================== */
-	struct wait_list *prev_in_bin;          /* 8 bytes */
+	u8 _pad2[8];                            /* 08 bytes (padding) */
 	/* =================================================== */
-	u8 _pad1[8];                            /* 8 bytes (padding) */
+	u8 _pad3[8];                            /* 08 bytes (padding) */
 };
 STATIC_ASSERT(sizeof(struct wait_list) == 64); /* Padding validation (increase if necessary) */
 STATIC_ASSERT(alignof(struct wait_list) == 64); /* Avoid false sharing */
 
 struct alignas(64) wait_bin {
 	/* =================================================== */
-	struct wait_list *first_wait_list;      /* 8 bytes */
+	struct wait_list *first_wait_list;      /* 08 bytes */
 	/* =================================================== */
-	struct wait_list *last_wait_list;       /* 8 bytes */
+	struct wait_list *last_wait_list;       /* 08 bytes */
 	/* =================================================== */
-	struct wait_list *first_free_wait_list; /* 8 bytes */
+	struct wait_list *first_free_wait_list; /* 08 bytes */
 	/* =================================================== */
-	struct yielder *first_free_yielder;
+	struct atomic_i32 lock;                 /* 04 bytes */
+	u8 _pad0[4];                            /* 04 bytes (padding) */
 	/* =================================================== */
-	struct atomic_i32 lock;                 /* 4 bytes */
-	u8 _pad0[4];                            /* 4 bytes (padding) */
-	/* =================================================== */
-	u8 _pad1[24];                           /* 24 bytes (padding) */
+	u8 _pad1[32];                           /* 32 bytes (padding) */
 };
 STATIC_ASSERT(sizeof(struct wait_bin) == 64); /* Padding validation (increase if necessary) */
 STATIC_ASSERT(alignof(struct wait_bin) == 64); /* Avoid false sharing */
 
-struct alignas(64) yielder {
-	/* =================================================== */
-	i16 fiber_id;                           /* 2 bytes */
-	u8 _pad0[2];                            /* 2 bytes (padding) */
-	i32 job_queue_kind;                     /* 4 bytes */
-	/* =================================================== */
-	sys_job_func *job_func;                 /* 8 bytes */
-	/* =================================================== */
-	void *job_sig;                          /* 8 bytes */
-	/* =================================================== */
-	struct snc_counter *job_counter;        /* 8 bytes */
-	/* =================================================== */
-	struct yielder *next;                   /* 8 bytes */
-	/* =================================================== */
-	struct yielder *prev;                   /* 8 bytes */
-	/* =================================================== */
-	i32 job_id;                             /* 4 bytes */
-	u8 _pad1[4];                            /* 4 bytes (padding) */
-	/* =================================================== */
-	u8 _pad2[8];                            /* 8 bytes (padding) */
-};
-STATIC_ASSERT(sizeof(struct yielder) == 64); /* Padding validation (increase if necessary) */
-STATIC_ASSERT(alignof(struct yielder) == 64); /* Avoid false sharing */
-
-
 
 struct alignas(64) counter {
 	/* =================================================== */
@@ -186,8 +160,8 @@ STATIC_ASSERT(alignof(struct counter) == 64); /* Avoid false sharing */
 
-#define FIBER_NAME_PREFIX_CSTR "["
-#define FIBER_NAME_SUFFIX_CSTR "] Fiber"
+#define FIBER_NAME_PREFIX_CSTR "Fiber ["
+#define FIBER_NAME_SUFFIX_CSTR "]"
 #define FIBER_NAME_MAX_SIZE 64
 
 enum yield_kind {
@@ -205,54 +179,57 @@ struct yield_param {
 			void *addr;
 			void *cmp;
 			u32 size;
+			i64 timeout_ns;
 		} wait;
 	};
 };
 
 struct alignas(64) fiber {
-	/* =================================================== */
-	char *name_cstr;                        /* 8 bytes */
-	/* =================================================== */
-	i16 id;                                 /* 2 bytes */
-	i16 parent_id;                          /* 2 bytes */
-	u8 _pad0[4];                            /* 4 bytes (padding) */
-	/* =================================================== */
-	void *addr;                             /* 8 bytes */
-	/* =================================================== */
-	sys_job_func *job_func;                 /* 8 bytes */
-	/* =================================================== */
-	void *job_sig;                          /* 8 bytes */
-	/* =================================================== */
-	i32 job_id;                             /* 4 bytes */
-	i32 job_priority;                       /* 4 bytes */
-	/* =================================================== */
-	struct yield_param *yield_param;        /* 8 bytes */
-	/* =================================================== */
-	u8 _pad1[8];                            /* 8 bytes (padding) */
-};
-STATIC_ASSERT(sizeof(struct fiber) == 64); /* Padding validation (increase if necessary) */
-STATIC_ASSERT(alignof(struct fiber) == 64); /* Avoid false sharing */
-STATIC_ASSERT(SYS_MAX_FIBERS < I16_MAX); /* Max fibers should fit in fiber id */
+	/* ==================================================== */
+	void *addr;                             /* 08 bytes */
+	/* ==================================================== */
+	char *name_cstr;                        /* 08 bytes */
+	/* ==================================================== */
+	i16 id;                                 /* 02 bytes */
+	i16 parent_id;                          /* 02 bytes */
+	i16 next_waiter;                        /* 02 bytes */
+	i16 prev_waiter;                        /* 02 bytes */
+	/* ==================================================== */
+	u8 _pad1[8];                            /* 08 bytes (padding) */
+	/* ==================================================== */
+	u8 _pad2[8];                            /* 08 bytes (padding) */
+	/* ==================================================== */
+	u8 _pad3[8];                            /* 08 bytes (padding) */
+	/* ==================================================== */
+	u8 _pad4[8];                            /* 08 bytes (padding) */
+	/* ==================================================== */
+	u8 _pad5[8];                            /* 08 bytes (padding) */
+	/* ==================================================== */
+
+	/* ==================================================== */
+	/* =============== Cache line boundary ================ */
+	/* ==================================================== */
 
-struct alignas(64) fiber_ctx {
 	/* ==================================================== */
 	struct sys_scratch_ctx scratch_ctx;     /* 16 bytes */
 	/* ==================================================== */
-	u8 _pad0[8];                            /* 8 bytes (padding) */
+	sys_job_func *job_func;                 /* 08 bytes */
 	/* ==================================================== */
-	u8 _pad1[8];                            /* 8 bytes (padding) */
+	void *job_sig;                          /* 08 bytes */
 	/* ==================================================== */
-	u8 _pad2[8];                            /* 8 bytes (padding) */
+	i32 job_id;                             /* 04 bytes */
+	i32 job_priority;                       /* 04 bytes */
 	/* ==================================================== */
-	u8 _pad3[8];                            /* 8 bytes (padding) */
+	struct snc_counter *job_counter;        /* 08 bytes */
 	/* ==================================================== */
-	u8 _pad4[8];                            /* 8 bytes (padding) */
+	struct yield_param *yield_param;        /* 08 bytes */
+	/* ==================================================== */
+	u8 _pad6[8];                            /* 08 bytes (padding) */
+
 };
-STATIC_ASSERT(sizeof(struct fiber_ctx) == 64); /* Padding validation (increase if necessary) */
-STATIC_ASSERT(alignof(struct fiber_ctx) == 64); /* Avoid false sharing */
-
-
-
+STATIC_ASSERT(sizeof(struct fiber) == 128); /* Padding validation (increase if necessary) */
+STATIC_ASSERT(alignof(struct fiber) == 64); /* Avoid false sharing */
+STATIC_ASSERT(SYS_MAX_FIBERS < I16_MAX); /* Max fibers should fit in fiber id */
 
 
 struct alignas(64) worker_ctx {
@@ -273,6 +250,7 @@ struct job_info {
 };
 
 enum job_queue_kind {
+	JOB_QUEUE_KIND_CRITICAL_PRIORITY,
 	JOB_QUEUE_KIND_HIGH_PRIORITY,
 	JOB_QUEUE_KIND_NORMAL_PRIORITY,
 	JOB_QUEUE_KIND_BACKGROUND,
@@ -332,18 +310,12 @@ GLOBAL struct {
 	struct arena *windows_arena;
 	struct win32_window *first_free_window;
 
-
-
-	/* Yielders */
-	struct atomic_i32 yielders_arena_lock; /* TODO: Prevent false sharing */
-	struct arena *yielders_arena;
-
 	/* Wait lists */
 	struct atomic_i32 wait_lists_arena_lock; /* TODO: Prevent false sharing */
 	struct arena *wait_lists_arena;
 
 	/* Wait table */
-	struct wait_bin wait_bins[NUM_WAIT_ADDR_BINS];
+	struct wait_bin wait_addr_bins[NUM_WAIT_ADDR_BINS];
 
 	/* Fibers */
 	i16 num_fibers;
@@ -351,7 +323,6 @@ GLOBAL struct {
 	struct arena *fiber_names_arena;
 	struct atomic_i32 fibers_lock; /* TODO: Prevent false sharing */
 	struct fiber fibers[SYS_MAX_FIBERS];
-	struct fiber_ctx fiber_contexts[SYS_MAX_FIBERS];
 
 	/* Jobs */
 	struct job_queue job_queues[NUM_JOB_QUEUE_KINDS];
@@ -373,6 +344,8 @@ GLOBAL struct {
 
 INTERNAL struct fiber *fiber_from_id(i16 id);
 INTERNAL void job_fiber_yield(struct fiber *fiber, struct fiber *parent_fiber);
+INTERNAL enum job_queue_kind job_queue_kind_from_priority(enum sys_priority priority);
+INTERNAL enum sys_priority job_priority_from_queue_kind(enum job_queue_kind queue_kind);
 
 
 
@@ -393,27 +366,29 @@ INTERNAL void job_fiber_yield(struct fiber *fiber, struct fiber *parent_fiber);
  * Wait / wake
  * ========================== */
 
-void sys_wait(void *addr, void *cmp, u32 size, f32 timeout_seconds)
+void sys_wait(void *addr, void *cmp, u32 size, i64 timeout_ns)
 {
 	struct fiber *fiber = fiber_from_id(sys_current_fiber_id());
 	i16 parent_fiber_id = fiber->parent_id;
 
-	/* Yield if job fiber, otherwise fall back to windows blocking function */
 	if (parent_fiber_id > 0) {
-		/* FIXME: Implement fiber timeout */
 		*fiber->yield_param = (struct yield_param) {
 			.kind = YIELD_KIND_WAIT,
 			.wait = {
 				.addr = addr,
 				.cmp = cmp,
-				.size = size
+				.size = size,
+				.timeout_ns = timeout_ns
 			}
 		};
 		struct fiber *parent_fiber = fiber_from_id(parent_fiber_id);
 		job_fiber_yield(fiber, parent_fiber);
 	} else {
-		i32 timeout_ms = INFINITE;
-		if (timeout_seconds != F32_INFINITY) {
-			timeout_ms = (i32)(timeout_seconds * 1000);
+		i32 timeout_ms = 0;
+		if (timeout_ns == I64_MAX) {
+			timeout_ms = INFINITE;
+		} else if (timeout_ns != 0) {
+			timeout_ms = timeout_ns / 1000000;
+			timeout_ms += (timeout_ms == 0) * math_fsign(timeout_ns);
 		}
 		WaitOnAddress(addr, cmp, size, timeout_ms);
 	}
@@ -428,45 +403,45 @@ void sys_wake_single(void *addr)
 void sys_wake_all(void *addr)
 {
 	u64 wait_bin_index = (u64)addr % NUM_WAIT_ADDR_BINS;
-	struct wait_bin *bin = &G.wait_bins[wait_bin_index];
+	struct wait_bin *bin = &G.wait_addr_bins[wait_bin_index];
 
-	i32 num_yielders = 0;
+	i32 num_waiters = 0;
 	while (atomic_i32_fetch_test_set(&bin->lock, 0, 1) != 0) ix_pause();
 	{
 		struct wait_list *wait_list = NULL;
 		for (struct wait_list *tmp = bin->first_wait_list; tmp && !wait_list; tmp = tmp->next_in_bin) {
-			if (tmp->addr == addr) {
+			if (tmp->value == (u64)addr) {
 				wait_list = tmp;
 			}
 		}
-		if (wait_list && wait_list->num_yielders > 0) {
-			num_yielders = wait_list->num_yielders;
+		if (wait_list && wait_list->num_waiters > 0) {
+			num_waiters = wait_list->num_waiters;
 			struct arena_temp scratch = scratch_begin_no_conflict();
 			{
-				/* Separate yielders by queue kind */
+				/* Separate waiters by queue kind */
 				i32 queue_counts[NUM_JOB_QUEUE_KINDS] = ZI;
-				struct yielder **queue_yielder_arrays[NUM_JOB_QUEUE_KINDS] = ZI;
-				for (i32 i = 0; i < (i32)countof(queue_yielder_arrays); ++i) {
-					/* NOTE: Each array is conservatively sized as the number of all yielders in the list */
-					queue_yielder_arrays[i] = arena_push_array_no_zero(scratch.arena, struct yielder *, num_yielders);
+				struct fiber **queue_waiter_arrays[NUM_JOB_QUEUE_KINDS] = ZI;
+				for (i32 i = 0; i < (i32)countof(queue_waiter_arrays); ++i) {
+					/* NOTE: Each array is conservatively sized as the number of all waiters in the list */
+					queue_waiter_arrays[i] = arena_push_array_no_zero(scratch.arena, struct fiber *, num_waiters);
 				}
-				for (struct yielder *yielder = wait_list->first_yielder; yielder; yielder = yielder->next) {
-					enum job_queue_kind queue_kind = yielder->job_queue_kind;
+				for (struct fiber *waiter = fiber_from_id(wait_list->first_waiter); waiter; waiter = fiber_from_id(waiter->next_waiter)) {
+					enum job_queue_kind queue_kind = job_queue_kind_from_priority(waiter->job_priority);
 					i32 index = queue_counts[queue_kind]++;
-					struct yielder **array = queue_yielder_arrays[queue_kind];
-					array[index] = yielder;
+					struct fiber **array = queue_waiter_arrays[queue_kind];
+					array[index] = waiter;
 				}
 
 				/* Push jobs */
 				for (i32 queue_kind = 0; queue_kind < (i32)countof(queue_counts); ++queue_kind) {
 					i32 num_jobs = queue_counts[queue_kind];
 					if (num_jobs > 0) {
 						struct job_queue *queue = &G.job_queues[queue_kind];
-						struct yielder **queue_yielders = queue_yielder_arrays[queue_kind];
+						struct fiber **queue_waiters = queue_waiter_arrays[queue_kind];
 						while (atomic_i32_fetch_test_set(&queue->lock, 0, 1) != 0) ix_pause();
 						{
 							/* TODO: More efficient batch job list allocation */
 							for (i32 i = 0; i < num_jobs; ++i) {
-								struct yielder *yielder = queue_yielders[i];
+								struct fiber *waiter = queue_waiters[i];
 								struct job_info *info = NULL;
 								if (queue->first_free) {
 									info = queue->first_free;
@@ -476,11 +451,11 @@ void sys_wake_all(void *addr)
 								}
 								MEMZERO_STRUCT(info);
 								info->count = 1;
-								info->num_dispatched = yielder->job_id;
-								info->func = yielder->job_func;
-								info->sig = yielder->job_sig;
-								info->counter = yielder->job_counter;
-								info->fiber_id = yielder->fiber_id;
+								info->num_dispatched = waiter->job_id;
+								info->func = waiter->job_func;
+								info->sig = waiter->job_sig;
+								info->counter = waiter->job_counter;
+								info->fiber_id = waiter->id;
 								if (queue->last) {
 									queue->last->next = info;
 								} else {
@@ -492,12 +467,23 @@ void sys_wake_all(void *addr)
 						atomic_i32_fetch_set(&queue->lock, 0);
 					}
 				}
-				/* Free yielders */
-				wait_list->last_yielder->next = bin->first_free_yielder;
-				bin->first_free_yielder = wait_list->first_yielder;
-				wait_list->first_yielder = NULL;
-				wait_list->last_yielder = NULL;
-				wait_list->num_yielders = 0;
+				/* Free wait list */
+				{
+					struct wait_list *prev = wait_list->prev_in_bin;
+					struct wait_list *next = wait_list->next_in_bin;
+					if (prev) {
+						prev->next_in_bin = next;
+					} else {
+						bin->first_wait_list = next;
+					}
+					if (next) {
+						next->prev_in_bin = prev;
+					} else {
+						bin->last_wait_list = prev;
+					}
+					wait_list->next_in_bin = bin->first_free_wait_list;
+					bin->first_free_wait_list = wait_list;
+				}
 			}
 			scratch_end(scratch);
 		}
@@ -509,7 +495,7 @@ void sys_wake_all(void *addr)
 
 	/* Wake workers */
 	/* TODO: Only wake necessary amount of workers */
-	if (num_yielders > 0) {
+	if (num_waiters > 0) {
 		struct snc_lock lock = snc_lock_e(&G.workers_wake_mutex);
 		{
 			if (atomic_i64_fetch(&G.workers_wake_gen) >= 0) {
@@ -601,6 +587,7 @@ INTERNAL struct fiber *fiber_alloc(enum fiber_kind kind)
 	fiber->job_sig = 0;
 	fiber->job_id = 0;
 	fiber->job_priority = 0;
+	fiber->job_counter = 0;
 	fiber->yield_param = 0;
 	fiber->parent_id = 0;
 	return fiber;
@@ -616,16 +603,13 @@
 	atomic_i32_fetch_set(&G.fibers_lock, 0);
 }
 
-INTERNAL struct fiber *fiber_from_id(i16 id)
+FORCE_INLINE struct fiber *fiber_from_id(i16 id)
 {
-	ASSERT(id >= 0 && id < SYS_MAX_FIBERS);
-	return &G.fibers[id];
-}
-
-INTERNAL struct fiber_ctx *fiber_ctx_from_id(i16 id)
-{
-	ASSERT(id >= 0 && id < SYS_MAX_FIBERS);
-	return &G.fiber_contexts[id];
+	if (id <= 0) {
+		return NULL;
+	} else {
+		return &G.fibers[id];
+	}
 }
 
 /* ========================== *
@@ -645,8 +629,7 @@ void sys_run(i32 count, sys_job_func *func, void *sig, enum sys_priority priorit
 	}
 	struct fiber *fiber = fiber_from_id(sys_current_fiber_id());
 	priority = clamp_i32(priority, fiber->job_priority, SYS_PRIORITY_BACKGROUND); /* A job cannot create a job with a higher priority than itself */
-	STATIC_ASSERT((i32)NUM_SYS_PRIORITIES == (i32)NUM_JOB_QUEUE_KINDS); /* Priority & queue kind enums must have 1:1 mapping */
-	enum job_queue_kind queue_kind = (enum job_queue_kind)priority;
+	enum job_queue_kind queue_kind = job_queue_kind_from_priority(priority);
 	struct job_queue *queue = &G.job_queues[queue_kind];
 	while (atomic_i32_fetch_test_set(&queue->lock, 0, 1) != 0) ix_pause();
 	{
@@ -746,6 +729,18 @@ INTERNAL void job_fiber_entry(void *id_ptr)
  * Job worker thread
 * ========================== */
 
+INTERNAL enum job_queue_kind job_queue_kind_from_priority(enum sys_priority priority)
+{
+	STATIC_ASSERT((i32)NUM_SYS_PRIORITIES == (i32)NUM_JOB_QUEUE_KINDS); /* Priority & queue kind enums must have 1:1 mapping */
+	return (enum job_queue_kind)priority;
+}
+
+INTERNAL enum sys_priority job_priority_from_queue_kind(enum job_queue_kind queue_kind)
+{
+	STATIC_ASSERT((i32)NUM_SYS_PRIORITIES == (i32)NUM_JOB_QUEUE_KINDS); /* Priority & queue kind enums must have 1:1 mapping */
+	return (enum sys_priority)queue_kind;
+}
+
 INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
 {
 	struct worker_ctx *ctx = worker_ctx_arg;
@@ -784,8 +779,7 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
 		/* Pull job from queue */
 		b32 queues_empty = true;
 		enum sys_priority job_priority = 0;
-		enum job_queue_kind job_queue_kind = 0;
-		i32 job_fiber_id = 0;
+		i16 job_fiber_id = 0;
 		i32 job_id = 0;
 		sys_job_func *job_func = 0;
 		void *job_sig = 0;
@@ -799,7 +793,7 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
 			{
 				struct job_info *info = queue->first;
 				job_priority = (enum sys_priority)queue->kind;
-				job_queue_kind = queue->kind;
+				job_priority = job_priority_from_queue_kind(queue->kind);
 				while (info && !job_func) {
 					struct job_info *next = info->next;
 					b32 dequeue = false;
@@ -807,7 +801,6 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
 					job_id = info->num_dispatched++;
 					if (job_id < info->count) {
 						/* Pick job */
-						STATIC_ASSERT((i32)NUM_SYS_PRIORITIES == (i32)NUM_JOB_QUEUE_KINDS); /* Priority & queue kind enums must have 1:1 mapping */
 						job_func = info->func;
 						job_sig = info->sig;
 						job_counter = info->counter;
@@ -859,6 +852,7 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
 		if (job_func) {
 			if (!job_fiber) {
 				job_fiber = fiber_alloc(FIBER_KIND_JOB_WORKER);
+				job_fiber_id = job_fiber->id;
 			}
 			{
 				__profnc("Run fiber", RGB32_F(0.25, 0.75, 0));
@@ -868,6 +862,7 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
 				job_fiber->job_sig = job_sig;
 				job_fiber->job_id = job_id;
 				job_fiber->job_priority = job_priority;
+				job_fiber->job_counter = job_counter;
 				job_fiber->parent_id = worker_fiber_id;
 				job_fiber->yield_param = &yield;
 				b32 done = false;
@@ -884,13 +879,12 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
 					case YIELD_KIND_WAIT:
 					{
-						#if 1
 						void *wait_addr = yield.wait.addr;
 						void *wait_cmp = yield.wait.cmp;
 						u32 wait_size = yield.wait.size;
 
 						u64 wait_bin_index = (u64)wait_addr % NUM_WAIT_ADDR_BINS;
-						struct wait_bin *bin = &G.wait_bins[wait_bin_index];
+						struct wait_bin *bin = &G.wait_addr_bins[wait_bin_index];
 
 						while (atomic_i32_fetch_test_set(&bin->lock, 0, 1) != 0) ix_pause();
 						{
@@ -898,7 +892,7 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
 							/* Search addr wait list in bin */
 							struct wait_list *wait_list = NULL;
 							for (struct wait_list *tmp = bin->first_wait_list; tmp && !wait_list; tmp = tmp->next_in_bin) {
-								if (tmp->addr == wait_addr) {
+								if (tmp->value == (u64)wait_addr) {
 									wait_list = tmp;
 								}
 							}
@@ -916,7 +910,7 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
 									atomic_i32_fetch_set(&G.wait_lists_arena_lock, 0);
 								}
 								MEMZERO_STRUCT(wait_list);
-								wait_list->addr = wait_addr;
+								wait_list->value = (u64)wait_addr;
 								if (bin->last_wait_list) {
 									bin->last_wait_list->next_in_bin = wait_list;
 									wait_list->prev_in_bin = bin->last_wait_list;
@@ -926,33 +920,15 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
 								bin->last_wait_list = wait_list;
 							}
 
-							/* Allocate new yielder */
-							struct yielder *yielder = NULL;
-							if (bin->first_free_yielder) {
-								yielder = bin->first_free_yielder;
-								bin->first_free_yielder = yielder->next;
+							/* Insert fiber into wait list */
+							if (wait_list->last_waiter) {
+								fiber_from_id(wait_list->last_waiter)->next_waiter = job_fiber_id;
+								job_fiber->prev_waiter = wait_list->last_waiter;
 							} else {
-								while (atomic_i32_fetch_test_set(&G.yielders_arena_lock, 0, 1) != 0) ix_pause();
-								{
-									yielder = arena_push_no_zero(G.yielders_arena, struct yielder);
-								}
-								atomic_i32_fetch_set(&G.yielders_arena_lock, 0);
+								wait_list->first_waiter = job_fiber_id;
 							}
-							MEMZERO_STRUCT(yielder);
-							yielder->fiber_id = job_fiber->id;
-							yielder->job_queue_kind = job_queue_kind;
-							yielder->job_func = job_func;
-							yielder->job_sig = job_sig;
-							yielder->job_counter = job_counter;
-							yielder->job_id = job_id;
-							if (wait_list->last_yielder) {
-								wait_list->last_yielder->next = yielder;
-								yielder->prev = wait_list->last_yielder;
-							} else {
-								wait_list->first_yielder = yielder;
-							}
-							wait_list->last_yielder = yielder;
-							++wait_list->num_yielders;
+							wait_list->last_waiter = job_fiber_id;
+							++wait_list->num_waiters;
 
 							/* Pop worker's job fiber */
 							job_fiber = NULL;
@@ -960,10 +936,6 @@ INTERNAL SYS_THREAD_DEF(job_worker_entry, worker_ctx_arg)
 							}
 						}
 						atomic_i32_fetch_set(&bin->lock, 0);
-						#else
-						(UNUSED)job_queue_kind;
-						//ASSERT(false);
-						#endif
 					} break;
 
 					case YIELD_KIND_DONE:
@@ -1068,8 +1040,8 @@ INTERNAL SYS_THREAD_DEF(test_entry, _)
 
 struct sys_scratch_ctx *sys_scratch_ctx_from_fiber_id(i16 id)
 {
-	struct fiber_ctx *fiber_ctx = fiber_ctx_from_id(id);
-	struct sys_scratch_ctx *scratch_ctx = &fiber_ctx->scratch_ctx;
+	struct fiber *fiber = fiber_from_id(id);
+	struct sys_scratch_ctx *scratch_ctx = &fiber->scratch_ctx;
 	if (!scratch_ctx->arenas[0]) {
 		__profn("Initialize scratch context");
 		for (u32 i = 0; i < countof(scratch_ctx->arenas); ++i) {
@@ -3002,9 +2974,6 @@ int CALLBACK wWinMain(_In_ HINSTANCE instance, _In_opt_ HINSTANCE prev_instance,
 
 	__profthread("Main thread", PROF_THREAD_GROUP_MAIN);
 
-	/* Init yielders */
-	G.yielders_arena = arena_alloc(GIBI(64));
-
 	/* Init wait lists */
 	G.wait_lists_arena = arena_alloc(GIBI(64));
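Usage note (illustrative, not part of the patch): a minimal sketch of calling the new snc_cv_wait_time() from caller code, assuming a hypothetical work_ready flag guarded by a hypothetical work_mutex/work_cv pair. Note that, as implemented in src/snc.c above, timeout_ns is passed to each underlying sys_wait call while the wake-generation loop re-waits whenever wake_gen has not advanced, so the timeout bounds each individual wait rather than the total blocking time.

/* Hypothetical example (names below are illustrative, not from this diff):
 * producer/consumer handoff using the snc API as declared in src/snc.h. */
GLOBAL struct snc_mutex work_mutex;
GLOBAL struct snc_cv    work_cv;
GLOBAL b32              work_ready;

INTERNAL void consume_work(void)
{
	struct snc_lock lock = snc_lock_e(&work_mutex);          /* exclusive lock */
	while (!work_ready) {
		/* Re-checks the predicate after every wake; ~2 ms per underlying wait */
		snc_cv_wait_time(&work_cv, &lock, 2ll * 1000 * 1000);
	}
	work_ready = false;
	snc_unlock(&lock);
}

INTERNAL void publish_work(void)
{
	struct snc_lock lock = snc_lock_e(&work_mutex);
	work_ready = true;
	snc_unlock(&lock);
	snc_cv_broadcast(&work_cv);                               /* wake any waiters */
}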