W32_SharedJobCtx W32_shared_job_ctx = ZI; //////////////////////////////// //~ @hookdef Startup void StartupBaseJobs(void) { W32_SharedJobCtx *g = &W32_shared_job_ctx; /* Init timer */ { LARGE_INTEGER qpf; QueryPerformanceFrequency(&qpf); g->ns_per_qpc = 1000000000 / qpf.QuadPart; } { LARGE_INTEGER qpc; QueryPerformanceCounter(&qpc); g->timer_start_qpc = qpc.QuadPart; } /* Init fibers */ g->num_fibers = 1; /* Fiber at index 0 always nil */ g->fiber_names_arena = AcquireArena(Gibi(64)); /* Init job descs */ g->job_descs_arena = AcquireArena(Gibi(64)); /* Convert main thread to fiber */ W32_AcquireFiber(0); /* Init wait lists */ g->wait_lists_arena = AcquireArena(Gibi(64)); /* Init job pools */ for (JobPool pool_kind = 0; pool_kind < (i32)countof(g->job_pools); ++pool_kind) { W32_JobPool *pool = &g->job_pools[pool_kind]; /* Init queues */ for (JobPriority priority = 0; priority < (i32)countof(pool->job_queues); ++priority) { W32_JobQueue *queue = &pool->job_queues[priority]; queue->arena = AcquireArena(Gibi(64)); } } /* Init threads pool */ g->threads_arena = AcquireArena(Gibi(64)); /* Start job scheduler */ Atomic64FetchSet(&g->current_scheduler_cycle_period_ns.v, W32_DefaultSchedulerPeriodNs); W32_Thread *scheduler_thread = W32_AcquireThread(W32_JobSchedulerEntryFunc, 0, Lit("Scheduler thread"), PROF_THREAD_GROUP_SCHEDULER); LAX scheduler_thread; //- Start job workers /* TODO: Heuristic worker counts & affinities */ { __profn("Start job workers"); for (JobPool pool_kind = 0; pool_kind < (i32)countof(g->job_pools); ++pool_kind) { W32_JobPool *pool = &g->job_pools[pool_kind]; String name_fmt = ZI; i32 prof_group = PROF_THREAD_GROUP_FIBERS - Mebi(pool_kind); switch (pool_kind) { default: Assert(0); break; case JobPool_Sim: { name_fmt = Lit("Sim worker #%F"); pool->num_worker_threads = 4; pool->thread_affinity_mask = 0x000000000000000Full; pool->thread_priority = THREAD_PRIORITY_TIME_CRITICAL; } break; case JobPool_User: { name_fmt = Lit("User worker #%F"); pool->num_worker_threads = 4; pool->thread_affinity_mask = 0x00000000000000F0ull; pool->thread_priority = THREAD_PRIORITY_TIME_CRITICAL; } break; case JobPool_Audio: { name_fmt = Lit("Audio worker #%F"); pool->num_worker_threads = 2; pool->thread_affinity_mask = 0x0000000000000300ull; pool->thread_priority = THREAD_PRIORITY_TIME_CRITICAL; pool->thread_is_audio = 1; } break; case JobPool_Background: { name_fmt = Lit("Background worker #%F"); pool->num_worker_threads = 2; pool->thread_affinity_mask = 0x0000000000000C00ull; } break; case JobPool_Floating: { name_fmt = Lit("Floating worker #%F"); pool->num_worker_threads = 8; pool->thread_affinity_mask = 0x0000000000000FFFull; } break; } pool->worker_threads_arena = AcquireArena(Gibi(64)); pool->worker_threads = PushStructs(pool->worker_threads_arena, W32_Thread *, pool->num_worker_threads); pool->worker_contexts = PushStructs(pool->worker_threads_arena, W32_WorkerCtx, pool->num_worker_threads); for (i32 i = 0; i < pool->num_worker_threads; ++i) { W32_WorkerCtx *ctx = &pool->worker_contexts[i]; ctx->pool_kind = pool_kind; ctx->id = i; String name = StringFormat(pool->worker_threads_arena, name_fmt, FmtSint(i)); pool->worker_threads[i] = W32_AcquireThread(W32_JobWorkerEntryFunc, ctx, name, prof_group + i); } } } //OnExit(ShutdownJobs); } //////////////////////////////// //~ Shutdown #if 0 void ShutdownJobs(void) { /* Signal shutdown */ if (!Atomic32Fetch(&g->panicking)) { Atomic32FetchSet(&g->shutdown, 1); for (JobPool pool_kind = 0; pool_kind < (i32)countof(g->job_pools); ++pool_kind) { W32_JobPool *pool = &g->job_pools[pool_kind]; LockTicketMutex(&pool->workers_wake_tm); { Atomic32FetchSet(&pool->workers_shutdown.v, 1); Atomic64FetchSet(&pool->num_jobs_in_queue.v, -100000); WakeByAddressAll(&pool->num_jobs_in_queue); } UnlockTicketMutex(&pool->workers_wake_tm); } } /* Wait on worker threads */ if (!Atomic32Fetch(&g->panicking)) { for (JobPool pool_kind = 0; pool_kind < (i32)countof(g->job_pools); ++pool_kind) { W32_JobPool *pool = &g->job_pools[pool_kind]; for (i32 i = 0; i < pool->num_worker_threads; ++i) { W32_Thread *worker_thread = pool->worker_threads[i]; W32_WaitReleaseThread(worker_thread); } } } /* Wait on scheduler thread */ if (!Atomic32Fetch(&g->panicking)) { W32_WaitReleaseThread(scheduler_thread); } /* Find any dangling threads that haven't exited gracefully by now */ if (!Atomic32Fetch(&g->panicking)) { LockTicketMutex(&g->threads_tm); if (g->first_thread) { TempArena scratch = BeginScratchNoConflict(); u64 num_dangling_threads = 0; String threads_msg = ZI; threads_msg.text = PushDry(scratch.arena, u8); for (W32_Thread *t = g->first_thread; t; t = t->next) { String name = StringFromCstr(t->thread_name_cstr, countof(t->thread_name_cstr)); threads_msg.len += StringFormat(scratch.arena, Lit(" \"%F\"\n"), FmtString(name)).len; ++num_dangling_threads; } threads_msg = StringFormat(scratch.arena, Lit("%F dangling thread(s):\n%F"), FmtUint(num_dangling_threads), FmtString(threads_msg)); //Panic(threads_msg); EndScratch(scratch); } UnlockTicketMutex(&g->threads_tm); } } #endif //////////////////////////////// //~ Win32 thread DWORD WINAPI W32_Win32ThreadProc(LPVOID vt) { W32_AcquireFiber(0); W32_Thread *t = (W32_Thread *)vt; __profthread(t->thread_name_cstr, t->profiler_group); /* Initialize COM */ CoInitializeEx(0, COINIT_MULTITHREADED); /* Set thread name */ if (t->thread_name_wstr[0] != 0) { SetThreadDescription(GetCurrentThread(), t->thread_name_wstr); } //P_LogInfoF("New thread \"%F\" created with ID %F", FmtString(StringFromCstrNoLimit(t->thread_name_cstr)), FmtUint(ThreadId())); /* Enter thread entry point */ t->entry_point(t->thread_data); /* Uninitialize COM */ CoUninitialize(); return 0; } W32_Thread *W32_AcquireThread(W32_ThreadFunc *entry_point, void *thread_data, String thread_name, i32 profiler_group) { __prof; TempArena scratch = BeginScratchNoConflict(); W32_SharedJobCtx *g = &W32_shared_job_ctx; Assert(entry_point != 0); //P_LogInfoF("Creating thread \"%F\"", FmtString(thread_name)); /* Acquire thread object */ W32_Thread *t = 0; { LockTicketMutex(&g->threads_tm); if (g->first_free_thread) { t = g->first_free_thread; g->first_free_thread = t->next; } else { t = PushStructNoZero(g->threads_arena, W32_Thread); } ZeroStruct(t); if (g->last_thread) { g->last_thread->next = t; t->prev = g->last_thread; } else { g->first_thread = t; } g->last_thread = t; UnlockTicketMutex(&g->threads_tm); } t->entry_point = entry_point; t->thread_data = thread_data; t->profiler_group = profiler_group; /* Copy thread name to params */ { u64 CstrLen = MinU64((countof(t->thread_name_cstr) - 1), thread_name.len); CopyBytes(t->thread_name_cstr, thread_name.text, CstrLen * sizeof(*t->thread_name_cstr)); t->thread_name_cstr[CstrLen] = 0; } { String16 thread_name16 = String16FromString(scratch.arena, thread_name); u64 WstrLen = MinU64((countof(t->thread_name_wstr) - 1), thread_name16.len); CopyBytes(t->thread_name_wstr, thread_name16.text, WstrLen * sizeof(*t->thread_name_wstr)); t->thread_name_wstr[WstrLen] = 0; } t->handle = CreateThread( 0, W32_ThreadStackSize, W32_Win32ThreadProc, t, 0, 0 ); if (!t->handle) { //Panic(Lit("Failed to create thread")); } EndScratch(scratch); return (W32_Thread *)t; } /* Returns 0 if the thread could not release in specified timeout (e.g. because it is still running) */ b32 W32_TryReleaseThread(W32_Thread *thread, f32 timeout_seconds) { __prof; W32_SharedJobCtx *g = &W32_shared_job_ctx; b32 success = 0; W32_Thread *t = (W32_Thread *)thread; HANDLE handle = t->handle; if (handle) { /* Wait for thread to stop */ DWORD timeout_ms = (timeout_seconds > 10000000) ? INFINITE : RoundF32ToI32(timeout_seconds * 1000); DWORD wait_result = WaitForSingleObject(handle, timeout_ms); if (wait_result == WAIT_OBJECT_0) { /* Release thread */ success = 1; CloseHandle(handle); { LockTicketMutex(&g->threads_tm); { W32_Thread *prev = t->prev; W32_Thread *next = t->next; if (prev) { prev->next = next; } else { g->first_thread = next; } if (next) { next->prev = prev; } else { g->last_thread = prev; } t->next = g->first_free_thread; g->first_free_thread = t; } UnlockTicketMutex(&g->threads_tm); } } } return success; } void W32_WaitReleaseThread(W32_Thread *thread) { __prof; b32 success = W32_TryReleaseThread(thread, F32Infinity); Assert(success); LAX success; } //////////////////////////////// //~ Win32 wait list /* REQUIRED: Caller must have acquired `wake_lock` for each fiber in array */ void W32_WakeLockedFibers(i32 num_fibers, W32_Fiber **fibers) { W32_SharedJobCtx *g = &W32_shared_job_ctx; /* Update wait lists */ for (i32 i = 0; i < num_fibers; ++i) { W32_Fiber *fiber = fibers[i]; u64 wait_addr = fiber->wait_addr; u64 wait_time = fiber->wait_time; /* Lock & search wait bins */ /* TODO: Cache these in parameters since caller has one of them already calculated */ W32_WaitBin *wait_addr_bin = 0; W32_WaitBin *wait_time_bin = 0; W32_WaitList *wait_addr_list = 0; W32_WaitList *wait_time_list = 0; if (wait_addr != 0) { wait_addr_bin = &g->wait_addr_bins[wait_addr % W32_NumWaitAddrBins]; LockTicketMutex(&wait_addr_bin->lock); for (W32_WaitList *tmp = wait_addr_bin->first_wait_list; tmp && !wait_addr_list; tmp = tmp->next_in_bin) { if (tmp->value == (u64)wait_addr) { wait_addr_list = tmp; } } } if (wait_time != 0) { wait_time_bin = &g->wait_time_bins[wait_time % W32_NumWaitTimeBins]; LockTicketMutex(&wait_time_bin->lock); for (W32_WaitList *tmp = wait_time_bin->first_wait_list; tmp && !wait_time_list; tmp = tmp->next_in_bin) { if (tmp->value == (u64)wait_time) { wait_time_list = tmp; } } } { /* Remove from addr list */ if (wait_addr_list) { if (--wait_addr_list->num_waiters == 0) { /* Free addr list */ W32_WaitList *prev = wait_addr_list->prev_in_bin; W32_WaitList *next = wait_addr_list->next_in_bin; if (prev) { prev->next_in_bin = next; } else { wait_addr_bin->first_wait_list = next; } if (next) { next->prev_in_bin = prev; } else { wait_addr_bin->last_wait_list = prev; } wait_addr_list->next_in_bin = wait_addr_bin->first_free_wait_list; wait_addr_bin->first_free_wait_list = wait_addr_list; } else { i16 prev_id = fiber->prev_addr_waiter; i16 next_id = fiber->next_addr_waiter; if (prev_id) { W32_FiberFromId(prev_id)->next_addr_waiter = next_id; } else { wait_addr_list->first_waiter = next_id; } if (next_id) { W32_FiberFromId(next_id)->prev_addr_waiter = prev_id; } else { wait_addr_list->last_waiter = prev_id; } } fiber->wait_addr = 0; fiber->prev_addr_waiter = 0; fiber->next_addr_waiter = 0; } /* Remove from time list */ if (wait_time_list) { if (--wait_time_list->num_waiters == 0) { /* Free time list */ W32_WaitList *prev = wait_time_list->prev_in_bin; W32_WaitList *next = wait_time_list->next_in_bin; if (prev) { prev->next_in_bin = next; } else { wait_time_bin->first_wait_list = next; } if (next) { next->prev_in_bin = prev; } else { wait_time_bin->last_wait_list = prev; } wait_time_list->next_in_bin = wait_time_bin->first_free_wait_list; wait_time_bin->first_free_wait_list = wait_time_list; } else { i16 prev_id = fiber->prev_time_waiter; i16 next_id = fiber->next_time_waiter; if (prev_id) { W32_FiberFromId(prev_id)->next_time_waiter = next_id; } else { wait_time_list->first_waiter = next_id; } if (next_id) { W32_FiberFromId(next_id)->prev_time_waiter = prev_id; } else { wait_time_list->last_waiter = prev_id; } } fiber->wait_time = 0; fiber->prev_time_waiter = 0; fiber->next_time_waiter = 0; } /* Unlock fiber */ Atomic32FetchSet(&fiber->wake_lock, 0); } /* Unlock wait bins */ if (wait_time_bin != 0) UnlockTicketMutex(&wait_time_bin->lock); if (wait_addr_bin != 0) UnlockTicketMutex(&wait_addr_bin->lock); } /* Resume jobs */ /* TODO: Batch submit waiters based on queue kind rather than one at a time */ i32 job_counts_per_pool[JobPool_Count] = ZI; for (i32 i = 0; i < num_fibers; ++i) { W32_Fiber *fiber = fibers[i]; JobPool pool_kind = fiber->job_pool; ++job_counts_per_pool[pool_kind]; W32_JobPool *pool = &g->job_pools[pool_kind]; W32_JobQueue *queue = &pool->job_queues[fiber->job_priority]; LockTicketMutex(&queue->lock); { W32_JobInfo *info = 0; if (queue->first_free) { info = queue->first_free; queue->first_free = info->next; } else { info = PushStructNoZero(queue->arena, W32_JobInfo); } ZeroStruct(info); info->count = 1; info->num_dispatched = fiber->job_id; info->fiber_id = fiber->id; info->desc = fiber->job_desc; if (queue->first) { info->next = queue->first; } else { queue->last = info; } queue->first = info; } UnlockTicketMutex(&queue->lock); } /* Wake workers */ if (num_fibers > 0) { for (JobPool pool_kind = 0; pool_kind < (i32)countof(job_counts_per_pool); ++pool_kind) { i32 job_count = job_counts_per_pool[pool_kind]; if (job_count > 0) { W32_JobPool *pool = &g->job_pools[pool_kind]; LockTicketMutex(&pool->workers_wake_tm); { Atomic64FetchAdd(&pool->num_jobs_in_queue.v, job_count); if (job_count >= W32_WakeAllThreshold) { WakeByAddressAll(&pool->num_jobs_in_queue); } else { for (i32 i = 0; i < job_count; ++i) { WakeByAddressSingle(&pool->num_jobs_in_queue); } } } UnlockTicketMutex(&pool->workers_wake_tm); } } } } void W32_WakeByAddress(void *addr, i32 count) { TempArena scratch = BeginScratchNoConflict(); W32_SharedJobCtx *g = &W32_shared_job_ctx; u64 wait_addr_bin_index = (u64)addr % W32_NumWaitAddrBins; W32_WaitBin *wait_addr_bin = &g->wait_addr_bins[wait_addr_bin_index]; W32_WaitList *wait_addr_list = 0; /* Get list of waiting fibers */ i32 num_fibers = 0; W32_Fiber **fibers = 0; { LockTicketMutex(&wait_addr_bin->lock); { /* Search for wait addr list */ for (W32_WaitList *tmp = wait_addr_bin->first_wait_list; tmp && !wait_addr_list; tmp = tmp->next_in_bin) { if (tmp->value == (u64)addr) { wait_addr_list = tmp; } } /* Lock fibers & build array */ if (wait_addr_list) { fibers = PushStructsNoZero(scratch.arena, W32_Fiber *, wait_addr_list->num_waiters); for (W32_Fiber *fiber = W32_FiberFromId(wait_addr_list->first_waiter); fiber && num_fibers < count; fiber = W32_FiberFromId(fiber->next_addr_waiter)) { if (Atomic32FetchTestSet(&fiber->wake_lock, 0, 1) == 0) { fibers[num_fibers] = fiber; ++num_fibers; } } } } UnlockTicketMutex(&wait_addr_bin->lock); } if (num_fibers > 0) { W32_WakeLockedFibers(num_fibers, fibers); } /* Wake win32 blocking thread waiters */ if (count >= W32_WakeAllThreshold) { WakeByAddressAll(addr); } else { for (i32 i = 0; i < count; ++i) { WakeByAddressSingle(addr); } } EndScratch(scratch); } void W32_WakeByTime(u64 time) { TempArena scratch = BeginScratchNoConflict(); W32_SharedJobCtx *g = &W32_shared_job_ctx; u64 wait_time_bin_index = (u64)time % W32_NumWaitTimeBins; W32_WaitBin *wait_time_bin = &g->wait_time_bins[wait_time_bin_index]; W32_WaitList *wait_time_list = 0; /* Build list of waiters to resume */ i32 num_fibers = 0; W32_Fiber **fibers = 0; { LockTicketMutex(&wait_time_bin->lock); { /* Search for wait time list */ for (W32_WaitList *tmp = wait_time_bin->first_wait_list; tmp && !wait_time_list; tmp = tmp->next_in_bin) { if (tmp->value == (u64)time) { wait_time_list = tmp; } } if (wait_time_list) { /* Set waiter wake status & build fibers list */ fibers = PushStructsNoZero(scratch.arena, W32_Fiber *, wait_time_list->num_waiters); for (W32_Fiber *fiber = W32_FiberFromId(wait_time_list->first_waiter); fiber; fiber = W32_FiberFromId(fiber->next_time_waiter)) { if (Atomic32FetchTestSet(&fiber->wake_lock, 0, 1) == 0) { fibers[num_fibers] = fiber; ++num_fibers; } } } } UnlockTicketMutex(&wait_time_bin->lock); } W32_WakeLockedFibers(num_fibers, fibers); EndScratch(scratch); } //////////////////////////////// //~ Win32 fiber //- Acquire fiber /* If `pool` is 0, then the currently running thread will be converted into a fiber */ W32_Fiber *W32_AcquireFiber(W32_JobPool *pool) { W32_SharedJobCtx *g = &W32_shared_job_ctx; i16 fiber_id = 0; W32_Fiber *fiber = 0; char *new_name_cstr = 0; { if (pool != 0) { LockTicketMutex(&pool->free_fibers_tm); if (pool->first_free_fiber_id) { fiber_id = pool->first_free_fiber_id; fiber = &g->fibers[fiber_id]; pool->first_free_fiber_id = fiber->parent_id; } UnlockTicketMutex(&pool->free_fibers_tm); } if (!fiber_id) { LockTicketMutex(&g->fibers_tm); { { fiber_id = g->num_fibers++; if (fiber_id >= MaxFibers) { //Panic(Lit("Max fibers reached")); } fiber = &g->fibers[fiber_id]; new_name_cstr = PushStructs(g->fiber_names_arena, char, W32_FiberNameMaxSize); } } UnlockTicketMutex(&g->fibers_tm); } } if (new_name_cstr != 0) { __profn("Initialize fiber"); fiber->id = fiber_id; /* Id to ASCII */ i32 id_div = fiber_id; char id_chars[64] = ZI; i32 id_chars_len = 0; do { i32 digit = id_div % 10; id_div /= 10; id_chars[id_chars_len] = ("0123456789")[digit]; ++id_chars_len; } while (id_div > 0); i32 rev_start = 0; i32 rev_end = id_chars_len - 1; while (rev_start < rev_end) { char swp = id_chars[rev_start]; id_chars[rev_start] = id_chars[rev_end]; id_chars[rev_end] = swp; ++rev_start; --rev_end; } /* Concat fiber name */ i32 name_size = 1; Assert(sizeof(sizeof(W32_FiberNamePrefixCstr)) <= W32_FiberNameMaxSize); CopyBytes(new_name_cstr, W32_FiberNamePrefixCstr, sizeof(W32_FiberNamePrefixCstr)); name_size += sizeof(W32_FiberNamePrefixCstr) - 2; CopyBytes(new_name_cstr + name_size, id_chars, id_chars_len); name_size += id_chars_len; CopyBytes(new_name_cstr + name_size, W32_FiberNameSuffixCstr, sizeof(W32_FiberNameSuffixCstr)); name_size += sizeof(W32_FiberNameSuffixCstr) - 2; fiber->name_cstr = new_name_cstr; /* Init win32 fiber */ if (pool != 0) { __profn("CreateFiber"); fiber->addr = CreateFiber(W32_FiberStackSize, W32_FiberEntryPoint, (void *)(i64)fiber_id); } else { /* Fiber is not a part of a job pool, convert thread to fiber */ __profn("ConvertThreadToFiber"); fiber->addr = ConvertThreadToFiber((void *)(i64)fiber_id); } } fiber->wait_addr = 0; fiber->wait_time = 0; fiber->prev_addr_waiter = 0; fiber->next_addr_waiter = 0; fiber->prev_time_waiter = 0; fiber->next_time_waiter = 0; fiber->job_id = 0; fiber->job_pool = 0; fiber->job_priority = 0; fiber->yield_param = 0; fiber->parent_id = 0; return fiber; } //- Release fiber void W32_ReleaseFiber(W32_JobPool *pool, W32_Fiber *fiber) { LockTicketMutex(&pool->free_fibers_tm); { i16 fiber_id = fiber->id; fiber->parent_id = pool->first_free_fiber_id; pool->first_free_fiber_id = fiber_id; } UnlockTicketMutex(&pool->free_fibers_tm); } //- Fiber id ForceInline W32_Fiber *W32_FiberFromId(i16 id) { W32_SharedJobCtx *g = &W32_shared_job_ctx; if (id <= 0) { return 0; } else { return &g->fibers[id]; } } //- Fiber control flow ForceNoInline void W32_FiberResume(W32_Fiber *fiber) { MemoryBarrier(); SwitchToFiber(fiber->addr); MemoryBarrier(); } void W32_YieldFiber(W32_Fiber *fiber, W32_Fiber *parent_fiber) { LAX fiber; Assert(fiber->id == FiberId); Assert(parent_fiber->id == fiber->parent_id); Assert(parent_fiber->id > 0); { __prof_fiber_leave(); W32_FiberResume(parent_fiber); __prof_fiber_enter(fiber->name_cstr, PROF_THREAD_GROUP_FIBERS - Mebi(fiber->job_pool) + Kibi(1) + fiber->id); } } //////////////////////////////// //~ Win32 fiber entry void W32_FiberEntryPoint(void *id_ptr) { W32_SharedJobCtx *g = &W32_shared_job_ctx; i16 id = (i32)(i64)id_ptr; volatile W32_Fiber *fiber = W32_FiberFromId(id); __prof_fiber_enter(fiber->name_cstr, PROF_THREAD_GROUP_FIBERS - Mebi(fiber->job_pool) + Kibi(1) + fiber->id); for (;;) { //- Run job { W32_YieldParam *yield_param = fiber->yield_param; yield_param->kind = W32_YieldKind_None; { MemoryBarrier(); GenericJobDesc *job_desc = fiber->job_desc; GenericJobFunc *job_func = job_desc->func; job_func(job_desc->sig, fiber->job_id); MemoryBarrier(); } } //- Job completed { GenericJobDesc *job_desc = fiber->job_desc; /* Check if we're the last completed dispatch in the job */ if ((Atomic32FetchAdd(&job_desc->num_completed, 1) + 1) >= job_desc->count) { /* Decrement counter */ if (job_desc->counter) { AddCounter(job_desc->counter, -1); } /* Release job desc */ ResetArena(job_desc->arena); LockTicketMutex(&g->job_descs_tm); { job_desc->next_free = g->first_free_job_desc; g->first_free_job_desc = job_desc; } UnlockTicketMutex(&g->job_descs_tm); } /* Yield to worker */ fiber->yield_param->kind = W32_YieldKind_Done; W32_Fiber *parent_fiber = W32_FiberFromId(fiber->parent_id); W32_YieldFiber((W32_Fiber *)fiber, parent_fiber); } } } //////////////////////////////// //~ Win32 job worker entry W32_ThreadDef(W32_JobWorkerEntryFunc, worker_ctx_arg) { W32_SharedJobCtx *g = &W32_shared_job_ctx; W32_WorkerCtx *ctx = worker_ctx_arg; JobPool pool_kind = ctx->pool_kind; W32_JobPool *pool = &g->job_pools[pool_kind]; LAX ctx; { /* TODO: Heuristic pinning */ /* TODO: Pin non-worker threads to other cores */ HANDLE thread_handle = GetCurrentThread(); if (pool->thread_priority) { __profn("Set priority"); b32 success = SetThreadPriority(thread_handle, pool->thread_priority) != 0; Assert(success); LAX success; } #if 0 if (pool->thread_affinity_mask) { __profn("Set affinity"); b32 success = SetThreadAffinityMask(thread_handle, pool->thread_affinity_mask) != 0; #if RtcIsEnabled || ProfilingIsEnabled { /* Retry until external tools can set correct process affinity */ i32 delay_ms = 16; while (!success && delay_ms <= 1024) { __profn("Affinity retry"); Sleep(delay_ms); success = SetThreadAffinityMask(thread_handle, pool->thread_affinity_mask) != 0; delay_ms *= 2; } } #endif Assert(success); LAX success; } #endif if (pool->thread_is_audio) { /* https://learn.microsoft.com/en-us/windows/win32/procthread/multimedia-class-scheduler-service#registry-settings */ __profn("Set mm thread characteristics"); DWORD task = 0; HANDLE mmc_handle = AvSetMmThreadCharacteristics(L"Pro Audio", &task); Assert(mmc_handle); LAX mmc_handle; } } i32 worker_fiber_id = FiberId; W32_Fiber *job_fiber = 0; b32 shutdown = 0; while (!shutdown) { //- Pull job from queue GenericJobDesc *job_desc = 0; JobPriority job_priority = 0; i16 job_fiber_id = 0; i32 job_id = 0; { //__profnc("Pull job", Rgb32F(0.75, 0.75, 0)); for (JobPriority priority = 0; priority < (i32)countof(pool->job_queues) && !job_desc; ++priority) { W32_JobQueue *queue = &pool->job_queues[priority]; if (queue) { LockTicketMutex(&queue->lock); { W32_JobInfo *info = queue->first; while (info && !job_desc) { W32_JobInfo *next = info->next; b32 dequeue = 0; if (info->fiber_id <= 0) { job_id = info->num_dispatched++; if (job_id < info->count) { /* Pick job */ Atomic64FetchAdd(&pool->num_jobs_in_queue.v, -1); job_priority = priority; job_desc = info->desc; if (job_id == (info->count - 1)) { /* We're picking up the last dispatch, so dequeue the job */ dequeue = 1; } } } else { /* This job is to be resumed from a yield */ Atomic64FetchAdd(&pool->num_jobs_in_queue.v, -1); job_fiber_id = info->fiber_id; job_priority = priority; job_id = info->num_dispatched; job_desc = info->desc; dequeue = 1; } if (dequeue) { if (!next) { queue->last = 0; } queue->first = next; info->next = queue->first_free; queue->first_free = info; } info = next; } } UnlockTicketMutex(&queue->lock); } } } //- Release old fiber if resuming a yielded fiber if (job_fiber_id > 0) { if (job_fiber) { W32_ReleaseFiber(pool, job_fiber); } job_fiber = W32_FiberFromId(job_fiber_id); } //- Run fiber if (job_desc) { if (!job_fiber) { job_fiber = W32_AcquireFiber(pool); } job_fiber_id = job_fiber->id; { __profnc("Run fiber", Rgb32F(1, 1, 1)); __profvalue(job_fiber->id); W32_YieldParam yield = ZI; job_fiber->parent_id = worker_fiber_id; job_fiber->job_desc = job_desc; job_fiber->job_id = job_id; job_fiber->job_pool = pool_kind; job_fiber->job_priority = job_priority; job_fiber->yield_param = &yield; b32 done = 0; while (!done) { W32_FiberResume(job_fiber); switch (yield.kind) { default: { /* Invalid yield kind */ TempArena scratch = BeginScratchNoConflict(); //Panic(StringFormat(scratch.arena, Lit("Invalid fiber yield kind \"%F\""), FmtSint(yield.kind))); EndScratch(scratch); } break; //- Fiber is waiting case W32_YieldKind_Wait: { __profn("Process fiber wait"); volatile void *wait_addr = yield.wait.addr; void *wait_cmp = yield.wait.cmp; u32 wait_size = yield.wait.size; i64 wait_timeout_ns = yield.wait.timeout_ns; i64 wait_time = 0; if (wait_timeout_ns > 0 && wait_timeout_ns < I64Max) { u64 current_scheduler_cycle = Atomic64Fetch(&g->current_scheduler_cycle.v); i64 current_scheduler_cycle_period_ns = Atomic64Fetch(&g->current_scheduler_cycle_period_ns.v); wait_time = current_scheduler_cycle + MaxI64((i64)((f64)wait_timeout_ns / (f64)current_scheduler_cycle_period_ns), 1); } u64 wait_addr_bin_index = (u64)wait_addr % W32_NumWaitAddrBins; u64 wait_time_bin_index = (u64)wait_time % W32_NumWaitTimeBins; W32_WaitBin *wait_addr_bin = &g->wait_addr_bins[wait_addr_bin_index]; W32_WaitBin *wait_time_bin = &g->wait_time_bins[wait_time_bin_index]; if (wait_addr != 0) LockTicketMutex(&wait_addr_bin->lock); { if (wait_time != 0) LockTicketMutex(&wait_time_bin->lock); { //- Load and compare value at address now that wait bins are locked b32 cancel_wait = wait_addr == 0 && wait_time == 0; if (wait_addr != 0) { switch (wait_size) { case 1: cancel_wait = (u8)_InterlockedCompareExchange8(wait_addr, 0, 0) != *(u8 *)wait_cmp; break; case 2: cancel_wait = (u16)_InterlockedCompareExchange16(wait_addr, 0, 0) != *(u16 *)wait_cmp; break; case 4: cancel_wait = (u32)_InterlockedCompareExchange(wait_addr, 0, 0) != *(u32 *)wait_cmp; break; case 8: cancel_wait = (u64)_InterlockedCompareExchange64(wait_addr, 0, 0) != *(u64 *)wait_cmp; break; default: cancel_wait = 1; Assert(0); break; /* Invalid wait size */ } } if (wait_time != 0 && !cancel_wait) { cancel_wait = wait_time <= Atomic64Fetch(&g->current_scheduler_cycle.v); } if (!cancel_wait) { if (wait_addr != 0) { //- Search for wait addr list in bin W32_WaitList *wait_addr_list = 0; for (W32_WaitList *tmp = wait_addr_bin->first_wait_list; tmp && !wait_addr_list; tmp = tmp->next_in_bin) { if (tmp->value == (u64)wait_addr) { wait_addr_list = tmp; } } //- Acquire new wait addr list if (!wait_addr_list) { if (wait_addr_bin->first_free_wait_list) { wait_addr_list = wait_addr_bin->first_free_wait_list; wait_addr_bin->first_free_wait_list = wait_addr_list->next_in_bin; } else { LockTicketMutex(&g->wait_lists_arena_tm); { wait_addr_list = PushStructNoZero(g->wait_lists_arena, W32_WaitList); } UnlockTicketMutex(&g->wait_lists_arena_tm); } ZeroStruct(wait_addr_list); wait_addr_list->value = (u64)wait_addr; if (wait_addr_bin->last_wait_list) { wait_addr_bin->last_wait_list->next_in_bin = wait_addr_list; wait_addr_list->prev_in_bin = wait_addr_bin->last_wait_list; } else { wait_addr_bin->first_wait_list = wait_addr_list; } wait_addr_bin->last_wait_list = wait_addr_list; } //- Insert fiber into wait addr list job_fiber->wait_addr = (u64)wait_addr; if (wait_addr_list->last_waiter) { W32_FiberFromId(wait_addr_list->last_waiter)->next_addr_waiter = job_fiber_id; job_fiber->prev_addr_waiter = wait_addr_list->last_waiter; } else { wait_addr_list->first_waiter = job_fiber_id; } wait_addr_list->last_waiter = job_fiber_id; ++wait_addr_list->num_waiters; } if (wait_time != 0) { //- Search for wait time list in bin W32_WaitList *wait_time_list = 0; for (W32_WaitList *tmp = wait_time_bin->first_wait_list; tmp && !wait_time_list; tmp = tmp->next_in_bin) { if (tmp->value == (u64)wait_time) { wait_time_list = tmp; } } //- Acquire new wait time list if (!wait_time_list) { if (wait_time_bin->first_free_wait_list) { wait_time_list = wait_time_bin->first_free_wait_list; wait_time_bin->first_free_wait_list = wait_time_list->next_in_bin; } else { LockTicketMutex(&g->wait_lists_arena_tm); { wait_time_list = PushStructNoZero(g->wait_lists_arena, W32_WaitList); } UnlockTicketMutex(&g->wait_lists_arena_tm); } ZeroStruct(wait_time_list); wait_time_list->value = wait_time; if (wait_time_bin->last_wait_list) { wait_time_bin->last_wait_list->next_in_bin = wait_time_list; wait_time_list->prev_in_bin = wait_time_bin->last_wait_list; } else { wait_time_bin->first_wait_list = wait_time_list; } wait_time_bin->last_wait_list = wait_time_list; } //- Insert fiber into wait time list job_fiber->wait_time = wait_time; if (wait_time_list->last_waiter) { W32_FiberFromId(wait_time_list->last_waiter)->next_time_waiter = job_fiber_id; job_fiber->prev_time_waiter = wait_time_list->last_waiter; } else { wait_time_list->first_waiter = job_fiber_id; } wait_time_list->last_waiter = job_fiber_id; ++wait_time_list->num_waiters; } //- PopStruct worker's job fiber job_fiber = 0; done = 1; } } if (wait_time != 0) UnlockTicketMutex(&wait_time_bin->lock); } if (wait_addr != 0) UnlockTicketMutex(&wait_addr_bin->lock); } break; //- Fiber is finished case W32_YieldKind_Done: { done = 1; } break; } } } } //- Wait for job i64 num_jobs_in_queue = Atomic64Fetch(&pool->num_jobs_in_queue.v); shutdown = Atomic32Fetch(&pool->workers_shutdown.v); if (num_jobs_in_queue <= 0 && !shutdown) { //__profnc("Wait for job", Rgb32F(0.75, 0.75, 0)); LockTicketMutex(&pool->workers_wake_tm); { num_jobs_in_queue = Atomic64Fetch(&pool->num_jobs_in_queue.v); shutdown = Atomic32Fetch(&pool->workers_shutdown.v); while (num_jobs_in_queue <= 0 && !shutdown) { { UnlockTicketMutex(&pool->workers_wake_tm); WaitOnAddress(&pool->num_jobs_in_queue, &num_jobs_in_queue, sizeof(num_jobs_in_queue), INFINITE); LockTicketMutex(&pool->workers_wake_tm); } shutdown = Atomic32Fetch(&pool->workers_shutdown.v); num_jobs_in_queue = Atomic64Fetch(&pool->num_jobs_in_queue.v); } } UnlockTicketMutex(&pool->workers_wake_tm); } } //- Worker shutdown if (job_fiber) { W32_ReleaseFiber(pool, job_fiber); } } //////////////////////////////// //~ Win32 job scheduler entry W32_ThreadDef(W32_JobSchedulerEntryFunc, UNUSED arg) { struct W32_SharedJobCtx *g = &W32_shared_job_ctx; { i32 priority = THREAD_PRIORITY_TIME_CRITICAL; b32 success = SetThreadPriority(GetCurrentThread(), priority); LAX success; Assert(success); } /* Create high resolution timer */ HANDLE timer = CreateWaitableTimerExW(0, 0, CREATE_WAITABLE_TIMER_HIGH_RESOLUTION, TIMER_ALL_ACCESS); if (!timer) { //Panic(Lit("Failed to create high resolution timer")); } /* Create rolling buffer of scheduler cycles initialized to default value */ i32 periods_index = 0; i64 periods[W32_NumRollingSchedulerPeriods] = ZI; for (i32 i = 0; i < (i32)countof(periods); ++i) { periods[i] = W32_DefaultSchedulerPeriodNs; } i64 last_cycle_ns = 0; while (!Atomic32Fetch(&g->shutdown)) { __profn("Job scheduler cycle"); { __profn("Job scheduler wait"); LARGE_INTEGER due = ZI; due.QuadPart = -1; //due.QuadPart = -10000; //due.QuadPart = -32000; //due.QuadPart = -12000; //due.QuadPart = -8000; SetWaitableTimerEx(timer, &due, 0, 0, 0, 0, 0); WaitForSingleObject(timer, INFINITE); } /* Calculate mean period */ i64 now_ns = TimeNs(); i64 period_ns = last_cycle_ns == 0 ? W32_DefaultSchedulerPeriodNs : now_ns - last_cycle_ns; last_cycle_ns = now_ns; /* Calculate mean period */ { periods[periods_index++] = period_ns; if (periods_index == countof(periods)) { periods_index = 0; } f64 periods_sum_ns = 0; for (i32 i = 0; i < (i32)countof(periods); ++i) { periods_sum_ns += (f64)periods[i]; } f64 mean_ns = periods_sum_ns / (f64)countof(periods); Atomic64FetchSet(&g->current_scheduler_cycle_period_ns.v, RoundF64ToI64(mean_ns)); } { __profn("Job scheduler run"); i64 current_cycle = Atomic64FetchAdd(&g->current_scheduler_cycle.v, 1) + 1; W32_WakeByTime((u64)current_cycle); } } } //////////////////////////////// //~ @hookdef Futex void FutexYield(volatile void *addr, void *cmp, u32 size, i64 timeout_ns) { W32_Fiber *fiber = W32_FiberFromId(FiberId); i16 parent_id = fiber->parent_id; if (parent_id != 0) { *fiber->yield_param = (W32_YieldParam) { .kind = W32_YieldKind_Wait, .wait = { .addr = addr, .cmp = cmp, .size = size, .timeout_ns = timeout_ns } }; W32_YieldFiber(fiber, W32_FiberFromId(parent_id)); } else { i32 timeout_ms = 0; if (timeout_ns > 10000000000000000ll) { timeout_ms = INFINITE; } else if (timeout_ns != 0) { timeout_ms = timeout_ns / 1000000; timeout_ms += (timeout_ms == 0) * SignF32(timeout_ns); } if (addr == 0) { Sleep(timeout_ms); } else { WaitOnAddress(addr, cmp, size, timeout_ms); } } } void FutexWake(void *addr, i32 count) { W32_WakeByAddress(addr, count); } //////////////////////////////// //~ @hoodef Job operations GenericJobDesc *PushJobDesc_(u64 sig_size, u64 sig_align, GenericJobFunc *func, JobDescParams params) { struct W32_SharedJobCtx *g = &W32_shared_job_ctx; GenericJobDesc *result = 0; Arena *arena = 0; LockTicketMutex(&g->job_descs_tm); { if (g->first_free_job_desc) { result = g->first_free_job_desc; g->first_free_job_desc = result->next_free; arena = result->arena; } else { result = PushStructNoZero(g->job_descs_arena, GenericJobDesc); arena = AcquireArena(Mebi(4)); } } UnlockTicketMutex(&g->job_descs_tm); ZeroStruct(result); result = PushStruct(arena, GenericJobDesc); result->arena = arena; result->sig = PushBytes(arena, sig_size, sig_align); result->func = func; result->count = params.count; result->pool = params.pool; result->priority = params.priority; result->counter = params.counter; return result; } void RunJobEx(GenericJobDesc *desc) { __prof; struct W32_SharedJobCtx *g = &W32_shared_job_ctx; i32 count = desc->count; Counter *counter = desc->counter; JobPool pool_kind = desc->pool; JobPriority priority = desc->priority; if (count > 0) { if (counter) { AddCounter(counter, 1); } W32_Fiber *fiber = W32_FiberFromId(FiberId); priority = ClampI32(priority, fiber->job_priority, JobPriority_Count - 1); /* A job cannot create a job with a higher priority than itself */ if (pool_kind == JobPool_Inherit) { pool_kind = fiber->job_pool; } W32_JobPool *pool = &g->job_pools[pool_kind]; W32_JobQueue *queue = &pool->job_queues[priority]; LockTicketMutex(&queue->lock); { W32_JobInfo *info = 0; if (queue->first_free) { info = queue->first_free; queue->first_free = info->next; } else { info = PushStructNoZero(queue->arena, W32_JobInfo); } ZeroStruct(info); info->desc = desc; info->count = count; if (queue->last) { queue->last->next = info; } else { queue->first = info; } queue->last = info; } UnlockTicketMutex(&queue->lock); /* Wake workers */ { LockTicketMutex(&pool->workers_wake_tm); { Atomic64FetchAdd(&pool->num_jobs_in_queue.v, count); if (count >= W32_WakeAllThreshold) { WakeByAddressAll(&pool->num_jobs_in_queue); } else { for (i32 i = 0; i < count; ++i) { WakeByAddressSingle(&pool->num_jobs_in_queue); } } } UnlockTicketMutex(&pool->workers_wake_tm); } } } //////////////////////////////// //~ @hookdef Helpers i64 TimeNs(void) { struct W32_SharedJobCtx *g = &W32_shared_job_ctx; LARGE_INTEGER qpc; QueryPerformanceCounter(&qpc); i64 result = (qpc.QuadPart - g->timer_start_qpc) * g->ns_per_qpc; return result; } u32 GetLogicalProcessorCount(void) { return GetActiveProcessorCount(ALL_PROCESSOR_GROUPS); } i64 GetCurrentSchedulerPeriodNs(void) { W32_SharedJobCtx *g = &W32_shared_job_ctx; return Atomic64Fetch(&g->current_scheduler_cycle_period_ns.v); }