power_play/src/base/base_win32/base_win32_job.c
2025-09-12 18:29:48 -05:00

788 lines
24 KiB
C

/* Zero-initialized global state shared by the whole Win32 job system:
 * the fiber table, per-kind job pools, worker threads, and their locks. */
W32_SharedJobState W32_shared_job_state = ZI;
////////////////////////////////
//~ @hookdef Startup
/* @hookdef Startup hook: boots the job system. Converts the calling (main)
 * thread into a fiber, configures each job pool's worker count, affinity
 * mask and thread priority, then spawns the worker threads. Call once at
 * startup before submitting any jobs. */
void InitJobSystem(void)
{
/* Init fibers */
W32_SharedJobState *g = &W32_shared_job_state;
g->num_fibers = 1; /* Fiber at index 0 always nil */
W32_AcquireFiber(0); /* Convert main thread to fiber */
Arena *perm = PermArena();
/* Init job pools */
/* NOTE(review): loop body is empty (see FIXME) -- pool queue/mutex state
 * presumably relies on zero-init of W32_shared_job_state; confirm intent */
for (JobPool pool_kind = 0; pool_kind < (i32)countof(g->job_pools); ++pool_kind)
{
W32_JobPool *pool = &g->job_pools[pool_kind];
/* FIXME */
}
//- Start job workers
/* TODO: Heuristic worker counts & affinities */
{
__profn("Start job workers");
for (JobPool pool_kind = 0; pool_kind < (i32)countof(g->job_pools); ++pool_kind)
{
W32_JobPool *pool = &g->job_pools[pool_kind];
String name_fmt = ZI;
/* Each pool gets its own profiler thread-group band */
i32 prof_group = PROF_THREAD_GROUP_FIBERS - Mebi(pool_kind);
switch (pool_kind)
{
case JobPool_Sim:
{
name_fmt = Lit("Sim worker #%F");
pool->num_worker_threads = 4;
pool->thread_affinity_mask = 0x000000000000000Full; /* logical cores 0-3 */
pool->thread_priority = THREAD_PRIORITY_ABOVE_NORMAL;
} break;
case JobPool_User:
{
name_fmt = Lit("User worker #%F");
pool->num_worker_threads = 4;
pool->thread_affinity_mask = 0x00000000000000F0ull; /* logical cores 4-7 */
pool->thread_priority = THREAD_PRIORITY_ABOVE_NORMAL;
} break;
case JobPool_Audio:
{
name_fmt = Lit("Audio worker #%F");
pool->num_worker_threads = 2;
pool->thread_affinity_mask = 0x0000000000000300ull; /* logical cores 8-9 */
pool->thread_priority = THREAD_PRIORITY_TIME_CRITICAL;
pool->thread_is_audio = 1; /* worker opts into MMCSS "Pro Audio" scheduling */
} break;
case JobPool_Background:
{
name_fmt = Lit("Background worker #%F");
pool->num_worker_threads = 2;
pool->thread_affinity_mask = 0x0000000000000C00ull; /* logical cores 10-11 */
pool->thread_priority = THREAD_PRIORITY_NORMAL;
} break;
case JobPool_Blocking:
{
/* No affinity mask: blocking workers may float across cores */
name_fmt = Lit("Blocking worker #%F");
pool->num_worker_threads = 8;
pool->thread_priority = THREAD_PRIORITY_NORMAL;
} break;
case JobPool_Hyper:
{
/* No affinity mask: hyper workers may float across cores */
name_fmt = Lit("Hyper worker #%F");
pool->num_worker_threads = 8;
pool->thread_priority = THREAD_PRIORITY_HIGHEST;
} break;
}
pool->worker_threads = PushStructs(perm, W32_Thread *, pool->num_worker_threads);
pool->worker_contexts = PushStructs(perm, W32_WorkerCtx, pool->num_worker_threads);
for (i32 i = 0; i < pool->num_worker_threads; ++i)
{
W32_WorkerCtx *ctx = &pool->worker_contexts[i];
ctx->pool_kind = pool_kind;
ctx->id = i;
String name = FormatString(perm, name_fmt, FmtSint(i));
pool->worker_threads[i] = W32_StartThread(W32_JobWorkerEntryPoint, ctx, name, prof_group + i);
}
}
}
//OnExit(ShutdownJobs);
}
////////////////////////////////
//~ Win32 thread
/* Entry trampoline for all threads created via W32_StartThread. Converts the
 * new thread into a fiber, registers it with the profiler, initializes COM,
 * names the thread for debuggers, then runs the user entry point. */
