power_play/src/base/base_win32/base_win32_job.c
2025-08-25 14:53:17 -05:00

1517 lines
54 KiB
C

/* Global shared job-system context; zero-initialized at load, populated by StartupBaseJobs(). */
W32_SharedJobCtx W32_shared_job_ctx = ZI;
////////////////////////////////
//~ @hookdef Startup
/* @hookdef Startup: one-time initialization of the job system.
Initializes the QPC-based timer, converts the calling thread into fiber 0's
sibling (so it can yield), allocates all arenas, then spins up the scheduler
thread and per-pool worker threads with hard-coded counts/affinities. */
void StartupBaseJobs(void)
{
W32_SharedJobCtx *g = &W32_shared_job_ctx;
/* Init timer */
{
LARGE_INTEGER qpf;
QueryPerformanceFrequency(&qpf);
/* NOTE(review): integer division; exact for the usual 10MHz QPC frequency,
but loses precision if qpf does not divide 1e9 evenly -- confirm target HW. */
g->ns_per_qpc = 1000000000 / qpf.QuadPart;
}
{
LARGE_INTEGER qpc;
QueryPerformanceCounter(&qpc);
g->timer_start_qpc = qpc.QuadPart;
}
/* Init fibers */
g->num_fibers = 1; /* Fiber at index 0 always nil */
g->fiber_names_arena = AcquireArena(Gibi(64));
/* Init job descs */
g->job_descs_arena = AcquireArena(Gibi(64));
/* Convert main thread to fiber (pool == 0 selects ConvertThreadToFiber path) */
W32_AcquireFiber(0);
/* Init wait lists */
g->wait_lists_arena = AcquireArena(Gibi(64));
/* Init job pools */
for (JobPool pool_kind = 0; pool_kind < (i32)countof(g->job_pools); ++pool_kind)
{
W32_JobPool *pool = &g->job_pools[pool_kind];
/* Init queues */
for (JobPriority priority = 0; priority < (i32)countof(pool->job_queues); ++priority)
{
W32_JobQueue *queue = &pool->job_queues[priority];
queue->arena = AcquireArena(Gibi(64));
}
}
/* Init threads pool */
g->threads_arena = AcquireArena(Gibi(64));
/* Start job scheduler */
Atomic64FetchSet(&g->current_scheduler_cycle_period_ns.v, W32_DefaultSchedulerPeriodNs);
W32_Thread *scheduler_thread = W32_AcquireThread(W32_JobSchedulerEntryFunc, 0, Lit("Scheduler thread"), PROF_THREAD_GROUP_SCHEDULER);
LAX scheduler_thread;
//- Start job workers
/* TODO: Heuristic worker counts & affinities */
{
__profn("Start job workers");
for (JobPool pool_kind = 0; pool_kind < (i32)countof(g->job_pools); ++pool_kind)
{
W32_JobPool *pool = &g->job_pools[pool_kind];
String name_fmt = ZI;
/* Each pool gets a distinct profiler group band, offset by Mebi(pool_kind) */
i32 prof_group = PROF_THREAD_GROUP_FIBERS - Mebi(pool_kind);
switch (pool_kind)
{
default: Assert(0); break;
case JobPool_Sim:
{
name_fmt = Lit("Sim worker #%F");
pool->num_worker_threads = 4;
pool->thread_affinity_mask = 0x000000000000000Full;
pool->thread_priority = THREAD_PRIORITY_TIME_CRITICAL;
} break;
case JobPool_User:
{
name_fmt = Lit("User worker #%F");
pool->num_worker_threads = 4;
pool->thread_affinity_mask = 0x00000000000000F0ull;
pool->thread_priority = THREAD_PRIORITY_TIME_CRITICAL;
} break;
case JobPool_Audio:
{
name_fmt = Lit("Audio worker #%F");
pool->num_worker_threads = 2;
pool->thread_affinity_mask = 0x0000000000000300ull;
pool->thread_priority = THREAD_PRIORITY_TIME_CRITICAL;
pool->thread_is_audio = 1; /* workers opt into MMCSS "Pro Audio" */
} break;
case JobPool_Background:
{
name_fmt = Lit("Background worker #%F");
pool->num_worker_threads = 2;
pool->thread_affinity_mask = 0x0000000000000C00ull;
} break;
case JobPool_Floating:
{
name_fmt = Lit("Floating worker #%F");
pool->num_worker_threads = 8;
pool->thread_affinity_mask = 0x0000000000000FFFull;
} break;
}
pool->worker_threads_arena = AcquireArena(Gibi(64));
pool->worker_threads = PushStructs(pool->worker_threads_arena, W32_Thread *, pool->num_worker_threads);
pool->worker_contexts = PushStructs(pool->worker_threads_arena, W32_WorkerCtx, pool->num_worker_threads);
for (i32 i = 0; i < pool->num_worker_threads; ++i)
{
W32_WorkerCtx *ctx = &pool->worker_contexts[i];
ctx->pool_kind = pool_kind;
ctx->id = i;
String name = StringFormat(pool->worker_threads_arena, name_fmt, FmtSint(i));
pool->worker_threads[i] = W32_AcquireThread(W32_JobWorkerEntryFunc, ctx, name, prof_group + i);
}
}
}
//OnExit(ShutdownJobs);
}
////////////////////////////////
//~ Shutdown
#if 0
/* Disabled shutdown path: signals shutdown, drains workers, then the scheduler,
then reports any threads that never exited.
NOTE(review): as written this will not compile if re-enabled -- `g` and
`scheduler_thread` are not declared in this scope (compare StartupBaseJobs,
which declares both locally); wire them up before enabling. */
void ShutdownJobs(void)
{
/* Signal shutdown */
if (!Atomic32Fetch(&g->panicking))
{
Atomic32FetchSet(&g->shutdown, 1);
for (JobPool pool_kind = 0; pool_kind < (i32)countof(g->job_pools); ++pool_kind)
{
W32_JobPool *pool = &g->job_pools[pool_kind];
LockTicketMutex(&pool->workers_wake_tm);
{
Atomic32FetchSet(&pool->workers_shutdown.v, 1);
/* Large negative count guarantees the worker wait loops observe <= 0 */
Atomic64FetchSet(&pool->num_jobs_in_queue.v, -100000);
WakeByAddressAll(&pool->num_jobs_in_queue);
}
UnlockTicketMutex(&pool->workers_wake_tm);
}
}
/* Wait on worker threads */
if (!Atomic32Fetch(&g->panicking))
{
for (JobPool pool_kind = 0; pool_kind < (i32)countof(g->job_pools); ++pool_kind)
{
W32_JobPool *pool = &g->job_pools[pool_kind];
for (i32 i = 0; i < pool->num_worker_threads; ++i)
{
W32_Thread *worker_thread = pool->worker_threads[i];
W32_WaitReleaseThread(worker_thread);
}
}
}
/* Wait on scheduler thread */
if (!Atomic32Fetch(&g->panicking))
{
W32_WaitReleaseThread(scheduler_thread);
}
/* Find any dangling threads that haven't exited gracefully by now */
if (!Atomic32Fetch(&g->panicking))
{
LockTicketMutex(&g->threads_tm);
if (g->first_thread)
{
TempArena scratch = BeginScratchNoConflict();
u64 num_dangling_threads = 0;
String threads_msg = ZI;
threads_msg.text = PushDry(scratch.arena, u8);
for (W32_Thread *t = g->first_thread; t; t = t->next)
{
String name = StringFromCstr(t->thread_name_cstr, countof(t->thread_name_cstr));
threads_msg.len += StringFormat(scratch.arena, Lit(" \"%F\"\n"), FmtString(name)).len;
++num_dangling_threads;
}
threads_msg = StringFormat(scratch.arena, Lit("%F dangling thread(s):\n%F"), FmtUint(num_dangling_threads), FmtString(threads_msg));
//Panic(threads_msg);
EndScratch(scratch);
}
UnlockTicketMutex(&g->threads_tm);
}
}
#endif
////////////////////////////////
//~ Win32 thread
/* Win32 thread entry trampoline: `vt` is the W32_Thread created by
W32_AcquireThread. Converts the thread to a fiber (so jobs run on it can
yield), registers with the profiler, initializes COM, names the thread,
then runs the user entry point. Always returns 0. */
DWORD WINAPI W32_Win32ThreadProc(LPVOID vt)
{
/* Convert to fiber first -- the entry point may schedule/yield fibers */
W32_AcquireFiber(0);
W32_Thread *t = (W32_Thread *)vt;
__profthread(t->thread_name_cstr, t->profiler_group);
/* Initialize COM */
CoInitializeEx(0, COINIT_MULTITHREADED);
/* Set thread name (skipped when W32_AcquireThread stored an empty name) */
if (t->thread_name_wstr[0] != 0)
{
SetThreadDescription(GetCurrentThread(), t->thread_name_wstr);
}
//P_LogInfoF("New thread \"%F\" created with ID %F", FmtString(StringFromCstrNoLimit(t->thread_name_cstr)), FmtUint(ThreadId()));
/* Enter thread entry point */
t->entry_point(t->thread_data);
/* Uninitialize COM */
CoUninitialize();
return 0;
}
/* Creates a new OS thread running `entry_point(thread_data)`.
The W32_Thread record is recycled from a free list (or pushed on
`threads_arena`) and linked into the global active-thread list under
`threads_tm`. `thread_name` is copied into both a char and a wide-char
fixed buffer (truncated to fit). Release with W32_TryReleaseThread /
W32_WaitReleaseThread. */
W32_Thread *W32_AcquireThread(W32_ThreadFunc *entry_point, void *thread_data, String thread_name, i32 profiler_group)
{
__prof;
TempArena scratch = BeginScratchNoConflict();
W32_SharedJobCtx *g = &W32_shared_job_ctx;
Assert(entry_point != 0);
//P_LogInfoF("Creating thread \"%F\"", FmtString(thread_name));
/* Acquire thread object: pop free list or push a fresh record, then append
to the active doubly-linked list -- all under the threads ticket mutex. */
W32_Thread *t = 0;
{
LockTicketMutex(&g->threads_tm);
if (g->first_free_thread)
{
t = g->first_free_thread;
g->first_free_thread = t->next; /* `next` doubles as the free-list link */
}
else
{
t = PushStructNoZero(g->threads_arena, W32_Thread);
}
ZeroStruct(t);
if (g->last_thread)
{
g->last_thread->next = t;
t->prev = g->last_thread;
}
else
{
g->first_thread = t;
}
g->last_thread = t;
UnlockTicketMutex(&g->threads_tm);
}
t->entry_point = entry_point;
t->thread_data = thread_data;
t->profiler_group = profiler_group;
/* Copy thread name to params (truncating; always NUL-terminated) */
{
u64 CstrLen = MinU64((countof(t->thread_name_cstr) - 1), thread_name.len);
CopyBytes(t->thread_name_cstr, thread_name.text, CstrLen * sizeof(*t->thread_name_cstr));
t->thread_name_cstr[CstrLen] = 0;
}
{
String16 thread_name16 = String16FromString(scratch.arena, thread_name);
u64 WstrLen = MinU64((countof(t->thread_name_wstr) - 1), thread_name16.len);
CopyBytes(t->thread_name_wstr, thread_name16.text, WstrLen * sizeof(*t->thread_name_wstr));
t->thread_name_wstr[WstrLen] = 0;
}
t->handle = CreateThread(
0,
W32_ThreadStackSize,
W32_Win32ThreadProc,
t,
0,
0
);
/* NOTE(review): on CreateThread failure `t` stays in the active list with a
null handle (panic is currently disabled) -- W32_TryReleaseThread skips
null handles, so such a record is never reclaimed. */
if (!t->handle)
{
//Panic(Lit("Failed to create thread"));
}
EndScratch(scratch);
return (W32_Thread *)t;
}
/* Returns 0 if the thread could not release in specified timeout (e.g. because it is still running) */
/* Waits up to `timeout_seconds` for the thread to exit; on success closes the
handle, unlinks the record from the active list and pushes it onto the free
list (all under `threads_tm`). A timeout above ~1e7 seconds is treated as
INFINITE. */
b32 W32_TryReleaseThread(W32_Thread *thread, f32 timeout_seconds)
{
__prof;
W32_SharedJobCtx *g = &W32_shared_job_ctx;
b32 success = 0;
W32_Thread *t = (W32_Thread *)thread;
HANDLE handle = t->handle;
if (handle)
{
/* Wait for thread to stop */
DWORD timeout_ms = (timeout_seconds > 10000000) ? INFINITE : RoundF32ToI32(timeout_seconds * 1000);
DWORD wait_result = WaitForSingleObject(handle, timeout_ms);
if (wait_result == WAIT_OBJECT_0)
{
/* Release thread */
success = 1;
CloseHandle(handle);
{
LockTicketMutex(&g->threads_tm);
{
/* Unlink from active doubly-linked list */
W32_Thread *prev = t->prev;
W32_Thread *next = t->next;
if (prev)
{
prev->next = next;
}
else
{
g->first_thread = next;
}
if (next)
{
next->prev = prev;
}
else
{
g->last_thread = prev;
}
/* Push onto singly-linked free list (reuses `next`) */
t->next = g->first_free_thread;
g->first_free_thread = t;
}
UnlockTicketMutex(&g->threads_tm);
}
}
}
return success;
}
/* Block until `thread` exits, then release its record back to the pool.
Asserts on failure (the infinite timeout means failure implies a bad handle). */
void W32_WaitReleaseThread(W32_Thread *thread)
{
__prof;
b32 released = W32_TryReleaseThread(thread, F32Infinity);
LAX released;
Assert(released);
}
////////////////////////////////
//~ Win32 wait list
/* REQUIRED: Caller must have acquired `wake_lock` for each fiber in array */
/* Removes each fiber from its wait-address and wait-time lists, releases its
wake lock, re-queues its job at the FRONT of its priority queue (so resumed
fibers run before freshly dispatched jobs), and finally wakes workers.
Bin locks are taken addr-first then time-first, matching the wait path in
W32_JobWorkerEntryFunc to avoid lock-order inversion. */
void W32_WakeLockedFibers(i32 num_fibers, W32_Fiber **fibers)
{
W32_SharedJobCtx *g = &W32_shared_job_ctx;
/* Update wait lists */
for (i32 i = 0; i < num_fibers; ++i)
{
W32_Fiber *fiber = fibers[i];
u64 wait_addr = fiber->wait_addr;
u64 wait_time = fiber->wait_time;
/* Lock & search wait bins */
/* TODO: Cache these in parameters since caller has one of them already calculated */
W32_WaitBin *wait_addr_bin = 0;
W32_WaitBin *wait_time_bin = 0;
W32_WaitList *wait_addr_list = 0;
W32_WaitList *wait_time_list = 0;
if (wait_addr != 0)
{
wait_addr_bin = &g->wait_addr_bins[wait_addr % W32_NumWaitAddrBins];
LockTicketMutex(&wait_addr_bin->lock);
for (W32_WaitList *tmp = wait_addr_bin->first_wait_list; tmp && !wait_addr_list; tmp = tmp->next_in_bin)
{
if (tmp->value == (u64)wait_addr)
{
wait_addr_list = tmp;
}
}
}
if (wait_time != 0)
{
wait_time_bin = &g->wait_time_bins[wait_time % W32_NumWaitTimeBins];
LockTicketMutex(&wait_time_bin->lock);
for (W32_WaitList *tmp = wait_time_bin->first_wait_list; tmp && !wait_time_list; tmp = tmp->next_in_bin)
{
if (tmp->value == (u64)wait_time)
{
wait_time_list = tmp;
}
}
}
{
/* Remove from addr list; if this was the last waiter, unlink the list
from its bin and push it on the bin's free list. */
if (wait_addr_list)
{
if (--wait_addr_list->num_waiters == 0)
{
/* Free addr list */
W32_WaitList *prev = wait_addr_list->prev_in_bin;
W32_WaitList *next = wait_addr_list->next_in_bin;
if (prev)
{
prev->next_in_bin = next;
}
else
{
wait_addr_bin->first_wait_list = next;
}
if (next)
{
next->prev_in_bin = prev;
}
else
{
wait_addr_bin->last_wait_list = prev;
}
wait_addr_list->next_in_bin = wait_addr_bin->first_free_wait_list;
wait_addr_bin->first_free_wait_list = wait_addr_list;
}
else
{
/* Other waiters remain: unlink just this fiber from the waiter chain */
i16 prev_id = fiber->prev_addr_waiter;
i16 next_id = fiber->next_addr_waiter;
if (prev_id)
{
W32_FiberFromId(prev_id)->next_addr_waiter = next_id;
}
else
{
wait_addr_list->first_waiter = next_id;
}
if (next_id)
{
W32_FiberFromId(next_id)->prev_addr_waiter = prev_id;
}
else
{
wait_addr_list->last_waiter = prev_id;
}
}
fiber->wait_addr = 0;
fiber->prev_addr_waiter = 0;
fiber->next_addr_waiter = 0;
}
/* Remove from time list (same structure as the addr-list removal) */
if (wait_time_list)
{
if (--wait_time_list->num_waiters == 0)
{
/* Free time list */
W32_WaitList *prev = wait_time_list->prev_in_bin;
W32_WaitList *next = wait_time_list->next_in_bin;
if (prev)
{
prev->next_in_bin = next;
}
else
{
wait_time_bin->first_wait_list = next;
}
if (next)
{
next->prev_in_bin = prev;
}
else
{
wait_time_bin->last_wait_list = prev;
}
wait_time_list->next_in_bin = wait_time_bin->first_free_wait_list;
wait_time_bin->first_free_wait_list = wait_time_list;
}
else
{
i16 prev_id = fiber->prev_time_waiter;
i16 next_id = fiber->next_time_waiter;
if (prev_id)
{
W32_FiberFromId(prev_id)->next_time_waiter = next_id;
}
else
{
wait_time_list->first_waiter = next_id;
}
if (next_id)
{
W32_FiberFromId(next_id)->prev_time_waiter = prev_id;
}
else
{
wait_time_list->last_waiter = prev_id;
}
}
fiber->wait_time = 0;
fiber->prev_time_waiter = 0;
fiber->next_time_waiter = 0;
}
/* Unlock fiber (caller took `wake_lock` for us -- see function contract) */
Atomic32FetchSet(&fiber->wake_lock, 0);
}
/* Unlock wait bins (reverse of acquisition order) */
if (wait_time_bin != 0) UnlockTicketMutex(&wait_time_bin->lock);
if (wait_addr_bin != 0) UnlockTicketMutex(&wait_addr_bin->lock);
}
/* Resume jobs */
/* TODO: Batch submit waiters based on queue kind rather than one at a time */
i32 job_counts_per_pool[JobPool_Count] = ZI;
for (i32 i = 0; i < num_fibers; ++i)
{
W32_Fiber *fiber = fibers[i];
JobPool pool_kind = fiber->job_pool;
++job_counts_per_pool[pool_kind];
W32_JobPool *pool = &g->job_pools[pool_kind];
W32_JobQueue *queue = &pool->job_queues[fiber->job_priority];
LockTicketMutex(&queue->lock);
{
W32_JobInfo *info = 0;
if (queue->first_free)
{
info = queue->first_free;
queue->first_free = info->next;
}
else
{
info = PushStructNoZero(queue->arena, W32_JobInfo);
}
ZeroStruct(info);
/* A resume entry: fiber_id > 0 tells the worker to resume this fiber,
and num_dispatched carries the fiber's dispatch index. */
info->count = 1;
info->num_dispatched = fiber->job_id;
info->fiber_id = fiber->id;
info->desc = fiber->job_desc;
/* Push at the FRONT (contrast RunJobEx, which appends at the back) */
if (queue->first)
{
info->next = queue->first;
}
else
{
queue->last = info;
}
queue->first = info;
}
UnlockTicketMutex(&queue->lock);
}
/* Wake workers: bump the per-pool job counter under the wake mutex, then
wake one waiter per job (or everyone past the threshold). */
if (num_fibers > 0)
{
for (JobPool pool_kind = 0; pool_kind < (i32)countof(job_counts_per_pool); ++pool_kind)
{
i32 job_count = job_counts_per_pool[pool_kind];
if (job_count > 0)
{
W32_JobPool *pool = &g->job_pools[pool_kind];
LockTicketMutex(&pool->workers_wake_tm);
{
Atomic64FetchAdd(&pool->num_jobs_in_queue.v, job_count);
if (job_count >= W32_WakeAllThreshold)
{
WakeByAddressAll(&pool->num_jobs_in_queue);
}
else
{
for (i32 i = 0; i < job_count; ++i)
{
WakeByAddressSingle(&pool->num_jobs_in_queue);
}
}
}
UnlockTicketMutex(&pool->workers_wake_tm);
}
}
}
}
/* Wakes up to `count` fibers waiting on `addr` (and any OS threads blocked in
WaitOnAddress on it). Fibers are claimed by winning their `wake_lock` under
the bin lock, collected into a scratch array, and resumed via
W32_WakeLockedFibers. */
void W32_WakeByAddress(void *addr, i32 count)
{
TempArena scratch = BeginScratchNoConflict();
W32_SharedJobCtx *g = &W32_shared_job_ctx;
u64 wait_addr_bin_index = (u64)addr % W32_NumWaitAddrBins;
W32_WaitBin *wait_addr_bin = &g->wait_addr_bins[wait_addr_bin_index];
W32_WaitList *wait_addr_list = 0;
/* Get list of waiting fibers */
i32 num_fibers = 0;
W32_Fiber **fibers = 0;
{
LockTicketMutex(&wait_addr_bin->lock);
{
/* Search for wait addr list */
for (W32_WaitList *tmp = wait_addr_bin->first_wait_list; tmp && !wait_addr_list; tmp = tmp->next_in_bin)
{
if (tmp->value == (u64)addr)
{
wait_addr_list = tmp;
}
}
/* Lock fibers & build array; stop once `count` fibers are claimed.
Fibers whose wake_lock is already taken are skipped (someone else
is concurrently waking them). */
if (wait_addr_list)
{
fibers = PushStructsNoZero(scratch.arena, W32_Fiber *, wait_addr_list->num_waiters);
for (W32_Fiber *fiber = W32_FiberFromId(wait_addr_list->first_waiter); fiber && num_fibers < count; fiber = W32_FiberFromId(fiber->next_addr_waiter))
{
if (Atomic32FetchTestSet(&fiber->wake_lock, 0, 1) == 0)
{
fibers[num_fibers] = fiber;
++num_fibers;
}
}
}
}
UnlockTicketMutex(&wait_addr_bin->lock);
}
if (num_fibers > 0)
{
W32_WakeLockedFibers(num_fibers, fibers);
}
/* Wake win32 blocking thread waiters */
if (count >= W32_WakeAllThreshold)
{
WakeByAddressAll(addr);
}
else
{
for (i32 i = 0; i < count; ++i)
{
WakeByAddressSingle(addr);
}
}
EndScratch(scratch);
}
/* Wakes every fiber whose timeout maps to scheduler cycle `time`.
Called by the scheduler thread once per cycle; unlike W32_WakeByAddress
there is no count limit -- all waiters in the bucket are claimed. */
void W32_WakeByTime(u64 time)
{
TempArena scratch = BeginScratchNoConflict();
W32_SharedJobCtx *g = &W32_shared_job_ctx;
u64 wait_time_bin_index = (u64)time % W32_NumWaitTimeBins;
W32_WaitBin *wait_time_bin = &g->wait_time_bins[wait_time_bin_index];
W32_WaitList *wait_time_list = 0;
/* Build list of waiters to resume */
i32 num_fibers = 0;
W32_Fiber **fibers = 0;
{
LockTicketMutex(&wait_time_bin->lock);
{
/* Search for wait time list */
for (W32_WaitList *tmp = wait_time_bin->first_wait_list; tmp && !wait_time_list; tmp = tmp->next_in_bin)
{
if (tmp->value == (u64)time)
{
wait_time_list = tmp;
}
}
if (wait_time_list)
{
/* Set waiter wake status & build fibers list; skip fibers whose
wake_lock a concurrent waker already owns */
fibers = PushStructsNoZero(scratch.arena, W32_Fiber *, wait_time_list->num_waiters);
for (W32_Fiber *fiber = W32_FiberFromId(wait_time_list->first_waiter); fiber; fiber = W32_FiberFromId(fiber->next_time_waiter))
{
if (Atomic32FetchTestSet(&fiber->wake_lock, 0, 1) == 0)
{
fibers[num_fibers] = fiber;
++num_fibers;
}
}
}
}
UnlockTicketMutex(&wait_time_bin->lock);
}
/* Safe when num_fibers == 0: W32_WakeLockedFibers loops are empty then */
W32_WakeLockedFibers(num_fibers, fibers);
EndScratch(scratch);
}
////////////////////////////////
//~ Win32 fiber
//- Acquire fiber
/* If `pool` is 0, then the currently running thread will be converted into a fiber */
/* Acquires a fiber. If `pool` is non-0, tries the pool's free list first and
otherwise allocates a brand new fiber slot + CreateFiber. If `pool` is 0,
the CURRENT thread is converted into a fiber (ConvertThreadToFiber).
All wait/job bookkeeping fields are reset before returning, so reused
fibers come back clean. Brand new fibers get a "prefix<id>suffix" name
built into `fiber_names_arena`. */
W32_Fiber *W32_AcquireFiber(W32_JobPool *pool)
{
W32_SharedJobCtx *g = &W32_shared_job_ctx;
i16 fiber_id = 0;
W32_Fiber *fiber = 0;
char *new_name_cstr = 0; /* non-0 only for brand new fibers */
{
/* Try the pool's free list (parent_id doubles as the free-list link) */
if (pool != 0)
{
LockTicketMutex(&pool->free_fibers_tm);
if (pool->first_free_fiber_id)
{
fiber_id = pool->first_free_fiber_id;
fiber = &g->fibers[fiber_id];
pool->first_free_fiber_id = fiber->parent_id;
}
UnlockTicketMutex(&pool->free_fibers_tm);
}
/* Otherwise claim a fresh slot in the global fiber table */
if (!fiber_id)
{
LockTicketMutex(&g->fibers_tm);
{
{
fiber_id = g->num_fibers++;
if (fiber_id >= MaxFibers)
{
//Panic(Lit("Max fibers reached"));
}
fiber = &g->fibers[fiber_id];
new_name_cstr = PushStructs(g->fiber_names_arena, char, W32_FiberNameMaxSize);
}
}
UnlockTicketMutex(&g->fibers_tm);
}
}
/* One-time initialization for brand new fibers */
if (new_name_cstr != 0)
{
__profn("Initialize fiber");
fiber->id = fiber_id;
/* Id to ASCII (digits come out reversed, then get swapped into order) */
i32 id_div = fiber_id;
char id_chars[64] = ZI;
i32 id_chars_len = 0;
do
{
i32 digit = id_div % 10;
id_div /= 10;
id_chars[id_chars_len] = ("0123456789")[digit];
++id_chars_len;
} while (id_div > 0);
i32 rev_start = 0;
i32 rev_end = id_chars_len - 1;
while (rev_start < rev_end)
{
char swp = id_chars[rev_start];
id_chars[rev_start] = id_chars[rev_end];
id_chars[rev_end] = swp;
++rev_start;
--rev_end;
}
/* Concat fiber name: prefix + id digits + suffix */
/* FIX: was `Assert(sizeof(sizeof(W32_FiberNamePrefixCstr)) <= ...)` -- the
nested sizeof checked sizeof(size_t) against the buffer size instead of
the name. Assert the actual concatenated length fits the buffer. */
i32 name_size = 1;
Assert(sizeof(W32_FiberNamePrefixCstr) + id_chars_len + sizeof(W32_FiberNameSuffixCstr) <= W32_FiberNameMaxSize);
CopyBytes(new_name_cstr, W32_FiberNamePrefixCstr, sizeof(W32_FiberNamePrefixCstr));
name_size += sizeof(W32_FiberNamePrefixCstr) - 2; /* advance past prefix text (skip its NUL) */
CopyBytes(new_name_cstr + name_size, id_chars, id_chars_len);
name_size += id_chars_len;
CopyBytes(new_name_cstr + name_size, W32_FiberNameSuffixCstr, sizeof(W32_FiberNameSuffixCstr));
name_size += sizeof(W32_FiberNameSuffixCstr) - 2;
fiber->name_cstr = new_name_cstr;
/* Init win32 fiber */
if (pool != 0)
{
__profn("CreateFiber");
fiber->addr = CreateFiber(W32_FiberStackSize, W32_FiberEntryPoint, (void *)(i64)fiber_id);
}
else
{
/* Fiber is not a part of a job pool, convert thread to fiber */
__profn("ConvertThreadToFiber");
fiber->addr = ConvertThreadToFiber((void *)(i64)fiber_id);
}
}
/* Reset wait/job state (both fresh and recycled fibers) */
fiber->wait_addr = 0;
fiber->wait_time = 0;
fiber->prev_addr_waiter = 0;
fiber->next_addr_waiter = 0;
fiber->prev_time_waiter = 0;
fiber->next_time_waiter = 0;
fiber->job_id = 0;
fiber->job_pool = 0;
fiber->job_priority = 0;
fiber->yield_param = 0;
fiber->parent_id = 0;
return fiber;
}
//- Release fiber
/* Return `fiber` to `pool`'s free list. The fiber's `parent_id` field
doubles as the free-list "next" link while the fiber is parked. */
void W32_ReleaseFiber(W32_JobPool *pool, W32_Fiber *fiber)
{
LockTicketMutex(&pool->free_fibers_tm);
{
fiber->parent_id = pool->first_free_fiber_id;
pool->first_free_fiber_id = fiber->id;
}
UnlockTicketMutex(&pool->free_fibers_tm);
}
//- Fiber id
/* Map a fiber id to its slot in the global fiber table.
Ids <= 0 (the nil fiber and invalid ids) map to 0. */
ForceInline W32_Fiber *W32_FiberFromId(i16 id)
{
W32_SharedJobCtx *g = &W32_shared_job_ctx;
W32_Fiber *result = 0;
if (id > 0)
{
result = &g->fibers[id];
}
return result;
}
//- Fiber control flow
/* Switch execution to `fiber`; returns only when some fiber switches back to
the caller. ForceNoInline keeps a real stack frame around the switch.
NOTE(review): MemoryBarrier() fences on both sides of the switch --
presumably to publish/observe job state across the fiber boundary; confirm
it is also an adequate compiler barrier on all supported toolchains. */
ForceNoInline void W32_FiberResume(W32_Fiber *fiber)
{
MemoryBarrier();
SwitchToFiber(fiber->addr);
MemoryBarrier();
}
void W32_YieldFiber(W32_Fiber *fiber, W32_Fiber *parent_fiber)
{
LAX fiber;
Assert(fiber->id == FiberId());
Assert(parent_fiber->id == fiber->parent_id);
Assert(parent_fiber->id > 0);
{
__prof_fiber_leave();
W32_FiberResume(parent_fiber);
__prof_fiber_enter(fiber->name_cstr, PROF_THREAD_GROUP_FIBERS - Mebi(fiber->job_pool) + Kibi(1) + fiber->id);
}
}
////////////////////////////////
//~ Win32 fiber entry
/* Entry point for all job fibers (passed to CreateFiber with the fiber id as
the parameter). Runs forever: each loop iteration executes the job the
worker installed on the fiber, performs completion bookkeeping, then yields
back to the worker with YieldKind_Done to await reuse. */
void W32_FiberEntryPoint(void *id_ptr)
{
W32_SharedJobCtx *g = &W32_shared_job_ctx;
/* NOTE(review): implicit i32 -> i16 narrowing; fine while MaxFibers fits
in i16 -- confirm. */
i16 id = (i32)(i64)id_ptr;
volatile W32_Fiber *fiber = W32_FiberFromId(id);
__prof_fiber_enter(fiber->name_cstr, PROF_THREAD_GROUP_FIBERS - Mebi(fiber->job_pool) + Kibi(1) + fiber->id);
for (;;)
{
//- Run job
{
W32_YieldParam *yield_param = fiber->yield_param;
yield_param->kind = W32_YieldKind_None;
{
MemoryBarrier();
GenericJobDesc *job_desc = fiber->job_desc;
GenericJobFunc *job_func = job_desc->func;
job_func(job_desc->sig, fiber->job_id);
MemoryBarrier();
}
}
//- Job completed
{
GenericJobDesc *job_desc = fiber->job_desc;
/* Check if we're the last completed dispatch in the job */
if ((Atomic32FetchAdd(&job_desc->num_completed, 1) + 1) >= job_desc->count)
{
/* Decrement counter */
if (job_desc->counter)
{
AddCounter(job_desc->counter, -1);
}
/* Release job desc: reset its payload arena, then push onto the
global desc free list */
ResetArena(job_desc->arena);
LockTicketMutex(&g->job_descs_tm);
{
job_desc->next_free = g->first_free_job_desc;
g->first_free_job_desc = job_desc;
}
UnlockTicketMutex(&g->job_descs_tm);
}
/* Yield to worker; the worker sees Done and either reuses or parks us */
fiber->yield_param->kind = W32_YieldKind_Done;
W32_Fiber *parent_fiber = W32_FiberFromId(fiber->parent_id);
W32_YieldFiber((W32_Fiber *)fiber, parent_fiber);
}
}
}
////////////////////////////////
//~ Win32 job worker entry
/* Worker thread main loop for one job pool.
Applies the pool's thread priority / MMCSS settings, then loops:
pull a job (or a yielded fiber to resume) from the highest-priority
non-empty queue, run it on a fiber, and process the fiber's yield
(wait registration or completion). Blocks on `num_jobs_in_queue` via
WaitOnAddress when there is no work. Exits when `workers_shutdown` is set. */
W32_ThreadDef(W32_JobWorkerEntryFunc, worker_ctx_arg)
{
W32_SharedJobCtx *g = &W32_shared_job_ctx;
W32_WorkerCtx *ctx = worker_ctx_arg;
JobPool pool_kind = ctx->pool_kind;
W32_JobPool *pool = &g->job_pools[pool_kind];
LAX ctx;
{
/* TODO: Heuristic pinning */
/* TODO: Pin non-worker threads to other cores */
HANDLE thread_handle = GetCurrentThread();
if (pool->thread_priority)
{
__profn("Set priority");
b32 success = SetThreadPriority(thread_handle, pool->thread_priority) != 0;
Assert(success);
LAX success;
}
#if 0
if (pool->thread_affinity_mask)
{
__profn("Set affinity");
b32 success = SetThreadAffinityMask(thread_handle, pool->thread_affinity_mask) != 0;
#if RtcIsEnabled || ProfilingIsEnabled
{
/* Retry until external tools can set correct process affinity */
i32 delay_ms = 16;
while (!success && delay_ms <= 1024)
{
__profn("Affinity retry");
Sleep(delay_ms);
success = SetThreadAffinityMask(thread_handle, pool->thread_affinity_mask) != 0;
delay_ms *= 2;
}
}
#endif
Assert(success);
LAX success;
}
#endif
if (pool->thread_is_audio)
{
/* https://learn.microsoft.com/en-us/windows/win32/procthread/multimedia-class-scheduler-service#registry-settings */
__profn("Set mm thread characteristics");
DWORD task = 0;
HANDLE mmc_handle = AvSetMmThreadCharacteristics(L"Pro Audio", &task);
Assert(mmc_handle);
LAX mmc_handle;
}
}
i32 worker_fiber_id = FiberId();
/* The worker keeps one idle job fiber cached between jobs; it is set to 0
when a fiber parks itself on a wait list (the fiber's ownership moves to
whoever wakes it). */
W32_Fiber *job_fiber = 0;
b32 shutdown = 0;
while (!shutdown)
{
//- Pull job from queue
GenericJobDesc *job_desc = 0;
JobPriority job_priority = 0;
i16 job_fiber_id = 0; /* > 0 => resume an existing (previously yielded) fiber */
i32 job_id = 0;
{
//__profnc("Pull job", Rgb32F(0.75, 0.75, 0));
/* Scan queues from highest priority (0) down until a job is found */
for (JobPriority priority = 0; priority < (i32)countof(pool->job_queues) && !job_desc; ++priority)
{
W32_JobQueue *queue = &pool->job_queues[priority];
/* NOTE(review): `queue` is the address of an array element, so this
condition is always true -- dead check, kept as-is. */
if (queue)
{
LockTicketMutex(&queue->lock);
{
W32_JobInfo *info = queue->first;
while (info && !job_desc)
{
W32_JobInfo *next = info->next;
b32 dequeue = 0;
if (info->fiber_id <= 0)
{
/* Fresh dispatch: claim the next dispatch index */
job_id = info->num_dispatched++;
if (job_id < info->count)
{
/* Pick job */
Atomic64FetchAdd(&pool->num_jobs_in_queue.v, -1);
job_priority = priority;
job_desc = info->desc;
if (job_id == (info->count - 1))
{
/* We're picking up the last dispatch, so dequeue the job */
dequeue = 1;
}
}
}
else
{
/* This job is to be resumed from a yield */
Atomic64FetchAdd(&pool->num_jobs_in_queue.v, -1);
job_fiber_id = info->fiber_id;
job_priority = priority;
job_id = info->num_dispatched;
job_desc = info->desc;
dequeue = 1;
}
if (dequeue)
{
/* Unlink from queue head and recycle the info node */
if (!next)
{
queue->last = 0;
}
queue->first = next;
info->next = queue->first_free;
queue->first_free = info;
}
info = next;
}
}
UnlockTicketMutex(&queue->lock);
}
}
}
//- Release old fiber if resuming a yielded fiber
if (job_fiber_id > 0)
{
if (job_fiber)
{
W32_ReleaseFiber(pool, job_fiber);
}
job_fiber = W32_FiberFromId(job_fiber_id);
}
//- Run fiber
if (job_desc)
{
if (!job_fiber)
{
job_fiber = W32_AcquireFiber(pool);
}
job_fiber_id = job_fiber->id;
{
__profnc("Run fiber", Rgb32F(1, 1, 1));
__profvalue(job_fiber->id);
/* yield lives on the worker's stack; the fiber writes its yield
request here before switching back to us */
W32_YieldParam yield = ZI;
job_fiber->parent_id = worker_fiber_id;
job_fiber->job_desc = job_desc;
job_fiber->job_id = job_id;
job_fiber->job_pool = pool_kind;
job_fiber->job_priority = job_priority;
job_fiber->yield_param = &yield;
b32 done = 0;
while (!done)
{
W32_FiberResume(job_fiber);
switch (yield.kind)
{
default:
{
/* Invalid yield kind */
TempArena scratch = BeginScratchNoConflict();
//Panic(StringFormat(scratch.arena, Lit("Invalid fiber yield kind \"%F\""), FmtSint(yield.kind)));
EndScratch(scratch);
} break;
//- Fiber is waiting
case W32_YieldKind_Wait:
{
__profn("Process fiber wait");
volatile void *wait_addr = yield.wait.addr;
void *wait_cmp = yield.wait.cmp;
u32 wait_size = yield.wait.size;
i64 wait_timeout_ns = yield.wait.timeout_ns;
i64 wait_time = 0;
/* Convert the ns timeout to an absolute scheduler cycle */
if (wait_timeout_ns > 0 && wait_timeout_ns < I64Max)
{
u64 current_scheduler_cycle = Atomic64Fetch(&g->current_scheduler_cycle.v);
i64 current_scheduler_cycle_period_ns = Atomic64Fetch(&g->current_scheduler_cycle_period_ns.v);
wait_time = current_scheduler_cycle + MaxI64((i64)((f64)wait_timeout_ns / (f64)current_scheduler_cycle_period_ns), 1);
}
u64 wait_addr_bin_index = (u64)wait_addr % W32_NumWaitAddrBins;
u64 wait_time_bin_index = (u64)wait_time % W32_NumWaitTimeBins;
W32_WaitBin *wait_addr_bin = &g->wait_addr_bins[wait_addr_bin_index];
W32_WaitBin *wait_time_bin = &g->wait_time_bins[wait_time_bin_index];
/* Lock order addr-then-time matches W32_WakeLockedFibers */
if (wait_addr != 0) LockTicketMutex(&wait_addr_bin->lock);
{
if (wait_time != 0) LockTicketMutex(&wait_time_bin->lock);
{
//- Load and compare value at address now that wait bins are locked
b32 cancel_wait = wait_addr == 0 && wait_time == 0;
if (wait_addr != 0)
{
/* Interlocked CAS-with-same-value is used as an atomic load */
switch (wait_size)
{
case 1: cancel_wait = (u8)_InterlockedCompareExchange8(wait_addr, 0, 0) != *(u8 *)wait_cmp; break;
case 2: cancel_wait = (u16)_InterlockedCompareExchange16(wait_addr, 0, 0) != *(u16 *)wait_cmp; break;
case 4: cancel_wait = (u32)_InterlockedCompareExchange(wait_addr, 0, 0) != *(u32 *)wait_cmp; break;
case 8: cancel_wait = (u64)_InterlockedCompareExchange64(wait_addr, 0, 0) != *(u64 *)wait_cmp; break;
default: cancel_wait = 1; Assert(0); break; /* Invalid wait size */
}
}
if (wait_time != 0 && !cancel_wait)
{
cancel_wait = wait_time <= Atomic64Fetch(&g->current_scheduler_cycle.v);
}
if (!cancel_wait)
{
if (wait_addr != 0)
{
//- Search for wait addr list in bin
W32_WaitList *wait_addr_list = 0;
for (W32_WaitList *tmp = wait_addr_bin->first_wait_list; tmp && !wait_addr_list; tmp = tmp->next_in_bin)
{
if (tmp->value == (u64)wait_addr)
{
wait_addr_list = tmp;
}
}
//- Acquire new wait addr list
if (!wait_addr_list)
{
if (wait_addr_bin->first_free_wait_list)
{
wait_addr_list = wait_addr_bin->first_free_wait_list;
wait_addr_bin->first_free_wait_list = wait_addr_list->next_in_bin;
}
else
{
LockTicketMutex(&g->wait_lists_arena_tm);
{
wait_addr_list = PushStructNoZero(g->wait_lists_arena, W32_WaitList);
}
UnlockTicketMutex(&g->wait_lists_arena_tm);
}
ZeroStruct(wait_addr_list);
wait_addr_list->value = (u64)wait_addr;
if (wait_addr_bin->last_wait_list)
{
wait_addr_bin->last_wait_list->next_in_bin = wait_addr_list;
wait_addr_list->prev_in_bin = wait_addr_bin->last_wait_list;
}
else
{
wait_addr_bin->first_wait_list = wait_addr_list;
}
wait_addr_bin->last_wait_list = wait_addr_list;
}
//- Insert fiber into wait addr list
job_fiber->wait_addr = (u64)wait_addr;
if (wait_addr_list->last_waiter)
{
W32_FiberFromId(wait_addr_list->last_waiter)->next_addr_waiter = job_fiber_id;
job_fiber->prev_addr_waiter = wait_addr_list->last_waiter;
}
else
{
wait_addr_list->first_waiter = job_fiber_id;
}
wait_addr_list->last_waiter = job_fiber_id;
++wait_addr_list->num_waiters;
}
if (wait_time != 0)
{
//- Search for wait time list in bin
W32_WaitList *wait_time_list = 0;
for (W32_WaitList *tmp = wait_time_bin->first_wait_list; tmp && !wait_time_list; tmp = tmp->next_in_bin)
{
if (tmp->value == (u64)wait_time)
{
wait_time_list = tmp;
}
}
//- Acquire new wait time list
if (!wait_time_list)
{
if (wait_time_bin->first_free_wait_list)
{
wait_time_list = wait_time_bin->first_free_wait_list;
wait_time_bin->first_free_wait_list = wait_time_list->next_in_bin;
}
else
{
LockTicketMutex(&g->wait_lists_arena_tm);
{
wait_time_list = PushStructNoZero(g->wait_lists_arena, W32_WaitList);
}
UnlockTicketMutex(&g->wait_lists_arena_tm);
}
ZeroStruct(wait_time_list);
wait_time_list->value = wait_time;
if (wait_time_bin->last_wait_list)
{
wait_time_bin->last_wait_list->next_in_bin = wait_time_list;
wait_time_list->prev_in_bin = wait_time_bin->last_wait_list;
}
else
{
wait_time_bin->first_wait_list = wait_time_list;
}
wait_time_bin->last_wait_list = wait_time_list;
}
//- Insert fiber into wait time list
job_fiber->wait_time = wait_time;
if (wait_time_list->last_waiter)
{
W32_FiberFromId(wait_time_list->last_waiter)->next_time_waiter = job_fiber_id;
job_fiber->prev_time_waiter = wait_time_list->last_waiter;
}
else
{
wait_time_list->first_waiter = job_fiber_id;
}
wait_time_list->last_waiter = job_fiber_id;
++wait_time_list->num_waiters;
}
//- PopStruct worker's job fiber
/* Ownership of the parked fiber now belongs to the waker */
job_fiber = 0;
done = 1;
}
}
if (wait_time != 0) UnlockTicketMutex(&wait_time_bin->lock);
}
if (wait_addr != 0) UnlockTicketMutex(&wait_addr_bin->lock);
} break;
//- Fiber is finished
case W32_YieldKind_Done:
{
done = 1;
} break;
}
}
}
}
//- Wait for job
i64 num_jobs_in_queue = Atomic64Fetch(&pool->num_jobs_in_queue.v);
shutdown = Atomic32Fetch(&pool->workers_shutdown.v);
if (num_jobs_in_queue <= 0 && !shutdown)
{
//__profnc("Wait for job", Rgb32F(0.75, 0.75, 0));
/* Re-check under the wake mutex to close the race with producers, then
block on the counter; WaitOnAddress returns on any wake or value change */
LockTicketMutex(&pool->workers_wake_tm);
{
num_jobs_in_queue = Atomic64Fetch(&pool->num_jobs_in_queue.v);
shutdown = Atomic32Fetch(&pool->workers_shutdown.v);
while (num_jobs_in_queue <= 0 && !shutdown)
{
{
UnlockTicketMutex(&pool->workers_wake_tm);
WaitOnAddress(&pool->num_jobs_in_queue, &num_jobs_in_queue, sizeof(num_jobs_in_queue), INFINITE);
LockTicketMutex(&pool->workers_wake_tm);
}
shutdown = Atomic32Fetch(&pool->workers_shutdown.v);
num_jobs_in_queue = Atomic64Fetch(&pool->num_jobs_in_queue.v);
}
}
UnlockTicketMutex(&pool->workers_wake_tm);
}
}
//- Worker shutdown
if (job_fiber)
{
W32_ReleaseFiber(pool, job_fiber);
}
}
////////////////////////////////
//~ Win32 job scheduler entry
/* Scheduler thread: ticks the global cycle counter on a high-resolution
waitable timer, maintains a rolling mean of the actual cycle period
(published via `current_scheduler_cycle_period_ns`), and wakes any fibers
whose timed waits map to the new cycle. Runs until `g->shutdown` is set. */
W32_ThreadDef(W32_JobSchedulerEntryFunc, UNUSED arg)
{
struct W32_SharedJobCtx *g = &W32_shared_job_ctx;
{
i32 priority = THREAD_PRIORITY_TIME_CRITICAL;
b32 success = SetThreadPriority(GetCurrentThread(), priority);
LAX success;
Assert(success);
}
/* Create high resolution timer */
HANDLE timer = CreateWaitableTimerExW(0, 0, CREATE_WAITABLE_TIMER_HIGH_RESOLUTION, TIMER_ALL_ACCESS);
if (!timer)
{
//Panic(Lit("Failed to create high resolution timer"));
}
/* Create rolling buffer of scheduler cycles initialized to default value */
i32 periods_index = 0;
i64 periods[W32_NumRollingSchedulerPeriods] = ZI;
for (i32 i = 0; i < (i32)countof(periods); ++i)
{
periods[i] = W32_DefaultSchedulerPeriodNs;
}
i64 last_cycle_ns = 0;
while (!Atomic32Fetch(&g->shutdown))
{
__profn("Job scheduler cycle");
{
__profn("Job scheduler wait");
LARGE_INTEGER due = ZI;
/* Negative due time => relative wait in 100ns units; -1 requests the
minimum delay and the OS timer granularity sets the real period,
which is why the period is measured below rather than assumed. */
due.QuadPart = -1;
//due.QuadPart = -10000;
//due.QuadPart = -32000;
//due.QuadPart = -12000;
//due.QuadPart = -8000;
SetWaitableTimerEx(timer, &due, 0, 0, 0, 0, 0);
WaitForSingleObject(timer, INFINITE);
}
/* Calculate mean period */
i64 now_ns = TimeNs();
i64 period_ns = last_cycle_ns == 0 ? W32_DefaultSchedulerPeriodNs : now_ns - last_cycle_ns;
last_cycle_ns = now_ns;
/* Calculate mean period */
{
periods[periods_index++] = period_ns;
if (periods_index == countof(periods))
{
periods_index = 0;
}
f64 periods_sum_ns = 0;
for (i32 i = 0; i < (i32)countof(periods); ++i)
{
periods_sum_ns += (f64)periods[i];
}
f64 mean_ns = periods_sum_ns / (f64)countof(periods);
Atomic64FetchSet(&g->current_scheduler_cycle_period_ns.v, RoundF64ToI64(mean_ns));
}
{
__profn("Job scheduler run");
/* Advance the cycle and wake fibers whose timeout landed on it */
i64 current_cycle = Atomic64FetchAdd(&g->current_scheduler_cycle.v, 1) + 1;
W32_WakeByTime((u64)current_cycle);
}
}
}
////////////////////////////////
//~ @hookdef Futex
/* @hookdef Futex: block until `*addr != *cmp` (futex-style) or `timeout_ns`
elapses. On a job fiber (parent_id != 0) this yields to the worker with a
Wait request so the worker parks the fiber on the wait lists; on a plain
thread it falls back to Sleep/WaitOnAddress. */
void FutexYield(volatile void *addr, void *cmp, u32 size, i64 timeout_ns)
{
W32_Fiber *fiber = W32_FiberFromId(FiberId());
i16 parent_id = fiber->parent_id;
if (parent_id != 0)
{
*fiber->yield_param = (W32_YieldParam) {
.kind = W32_YieldKind_Wait,
.wait = {
.addr = addr,
.cmp = cmp,
.size = size,
.timeout_ns = timeout_ns
}
};
W32_YieldFiber(fiber, W32_FiberFromId(parent_id));
}
else
{
/* Thread (non-fiber) path: convert ns -> ms for the Win32 APIs */
i32 timeout_ms = 0;
if (timeout_ns > 10000000000000000ll)
{
timeout_ms = INFINITE;
}
else if (timeout_ns != 0)
{
/* NOTE(review): i64 -> i32 assignment; timeouts above ~INT32_MAX ms
but below the INFINITE sentinel overflow here -- confirm range. */
timeout_ms = timeout_ns / 1000000;
/* Round sub-millisecond timeouts up to 1ms so they don't become 0.
NOTE(review): SignF32 applied to an i64 -- presumably returns +/-1;
verify the intended behavior for negative timeouts. */
timeout_ms += (timeout_ms == 0) * SignF32(timeout_ns);
}
if (addr == 0)
{
Sleep(timeout_ms);
}
else
{
WaitOnAddress(addr, cmp, size, timeout_ms);
}
}
}
/* @hookdef Futex: wake up to `count` waiters (fibers and threads) blocked on `addr`. */
void FutexWake(void *addr, i32 count)
{
W32_WakeByAddress(addr, count);
}
////////////////////////////////
//~ @hoodef Job operations
/* @hookdef Job operations: allocate and zero a GenericJobDesc plus a
`sig_size`-byte signature block for the caller to fill before RunJobEx.
Descs are recycled through a global free list; each desc owns a payload
arena that holds its signature and is reset on job completion
(see W32_FiberEntryPoint).
FIX: previously the desc was re-pushed onto its own payload arena
(`result = PushStruct(arena, GenericJobDesc)`), which (a) leaked the slot
just acquired from the free list / descs arena and made the preceding
ZeroStruct dead, and (b) left free-list nodes living inside arenas that
get ResetArena'd while the node is still linked. The desc now stays in
its stable slot; only the signature lives on the payload arena. */
GenericJobDesc *PushJobDesc_(u64 sig_size, u64 sig_align, GenericJobFunc *func, JobDescParams params)
{
struct W32_SharedJobCtx *g = &W32_shared_job_ctx;
GenericJobDesc *result = 0;
Arena *arena = 0;
LockTicketMutex(&g->job_descs_tm);
{
if (g->first_free_job_desc)
{
/* Reuse a completed desc and its payload arena */
result = g->first_free_job_desc;
g->first_free_job_desc = result->next_free;
arena = result->arena;
}
else
{
result = PushStructNoZero(g->job_descs_arena, GenericJobDesc);
arena = AcquireArena(Mebi(4));
}
}
UnlockTicketMutex(&g->job_descs_tm);
ZeroStruct(result);
result->arena = arena;
result->sig = PushBytes(arena, sig_size, sig_align);
result->func = func;
result->count = params.count;
result->pool = params.pool;
result->priority = params.priority;
result->counter = params.counter;
return result;
}
/* Submit `desc` (from PushJobDesc_) to its pool: appends a job info node to
the back of the priority queue, bumps the pool's job counter, and wakes
workers. A count of 0 is a no-op. The effective priority is clamped so a
job can never spawn work at a higher priority than its own. */
void RunJobEx(GenericJobDesc *desc)
{
__prof;
struct W32_SharedJobCtx *g = &W32_shared_job_ctx;
i32 count = desc->count;
Counter *counter = desc->counter;
JobPool pool_kind = desc->pool;
JobPriority priority = desc->priority;
if (count > 0)
{
/* Counter is incremented once per job (not per dispatch); the last
completed dispatch decrements it in W32_FiberEntryPoint */
if (counter)
{
AddCounter(counter, 1);
}
W32_Fiber *fiber = W32_FiberFromId(FiberId());
priority = ClampI32(priority, fiber->job_priority, JobPriority_Count - 1); /* A job cannot create a job with a higher priority than itself */
if (pool_kind == JobPool_Inherit)
{
pool_kind = fiber->job_pool;
}
W32_JobPool *pool = &g->job_pools[pool_kind];
W32_JobQueue *queue = &pool->job_queues[priority];
LockTicketMutex(&queue->lock);
{
W32_JobInfo *info = 0;
if (queue->first_free)
{
info = queue->first_free;
queue->first_free = info->next;
}
else
{
info = PushStructNoZero(queue->arena, W32_JobInfo);
}
ZeroStruct(info);
/* fiber_id stays 0 => fresh dispatch (contrast W32_WakeLockedFibers) */
info->desc = desc;
info->count = count;
/* Append at the back; resumed fibers are pushed at the front elsewhere */
if (queue->last)
{
queue->last->next = info;
}
else
{
queue->first = info;
}
queue->last = info;
}
UnlockTicketMutex(&queue->lock);
/* Wake workers */
{
LockTicketMutex(&pool->workers_wake_tm);
{
Atomic64FetchAdd(&pool->num_jobs_in_queue.v, count);
if (count >= W32_WakeAllThreshold)
{
WakeByAddressAll(&pool->num_jobs_in_queue);
}
else
{
for (i32 i = 0; i < count; ++i)
{
WakeByAddressSingle(&pool->num_jobs_in_queue);
}
}
}
UnlockTicketMutex(&pool->workers_wake_tm);
}
}
}
////////////////////////////////
//~ @hookdef Helpers
/* @hookdef Helper: nanoseconds elapsed since StartupBaseJobs captured the
QPC baseline (`timer_start_qpc`), scaled by the precomputed ns-per-tick. */
i64 TimeNs(void)
{
struct W32_SharedJobCtx *g = &W32_shared_job_ctx;
LARGE_INTEGER now;
QueryPerformanceCounter(&now);
i64 elapsed_qpc = now.QuadPart - g->timer_start_qpc;
return elapsed_qpc * g->ns_per_qpc;
}
/* @hookdef Helper: number of active logical processors across all processor groups. */
u32 GetLogicalProcessorCount(void)
{
u32 processor_count = GetActiveProcessorCount(ALL_PROCESSOR_GROUPS);
return processor_count;
}
/* @hookdef Helper: latest rolling-mean scheduler cycle period, in nanoseconds
(published atomically by W32_JobSchedulerEntryFunc). */
i64 GetCurrentSchedulerPeriodNs(void)
{
W32_SharedJobCtx *g = &W32_shared_job_ctx;
i64 period_ns = Atomic64Fetch(&g->current_scheduler_cycle_period_ns.v);
return period_ns;
}