diff --git a/src/sys.h b/src/sys.h index 209581a1..0ba274d8 100644 --- a/src/sys.h +++ b/src/sys.h @@ -508,7 +508,7 @@ enum sys_priority { SYS_PRIORITY_NORMAL = 1, SYS_PRIORITY_BACKGROUND = 2, - NUM_SYS_JOB_PRIORITIES + NUM_SYS_PRIORITIES }; struct sys_job_data { diff --git a/src/sys_win32.c b/src/sys_win32.c index 3aa2eb45..30d23dcc 100644 --- a/src/sys_win32.c +++ b/src/sys_win32.c @@ -111,10 +111,28 @@ struct win32_window { struct win32_window *next_free; }; - - - - +struct alignas(64) waiter { + /* =================================================== */ + i32 fiber_id; /* 4 bytes */ + i32 job_queue_kind; /* 4 bytes */ + /* =================================================== */ + sys_job_func *job_func; /* 8 bytes */ + /* =================================================== */ + void *job_sig; /* 8 bytes */ + /* =================================================== */ + struct counter *job_counter; /* 8 bytes */ + /* =================================================== */ + struct waiter *next; /* 8 bytes */ + /* =================================================== */ + i32 job_id; /* 4 bytes */ + u8 _pad0[4]; /* 4 bytes (padding) */ + /* =================================================== */ + u8 _pad1[8]; /* 8 bytes (padding) */ + /* =================================================== */ + u8 _pad2[8]; /* 8 bytes (padding) */ +}; +STATIC_ASSERT(sizeof(struct waiter) == 64); /* Assume waiter fits in one cache line (increase if necessary) */ +STATIC_ASSERT(alignof(struct waiter) == 64); /* Avoid false sharing */ @@ -125,17 +143,18 @@ struct alignas(64) counter { /* =================================================== */ struct counter *next_free; /* 8 bytes */ /* =================================================== */ - u8 _pad0[8]; /* 8 bytes (padding) */ + struct atomic_i32 waiters_lock; /* 4 bytes */ + i32 num_waiters; /* 4 bytes */ + /* =================================================== */ + struct waiter *first_waiter; /* 8 bytes */ + /* 
=================================================== */ + struct waiter *last_waiter; /* 8 bytes */ /* =================================================== */ u8 _pad1[8]; /* 8 bytes (padding) */ /* =================================================== */ u8 _pad2[8]; /* 8 bytes (padding) */ /* =================================================== */ u8 _pad3[8]; /* 8 bytes (padding) */ - /* =================================================== */ - u8 _pad4[8]; /* 8 bytes (padding) */ - /* =================================================== */ - u8 _pad5[8]; /* 8 bytes (padding) */ }; STATIC_ASSERT(sizeof(struct counter) == 64); /* Assume counter fits in one cache line (increase if necessary) */ STATIC_ASSERT(alignof(struct counter) == 64); /* Avoid false sharing */ @@ -154,7 +173,7 @@ enum yield_kind { YIELD_KIND_NONE, YIELD_KIND_DONE, YIELD_KIND_COOPERATIVE, - YIELD_KIND_SLEEP, + YIELD_KIND_WAIT, NUM_YIELD_KINDS }; @@ -175,10 +194,10 @@ struct alignas(64) fiber { i32 job_id; /* 4 bytes */ i32 job_priority; /* 4 bytes */ /* =================================================== */ + void *yield_arg; /* 8 bytes */ + /* =================================================== */ enum yield_kind yield_kind; /* 4 bytes */ u8 _pad0[4]; /* 4 bytes (padding) */ - /* =================================================== */ - u8 _pad1[8]; /* 8 bytes (padding) */ }; STATIC_ASSERT(sizeof(struct fiber) == 64); /* Assume fiber fits in one cache line (increase if necessary) */ STATIC_ASSERT(alignof(struct fiber) == 64); /* Avoid false sharing */ @@ -217,6 +236,8 @@ struct job_info { void *sig; struct counter *counter; + i32 fiber_id; /* If the job is being resumed from a yield */ + struct job_info *next; }; @@ -296,7 +317,10 @@ GLOBAL struct { - + /* Waiters */ + struct arena *waiters_arena; + struct atomic_i32 waiters_lock; /* TODO: Prevent false sharing */ + struct waiter *first_free_waiter; /* Counters */ struct arena *counters_arena; @@ -324,12 +348,12 @@ GLOBAL struct { } G = ZI, 
DEBUG_ALIAS(G, G_sys_win32); - - /* ========================== * * Counters * ========================== */ +INTERNAL void yield(enum yield_kind kind, void *arg); + INTERNAL struct counter *counter_alloc(void) { struct counter *counter = NULL; @@ -351,6 +375,7 @@ INTERNAL struct counter *counter_alloc(void) INTERNAL void counter_release(struct counter *counter) { + ASSERT(counter->first_waiter == NULL); /* Counter should not have any waiters */ while (atomic_i32_fetch_test_set(&G.counters_lock, 0, 1) != 0) ix_pause(); { counter->next_free = G.first_free_counter; @@ -364,19 +389,105 @@ INTERNAL void counter_add(struct counter *counter, i64 amount) i64 old_v = atomic_i64_fetch_add(&counter->v, amount); i64 new_v = old_v + amount; if (old_v > 0 && new_v <= 0) { - /* TODO: Wake waiters */ - (UNUSED)old_v; - (UNUSED)new_v; + /* Wake waiters */ + struct waiter *first_waiter = NULL; + struct waiter *last_waiter = NULL; + { + while (atomic_i32_fetch_test_set(&counter->waiters_lock, 0, 1) != 0) ix_pause(); + { + first_waiter = counter->first_waiter; + last_waiter = counter->last_waiter; + if (first_waiter) { + struct arena_temp scratch = scratch_begin_no_conflict(); + { + /* Separate waiters by queue kind */ + i32 queue_counts[NUM_JOB_QUEUE_KINDS] = ZI; + struct waiter **queue_waiter_arrays[NUM_JOB_QUEUE_KINDS] = ZI; + for (i32 i = 0; i < (i32)countof(queue_waiter_arrays); ++i) { + queue_waiter_arrays[i] = arena_push_array_no_zero(scratch.arena, struct waiter *, counter->num_waiters); + } + for (struct waiter *waiter = first_waiter; waiter; waiter = waiter->next) { + enum job_queue_kind queue_kind = waiter->job_queue_kind; + i32 index = queue_counts[queue_kind]++; + struct waiter **array = queue_waiter_arrays[queue_kind]; + array[index] = waiter; + } + + /* Push jobs */ + for (i32 queue_kind = 0; queue_kind < (i32)countof(queue_counts); ++queue_kind) { + i32 num_jobs = queue_counts[queue_kind]; + if (num_jobs > 0) { + struct job_queue *queue = &G.job_queues[queue_kind]; + struct 
waiter **queue_waiters = queue_waiter_arrays[queue_kind]; + while (atomic_i32_fetch_test_set(&queue->lock, 0, 1) != 0) ix_pause(); + { + /* TODO: More efficient batch job list allocation */ + for (i32 i = 0; i < num_jobs; ++i) { + struct waiter *waiter = queue_waiters[i]; + struct job_info *info = NULL; + if (queue->first_free) { + info = queue->first_free; + queue->first_free = info->next; + } else { + info = arena_push_no_zero(queue->arena, struct job_info); + } + MEMZERO_STRUCT(info); + info->count = 1; + info->num_dispatched = waiter->job_id; + info->func = waiter->job_func; + info->sig = waiter->job_sig; + info->counter = waiter->job_counter; + info->fiber_id = waiter->fiber_id; + if (queue->last) { + queue->last->next = info; + } else { + queue->first = info; + } + queue->last = info; + } + } + atomic_i32_fetch_set(&queue->lock, 0); + } + } + + /* Reset waiters list */ + counter->num_waiters = 0; + counter->first_waiter = NULL; + counter->last_waiter = NULL; + } + scratch_end(scratch); + } + counter->num_waiters = 0; + } + atomic_i32_fetch_set(&counter->waiters_lock, 0); + } + + /* Add waiters to free list */ + if (first_waiter) { + while (atomic_i32_fetch_test_set(&G.waiters_lock, 0, 1) != 0) ix_pause(); + { + last_waiter->next = G.first_free_waiter; + G.first_free_waiter = first_waiter; + } + atomic_i32_fetch_set(&G.waiters_lock, 0); + } } } INTERNAL void counter_wait(struct counter *counter) { __prof; - /* TODO: Yield with configurable spin count */ - while (atomic_i64_fetch(&counter->v) > 0) { +#if 0 + /* TODO: Spin with configurable count */ + if (atomic_i64_fetch(&counter->v) > 0) { + /* Yield */ + yield(YIELD_KIND_WAIT, counter); + } +#else + while (atomic_i64_fetch(&counter->v) != 0) { ix_pause(); } +#endif } struct sys_counter *sys_counter_alloc(void) { return (struct sys_counter *)counter_alloc(); } @@ -497,7 +608,7 @@ i32 sys_current_fiber_id(void) return (i32)(i64)GetFiberData(); } -INTERNAL void yield(enum yield_kind kind) +INTERNAL void 
yield(enum yield_kind kind, void *arg) { struct fiber *fiber = fiber_from_id(sys_current_fiber_id()); i32 parent_fiber_id = fiber->parent_id; @@ -508,6 +619,7 @@ INTERNAL void yield(enum yield_kind kind) { __prof_fiber_leave; fiber->yield_kind = kind; + fiber->yield_arg = arg; SwitchToFiber(parent_fiber->addr); __prof_fiber_enter(fiber->name_cstr, PROF_THREAD_GROUP_FIBERS + fiber->id); } @@ -515,7 +627,7 @@ INTERNAL void yield(enum yield_kind kind) void sys_yield(void) { - yield(YIELD_KIND_COOPERATIVE); + yield(YIELD_KIND_COOPERATIVE, NULL); } void sys_run(i32 count, sys_job_func *func, void *sig, enum sys_priority priority, struct sys_counter *counter) @@ -527,7 +639,7 @@ void sys_run(i32 count, sys_job_func *func, void *sig, enum sys_priority priorit } struct fiber *fiber = fiber_from_id(sys_current_fiber_id()); priority = clamp_i32(priority, fiber->job_priority, SYS_PRIORITY_BACKGROUND); /* A job cannot create a job with a higher priority than itself */ - STATIC_ASSERT((i32)NUM_SYS_JOB_PRIORITIES == (i32)NUM_JOB_QUEUE_KINDS); /* Priority & queue kind enums must have 1:1 mapping */ + STATIC_ASSERT((i32)NUM_SYS_PRIORITIES == (i32)NUM_JOB_QUEUE_KINDS); /* Priority & queue kind enums must have 1:1 mapping */ enum job_queue_kind queue_kind = (enum job_queue_kind)priority; struct job_queue *queue = &G.job_queues[queue_kind]; while (atomic_i32_fetch_test_set(&queue->lock, 0, 1) != 0) ix_pause(); @@ -572,6 +684,7 @@ INTERNAL void job_fiber_entry(void *id_ptr) { __profscope(Run job); fiber->yield_kind = YIELD_KIND_NONE; + fiber->yield_arg = NULL; struct sys_job_data data = ZI; data.id = fiber->job_id; data.sig = fiber->job_sig; @@ -604,6 +717,7 @@ INTERNAL SYS_THREAD_DEF(worker_entry, worker_ctx_arg) while (!atomic_i32_fetch(&G.workers_shutdown)) { /* Pull job from queue */ enum sys_priority job_priority = 0; + enum job_queue_kind job_queue_kind = 0; i32 job_id = 0; sys_job_func *job_func = 0; void *job_sig = 0; @@ -614,19 +728,41 @@ INTERNAL SYS_THREAD_DEF(worker_entry, 
worker_ctx_arg) struct job_queue *queue = queues[queue_index]; if (queue) { while (atomic_i32_fetch_test_set(&queue->lock, 0, 1) != 0) ix_pause(); - struct job_info *info = queue->first; - while (info && !job_func) { - struct job_info *next = info->next; - job_id = info->num_dispatched++; - if (job_id < info->count) { - /* Pick job */ - STATIC_ASSERT((i32)NUM_SYS_JOB_PRIORITIES == (i32)NUM_JOB_QUEUE_KINDS); /* Priority & queue kind enums must have 1:1 mapping */ - job_priority = (enum sys_priority)queue->kind; - job_func = info->func; - job_sig = info->sig; - job_counter = info->counter; - if (job_id == (info->count - 1)) { - /* We're picking up the last dispatch, so dequeue the job */ + { + struct job_info *info = queue->first; + job_priority = (enum sys_priority)queue->kind; + job_queue_kind = queue->kind; + while (info && !job_func) { + struct job_info *next = info->next; + b32 dequeue = false; + if (info->fiber_id <= 0) { + job_id = info->num_dispatched++; + if (job_id < info->count) { + /* Pick job */ + STATIC_ASSERT((i32)NUM_SYS_PRIORITIES == (i32)NUM_JOB_QUEUE_KINDS); /* Priority & queue kind enums must have 1:1 mapping */ + job_func = info->func; + job_sig = info->sig; + job_counter = info->counter; + if (info->fiber_id > 0) { + } + if (job_id == (info->count - 1)) { + /* We're picking up the last dispatch, so dequeue the job */ + dequeue = true; + } + } + } else { + /* This job is being resumed from a yield */ + if (job_fiber) { + fiber_release(job_fiber, job_fiber->id); + } + job_fiber = fiber_from_id(info->fiber_id); + job_id = info->num_dispatched; + job_func = info->func; + job_sig = info->sig; + job_counter = info->counter; + dequeue = true; + } + if (dequeue) { if (!next) { queue->last = NULL; } @@ -634,8 +770,8 @@ INTERNAL SYS_THREAD_DEF(worker_entry, worker_ctx_arg) info->next = queue->first_free; queue->first_free = info; } + info = next; } - info = next; } atomic_i32_fetch_set(&queue->lock, 0); } @@ -664,10 +800,51 @@ INTERNAL 
SYS_THREAD_DEF(worker_entry, worker_ctx_arg) sys_panic(LIT("Fiber yielded with unknown yield kind")); } break; - case YIELD_KIND_SLEEP: + case YIELD_KIND_WAIT: { - __profscope(Job sleep); - Sleep(100); + struct counter *wait_counter = job_fiber->yield_arg; + { + while (atomic_i32_fetch_test_set(&wait_counter->waiters_lock, 0, 1) != 0) ix_pause(); + { + if (atomic_i64_fetch(&wait_counter->v) > 0) { /* Re-check counter now that it's locked */ + /* Allocate waiter */ + struct waiter *waiter = NULL; + { + while (atomic_i32_fetch_test_set(&G.waiters_lock, 0, 1) != 0) ix_pause(); + { + if (G.first_free_waiter) { + waiter = G.first_free_waiter; + G.first_free_waiter = waiter->next; + } else { + waiter = arena_push_no_zero(G.waiters_arena, struct waiter); + } + } + atomic_i32_fetch_set(&G.waiters_lock, 0); + } + MEMZERO_STRUCT(waiter); + waiter->fiber_id = job_fiber->id; + waiter->job_queue_kind = job_queue_kind; + waiter->job_func = job_func; + waiter->job_sig = job_sig; + waiter->job_counter = job_counter; + waiter->job_id = job_id; + + /* Insert waiter */ + if (wait_counter->last_waiter) { + wait_counter->last_waiter->next = waiter; + } else { + wait_counter->first_waiter = waiter; + } + wait_counter->last_waiter = waiter; + ++wait_counter->num_waiters; + + /* Pop worker's job fiber */ + job_fiber = NULL; + done = true; + } + } + atomic_i32_fetch_set(&wait_counter->waiters_lock, 0); + } } break; case YIELD_KIND_DONE: @@ -675,9 +852,6 @@ INTERNAL SYS_THREAD_DEF(worker_entry, worker_ctx_arg) if (job_counter) { counter_add(job_counter, -1); } - - /* TODO: remove this */ - (UNUSED)fiber_release; done = true; } break; } @@ -2950,6 +3124,9 @@ int CALLBACK wWinMain(_In_ HINSTANCE instance, _In_opt_ HINSTANCE prev_instance, __profthread("Main thread", PROF_THREAD_GROUP_MAIN); + /* Init waiters */ + G.waiters_arena = arena_alloc(GIGABYTE(64)); + /* Init counters */ G.counters_arena = arena_alloc(GIGABYTE(64));