DWORD WINAPI W32_Win32ThreadProc(LPVOID vt)
{
    /* Convert thread to fiber (pool == 0: this is a dedicated thread fiber) */
    W32_AcquireFiber(0);
    Arena *perm = PermArena();
    W32_Thread *t = (W32_Thread *)vt;
    String thread_name_desc = StringF(perm, "[%F] %F", FmtSint(FiberId()), FmtString(t->thread_name));
    char *thread_name_desc_cstr = CstrFromString(perm, thread_name_desc);
    wchar_t *thread_name_desc_wstr = WstrFromString(perm, thread_name_desc);
    /* BUGFIX: was `thread_name_dsec_cstr` (undeclared-identifier typo) */
    __profthread(thread_name_desc_cstr, t->profiler_group);
    /* Initialize COM */
    CoInitializeEx(0, COINIT_MULTITHREADED);
    /* Set thread name (visible in debuggers / ETW) */
    SetThreadDescription(GetCurrentThread(), thread_name_desc_wstr);
    /* Enter thread entry point */
    t->entry_point(t->thread_udata);
    /* Uninitialize COM */
    CoUninitialize();
    return 0;
}
/* Creates and starts an OS thread that runs `entry_point(thread_udata)` via
 * the W32_Win32ThreadProc trampoline (fiber conversion, COM init, profiler
 * registration). The returned W32_Thread lives on the permanent arena; the
 * caller releases the underlying handle via W32_TryEndThread or
 * W32_WaitEndThread. Panics if thread creation fails. */
W32_Thread *W32_StartThread(W32_ThreadFunc *entry_point, void *thread_udata, String thread_name, i32 profiler_group)
{
    __prof;
    Assert(entry_point != 0);
    Arena *perm = PermArena();
    W32_Thread *t = PushStruct(perm, W32_Thread);
    t->thread_name = PushString(perm, thread_name); /* copy: caller's string may be transient */
    t->entry_point = entry_point;
    t->thread_udata = thread_udata;
    t->profiler_group = profiler_group;
    t->handle = CreateThread(
        0,                   /* default security attributes */
        W32_FiberStackSize,  /* stack size, matching fiber stacks */
        W32_Win32ThreadProc, /* trampoline; runs entry_point after setup */
        t,
        0,                   /* start running immediately */
        0                    /* thread id not needed */
    );
    if (!t->handle)
    {
        Panic(Lit("Failed to create thread"));
    }
    return t;
}
/* Waits up to `timeout_seconds` for `thread` to exit and, if it did, closes
 * its handle. Returns 0 if the thread could not be released within the
 * timeout (e.g. because it is still running). Timeouts above ~1e7 seconds
 * are treated as infinite. */
b32 W32_TryEndThread(W32_Thread *thread, f32 timeout_seconds)
{
    __prof;
    b32 success = 0;
    HANDLE handle = thread->handle;
    if (handle)
    {
        /* Wait for thread to stop */
        DWORD timeout_ms = (timeout_seconds > 10000000) ? INFINITE : RoundF32ToI32(timeout_seconds * 1000);
        DWORD wait_result = WaitForSingleObject(handle, timeout_ms);
        if (wait_result == WAIT_OBJECT_0)
        {
            /* Thread exited: release its handle. Clear the stored handle so a
             * second call cannot double-close it. */
            success = 1;
            CloseHandle(handle);
            thread->handle = 0;
        }
    }
    return success;
}
/* Blocks until `thread` has exited, then releases its handle. */
void W32_WaitEndThread(W32_Thread *thread)
{
    __prof;
    b32 thread_ended = W32_TryEndThread(thread, F32Infinity);
    Assert(thread_ended);
}
////////////////////////////////
//~ Win32 fiber
//- Acquire fiber
/* Acquires a fiber for `pool`: first tries the pool's free list, otherwise
 * claims a fresh slot in the global fiber table, names it, and creates its
 * Win32 fiber (or backing thread when VirtualFibersEnabled). If `pool` is 0,
 * the currently running thread itself is converted into a fiber. Panics when
 * the global fiber table is exhausted. */
W32_Fiber *W32_AcquireFiber(W32_JobPool *pool)
{
    W32_SharedJobState *g = &W32_shared_job_state;
    i16 fiber_id = 0;
    W32_Fiber *fiber = 0;
    char *new_name_cstr = 0; /* non-null => brand-new slot that must be initialized below */
    {
        /* Try to reuse a fiber from the pool's free list */
        if (pool != 0)
        {
            LockTicketMutex(&pool->free_fibers_tm);
            if (pool->first_free_fiber_id)
            {
                fiber_id = pool->first_free_fiber_id;
                fiber = &g->fibers[fiber_id];
                pool->first_free_fiber_id = fiber->return_id; /* return_id doubles as free-list link */
            }
            UnlockTicketMutex(&pool->free_fibers_tm);
        }
        /* Otherwise claim a fresh slot in the global fiber table */
        if (!fiber_id)
        {
            LockTicketMutex(&g->fibers_tm);
            {
                {
                    fiber_id = g->num_fibers++;
                    if (fiber_id >= MaxFibers)
                    {
                        Panic(Lit("Max fibers reached"));
                    }
                    fiber = &g->fibers[fiber_id];
                    if (!g->fiber_names_arena)
                    {
                        g->fiber_names_arena = AcquireArena(Gibi(64));
                    }
                    new_name_cstr = PushStructs(g->fiber_names_arena, char, W32_FiberNameMaxSize);
                }
            }
            UnlockTicketMutex(&g->fibers_tm);
        }
    }
    if (new_name_cstr != 0)
    {
        __profn("Initialize fiber");
        fiber->id = fiber_id;
        /* Render the fiber id as decimal ASCII (digits come out reversed, swapped after) */
        i32 id_div = fiber_id;
        char id_chars[64] = ZI;
        i32 id_chars_len = 0;
        do
        {
            i32 digit = id_div % 10;
            id_div /= 10;
            id_chars[id_chars_len] = ("0123456789")[digit];
            ++id_chars_len;
        } while (id_div > 0);
        i32 rev_start = 0;
        i32 rev_end = id_chars_len - 1;
        while (rev_start < rev_end)
        {
            char swp = id_chars[rev_start];
            id_chars[rev_start] = id_chars[rev_end];
            id_chars[rev_end] = swp;
            ++rev_start;
            --rev_end;
        }
        /* Concat fiber name: prefix + digits + suffix */
        i32 name_size = 1;
        /* BUGFIX: was `sizeof(sizeof(W32_FiberNamePrefixCstr))`, which compared
         * sizeof(size_t) -- not the prefix size -- against the max name size */
        Assert(sizeof(W32_FiberNamePrefixCstr) <= W32_FiberNameMaxSize);
        CopyBytes(new_name_cstr, W32_FiberNamePrefixCstr, sizeof(W32_FiberNamePrefixCstr));
        name_size += sizeof(W32_FiberNamePrefixCstr) - 2; /* 1 + (sizeof-2) == strlen(prefix) */
        CopyBytes(new_name_cstr + name_size, id_chars, id_chars_len);
        name_size += id_chars_len;
        CopyBytes(new_name_cstr + name_size, W32_FiberNameSuffixCstr, sizeof(W32_FiberNameSuffixCstr)); /* includes NUL */
        name_size += sizeof(W32_FiberNameSuffixCstr) - 2;
        fiber->name_cstr = new_name_cstr;
        /* Init win32 fiber */
        if (pool != 0)
        {
            __profn("CreateFiber");
            fiber->pool = pool->kind;
#if VirtualFibersEnabled
            fiber->addr = CreateThread(0, W32_FiberStackSize, W32_VirtualFiberEntryPoint, (void *)(i64)fiber_id, 0, 0);
#else
            fiber->addr = CreateFiber(W32_FiberStackSize, W32_FiberEntryPoint, (void *)(i64)fiber_id);
#endif
        }
        else
        {
            /* Fiber is not a part of a job pool, convert thread to fiber */
            __profn("ConvertThreadToFiber");
            fiber->addr = ConvertThreadToFiber((void *)(i64)fiber_id);
#if VirtualFibersEnabled
            fiber->addr = GetCurrentThread();
#endif
        }
    }
    fiber->task = 0;
    fiber->return_id = 0;
    return fiber;
}
//- Release fiber
/* Returns `fiber` to `pool`'s free list so W32_AcquireFiber can reuse it.
 * The fiber's `return_id` field is repurposed as the free-list link. */
void W32_ReleaseFiber(W32_JobPool *pool, W32_Fiber *fiber)
{
    LockTicketMutex(&pool->free_fibers_tm);
    fiber->return_id = pool->first_free_fiber_id;
    pool->first_free_fiber_id = fiber->id;
    UnlockTicketMutex(&pool->free_fibers_tm);
}
//- Fiber id
/* Maps a fiber id to its slot in the global fiber table. Id 0 (and any
 * non-positive id) is the nil fiber and maps to 0. */
ForceInline W32_Fiber *W32_FiberFromId(i16 id)
{
    if (id <= 0)
    {
        return 0;
    }
    return &W32_shared_job_state.fibers[id];
}
/* Transfers control to `target` and blocks the current logical fiber until
 * something switches back to it. With VirtualFibersEnabled, "fibers" are
 * backed by real threads parked on WaitOnAddress: switching means waking the
 * target thread and sleeping our own. Otherwise it is a plain Win32
 * SwitchToFiber on the target's fiber address. */
void W32_SwitchToFiber(W32_Fiber *target)
{
#if VirtualFibersEnabled
W32_Fiber *self = W32_FiberFromId(FiberId());
/* Mark ourselves yielded BEFORE waking the target, so the target can
 * immediately switch back to us without racing this store */
Atomic8Set(&self->virtual_yield, 1);
/* Signal virtual target */
{
Atomic8Set(&target->virtual_yield, 0);
WakeByAddressSingle(&target->virtual_yield);
}
/* Wait for return */
{
i8 vswitch = 1;
while (vswitch != 0)
{
/* WaitOnAddress can wake spuriously; re-check the flag each iteration */
WaitOnAddress(&self->virtual_yield, &vswitch, sizeof(vswitch), INFINITE);
vswitch = Atomic8Fetch(&self->virtual_yield);
}
}
#else
SwitchToFiber(target->addr);
#endif
}
////////////////////////////////
//~ Win32 fiber entry
/* Body of every pool task fiber: loops forever, executing whatever task is
 * installed in fiber->task, doing job-completion bookkeeping, recycling the
 * task, then yielding back to the worker fiber named by fiber->return_id.
 * The worker installs the next task before switching in again; this function
 * never returns. */
void W32_FiberEntryPoint(void *_)
{
i16 fiber_id = FiberId();
volatile W32_Fiber *fiber = W32_FiberFromId(fiber_id);
W32_JobPool *pool = &W32_shared_job_state.job_pools[fiber->pool];
JobPool pool_kind = fiber->pool;
char *fiber_name_cstr = fiber->name_cstr;
for (;;)
{
__prof_fiber_enter(fiber_name_cstr, PROF_THREAD_GROUP_FIBERS - Mebi(pool_kind) + Kibi(1) + fiber->id);
W32_Task *task = fiber->task;
Job *job = task->job;
/* Run task */
job->func(job->sig, task->task_id);
/* Check if we've completed the last task in the job
 * (fetch-add returns the pre-increment count, hence the +1) */
if (Atomic32FetchAdd(&job->num_tasks_completed.v, 1) + 1 >= job->count)
{
/* Increment counter */
if (job->counter)
{
FetchAddFence(&job->counter->num_jobs_completed_fence, 1);
}
/* Free job: its arena is reused by a later OpenJob on this pool */
LockTicketMutex(&pool->free_jobs_tm);
{
StackPush(pool->first_free_job, job);
}
UnlockTicketMutex(&pool->free_jobs_tm);
}
/* Free task */
{
LockTicketMutex(&pool->free_tasks_tm);
{
StackPush(pool->first_free_task, task);
}
UnlockTicketMutex(&pool->free_tasks_tm);
}
/* Yield to worker */
{
__prof_fiber_leave();
W32_Fiber *parent_fiber = W32_FiberFromId(fiber->return_id);
W32_SwitchToFiber(parent_fiber);
}
}
}
#if VirtualFibersEnabled
/* Thread entry used when VirtualFibersEnabled: each "fiber" is a dedicated
 * thread. Names the thread after its fiber, initializes COM, then enters the
 * shared fiber loop (which does not return under normal operation). */
DWORD WINAPI W32_VirtualFiberEntryPoint(LPVOID arg)
{
ConvertThreadToFiber(arg);
Arena *perm = PermArena();
char *fiber_name_cstr = W32_FiberFromId(FiberId())->name_cstr;
wchar_t *fiber_name_wstr = WstrFromString(perm, StringFromCstrNoLimit(fiber_name_cstr));
SetThreadDescription(GetCurrentThread(), fiber_name_wstr);
CoInitializeEx(0, COINIT_MULTITHREADED);
W32_FiberEntryPoint(0);
CoUninitialize();
return 0;
}
#endif
////////////////////////////////
//~ Win32 job worker entry
/* Worker-thread main loop for one job pool (one instance per worker thread).
 * Applies per-pool thread priority and MMCSS settings, then loops: pop a task
 * from the pool queue, run it on a task fiber (one fiber is held and reused
 * across tasks until it suspends), detach the fiber if the task suspended,
 * and park on the pool's task counter when the queue is empty. The loop exits
 * only if the task counter is observed negative (shutdown signal). */
W32_ThreadDef(W32_JobWorkerEntryPoint, worker_ctx_arg)
{
W32_WorkerCtx *ctx = worker_ctx_arg;
JobPool pool_kind = ctx->pool_kind;
W32_JobPool *pool = &W32_shared_job_state.job_pools[pool_kind];
i16 worker_fiber_id = FiberId();
{
/* TODO: Heuristic pinning */
/* TODO: Pin non-worker threads to other cores */
HANDLE thread_handle = GetCurrentThread();
if (pool->thread_priority != 0)
{
__profn("Set priority");
b32 success = SetThreadPriority(thread_handle, pool->thread_priority) != 0;
Assert(success);
}
#if 0
if (pool->thread_affinity_mask)
{
__profn("Set affinity");
b32 success = SetThreadAffinityMask(thread_handle, pool->thread_affinity_mask) != 0;
#if RtcIsEnabled || ProfilingIsEnabled
{
/* Retry until external tools can set correct process affinity */
i32 delay_ms = 16;
while (!success && delay_ms <= 1024)
{
__profn("Affinity retry");
Sleep(delay_ms);
success = SetThreadAffinityMask(thread_handle, pool->thread_affinity_mask) != 0;
delay_ms *= 2;
}
}
#endif
Assert(success);
}
#endif
if (pool->thread_is_audio)
{
/* https://learn.microsoft.com/en-us/windows/win32/procthread/multimedia-class-scheduler-service#registry-settings */
__profn("Set mm thread characteristics");
DWORD task = 0;
HANDLE mmc_handle = AvSetMmThreadCharacteristics(L"Pro Audio", &task);
Assert(mmc_handle);
}
}
W32_Fiber *task_fiber = 0; /* fiber held by this worker, reused across tasks */
i64 tasks_count = 0;
while (tasks_count >= 0)
{
/* Pop the next task (if any) from the front of the pool queue */
W32_Task *task = 0;
{
LockTicketMutex(&pool->tasks_tm);
{
task = pool->first_task;
if (task)
{
QueuePop(pool->first_task, pool->last_task);
Atomic64FetchAdd(&pool->tasks_count.v, -1);
}
}
UnlockTicketMutex(&pool->tasks_tm);
}
if (task)
{
if (task->fiber_id != 0)
{
/* Task is resuming with an existing fiber */
if (task_fiber)
{
W32_ReleaseFiber(pool, task_fiber);
}
task_fiber = W32_FiberFromId(task->fiber_id);
}
else
{
/* Task is unstarted, wrap it in a fiber */
if (!task_fiber)
{
task_fiber = W32_AcquireFiber(pool);
}
task_fiber->task = task;
task->fiber_id = task_fiber->id;
}
/* Run task fiber */
task_fiber->return_id = worker_fiber_id;
W32_SwitchToFiber(task_fiber);
if (Atomic8Fetch(&task_fiber->status) == W32_FiberStatus_Suspending)
{
/* Fiber suspended during execution; detach it -- a later ResumeFibers
 * will hand its task (and fiber) back to some worker */
Atomic8Set(&task_fiber->status, W32_FiberStatus_Suspended);
task_fiber = 0;
}
}
/* Park worker */
tasks_count = Atomic64Fetch(&pool->tasks_count.v);
while (tasks_count == 0)
{
/* TODO: Add toggleable flag that allows for workers to skip parking
 * for high-throughput workloads, and instead just spin until work is available */
WaitOnAddress(&pool->tasks_count, &tasks_count, sizeof(tasks_count), INFINITE);
tasks_count = Atomic64Fetch(&pool->tasks_count.v);
}
}
/* Worker shutdown */
if (task_fiber)
{
W32_ReleaseFiber(pool, task_fiber);
}
}
////////////////////////////////
//~ @hookdef Fiber suspend/resume operations
/* @hookdef Suspends the calling fiber until a matching ResumeFibers call.
 * For a pool task fiber (return_id > 0): marks itself Suspending and yields
 * back to its worker fiber; the worker detaches it and ResumeFibers later
 * re-queues its task. For a dedicated thread-backed fiber: blocks the thread
 * on its status word until ResumeFibers clears it. */
void SuspendFiber(void)
{
    __prof;
    /* Fetch the id once and reuse it (was: a second redundant FiberId() call) */
    i16 fiber_id = FiberId();
    W32_Fiber *fiber = W32_FiberFromId(fiber_id);
    i16 return_id = fiber->return_id;
    __prof_fiber_leave();
    if (return_id > 0)
    {
        /* Suspend task fiber (return control flow to parent/worker fiber) */
        Atomic8Set(&fiber->status, W32_FiberStatus_Suspending);
        W32_Fiber *parent_fiber = W32_FiberFromId(return_id);
        W32_SwitchToFiber(parent_fiber);
    }
    else
    {
        /* Suspend dedicated fiber (block thread) */
        Atomic8Set(&fiber->status, W32_FiberStatus_Suspended);
        i8 status = W32_FiberStatus_Suspended;
        while (status != W32_FiberStatus_None)
        {
            /* WaitOnAddress may wake spuriously; loop until Resume clears status */
            WaitOnAddress(&fiber->status, &status, sizeof(status), INFINITE);
            status = Atomic8Fetch(&fiber->status);
        }
    }
    __prof_fiber_enter(fiber->name_cstr, PROF_THREAD_GROUP_FIBERS - Mebi(fiber->pool) + Kibi(1) + fiber->id);
}
/* @hookdef Makes previously suspended fibers runnable again. Task fibers are
 * grouped by pool and their tasks pushed to the FRONT of each pool's queue
 * (resumed work runs before fresh work), then workers are woken. Dedicated
 * thread-backed fibers are woken directly via their status word. Spins until
 * each fiber has fully finished suspending before touching its state. */
void ResumeFibers(i16 fiber_ids_count, i16 *fiber_ids)
{
__prof;
/* Group tasks by pool */
W32_TaskList tasks_by_pool[JobPool_Count] = ZI;
for (i16 id_index = 0; id_index < fiber_ids_count; ++id_index)
{
i16 fiber_id = fiber_ids[id_index];
W32_Fiber *fiber = W32_FiberFromId(fiber_id);
/* Wait for fiber to complete suspending (its worker flips Suspending ->
 * Suspended only after switching away from it) */
W32_FiberStatus status = Atomic8Fetch(&fiber->status);
while (status != W32_FiberStatus_Suspended)
{
_mm_pause();
status = Atomic8Fetch(&fiber->status);
}
/* Update fiber status */
Atomic8Set(&fiber->status, W32_FiberStatus_None);
i16 return_id = fiber->return_id;
if (return_id > 0)
{
/* Group task based on pool */
W32_Task *task = fiber->task;
JobPool pool_kind = fiber->pool;
W32_TaskList *pool_tasks = &tasks_by_pool[pool_kind];
QueuePush(pool_tasks->first, pool_tasks->last, task);
++pool_tasks->count;
}
else
{
/* Wake dedicated fiber right now */
WakeByAddressSingle(&fiber->status);
}
}
/* Submit tasks */
for (JobPool pool_kind = 0; pool_kind < JobPool_Count; ++pool_kind)
{
W32_TaskList *tasks = &tasks_by_pool[pool_kind];
i16 count = tasks->count;
if (count > 0)
{
W32_JobPool *pool = &W32_shared_job_state.job_pools[pool_kind];
/* Push tasks to front of queue */
LockTicketMutex(&pool->tasks_tm);
{
tasks->last->next = pool->first_task;
pool->first_task = tasks->first;
if (!pool->last_task)
{
pool->last_task = tasks->last;
}
Atomic64FetchAdd(&pool->tasks_count.v, count);
}
UnlockTicketMutex(&pool->tasks_tm);
/* Wake workers */
if (count >= W32_WakeAllWorkersThreshold)
{
WakeByAddressAll(&pool->tasks_count);
}
else
{
/* Wake one parked worker per resumed task */
for (i16 i = 0; i < count; ++i)
{
WakeByAddressSingle(&pool->tasks_count);
}
}
}
}
}
////////////////////////////////
//~ @hookdef Job operations
/* Allocates a job that will run `func` on the given pool. JobPool_Inherit
 * resolves to the calling fiber's pool. The job's backing arena is recycled
 * from the pool's free-job list when available, otherwise freshly acquired;
 * the Job header itself lives at the start of that arena. The job starts
 * with a task count of 1 -- dispatch happens in CloseJob. */
Job *OpenJob(JobFunc *func, JobPool pool_kind)
{
    /* Resolve the inherited pool from the currently executing fiber */
    if (pool_kind == JobPool_Inherit)
    {
        pool_kind = W32_FiberFromId(FiberId())->pool;
    }
    W32_JobPool *pool = &W32_shared_job_state.job_pools[pool_kind];
    /* Try to recycle the arena of a previously completed job */
    Arena *backing = 0;
    LockTicketMutex(&pool->free_jobs_tm);
    Job *recycled = pool->first_free_job;
    if (recycled)
    {
        pool->first_free_job = recycled->next;
        backing = recycled->arena;
    }
    UnlockTicketMutex(&pool->free_jobs_tm);
    if (backing)
    {
        ResetArena(backing); /* note: this invalidates `recycled` itself */
    }
    else
    {
        backing = AcquireArena(Mebi(4));
    }
    Job *job = PushStruct(backing, Job);
    job->arena = backing;
    job->pool = pool_kind;
    job->func = func;
    job->count = 1;
    return job;
}
/* Dispatches `job`'s tasks to its pool and wakes workers. One W32_Task per
 * job->count is obtained: recycled from the pool's free-task list first,
 * otherwise pushed (cacheline-aligned) onto the permanent arena. Tasks are
 * appended to the BACK of the pool's queue (fresh work runs after resumed
 * work, which ResumeFibers pushes to the front). count == 0 is a no-op. */
void CloseJob(Job *job)
{
TempArena scratch = BeginScratchNoConflict();
W32_JobPool *pool = &W32_shared_job_state.job_pools[job->pool];
u32 num_tasks = job->count;
if (num_tasks > 0)
{
if (job->counter)
{
Atomic64FetchAdd(&job->counter->num_jobs_dispatched.v, 1);
}
/* Allocate tasks from free list */
u32 num_tasks_allocated = 0;
W32_Task **tasks_array = PushStructsNoZero(scratch.arena, W32_Task *, num_tasks);
{
LockTicketMutex(&pool->free_tasks_tm);
{
while (num_tasks_allocated < num_tasks)
{
W32_Task *task = pool->first_free_task;
if (task)
{
tasks_array[num_tasks_allocated++] = task;
StackPop(pool->first_free_task);
}
else
{
break;
}
}
}
UnlockTicketMutex(&pool->free_tasks_tm);
}
/* Allocate new tasks from memory */
u32 remaining = num_tasks - num_tasks_allocated;
if (remaining > 0)
{
Arena *perm = PermArena();
/* Cacheline-align task storage to limit false sharing between workers */
PushAlign(perm, CachelineSize);
W32_Task *pushed_tasks = PushStructsNoZero(perm, W32_Task, remaining);
for (u32 i = 0; i < remaining; ++i)
{
tasks_array[num_tasks_allocated + i] = &pushed_tasks[i];
}
num_tasks_allocated += remaining;
PushAlign(perm, CachelineSize);
}
/* Generate task list */
W32_TaskList tasks = ZI;
for (u32 i = 0; i < num_tasks; ++i)
{
W32_Task *task = tasks_array[i];
ZeroStruct(task);
task->job = job;
task->task_id = tasks.count++; /* task_id == index within the job */
QueuePush(tasks.first, tasks.last, task);
}
/* Push tasks to back of pool */
{
LockTicketMutex(&pool->tasks_tm);
{
if (pool->last_task)
{
pool->last_task->next = tasks.first;
}
else
{
pool->first_task = tasks.first;
}
pool->last_task = tasks.last;
Atomic64FetchAdd(&pool->tasks_count.v, num_tasks);
}
UnlockTicketMutex(&pool->tasks_tm);
}
/* Wake workers */
if (num_tasks >= W32_WakeAllWorkersThreshold)
{
WakeByAddressAll(&pool->tasks_count);
}
else
{
/* Wake one parked worker per task */
for (u32 i = 0; i < num_tasks; ++i)
{
WakeByAddressSingle(&pool->tasks_count);
}
}
}
EndScratch(scratch);
}