job refactor

This commit is contained in:
jacob 2025-08-05 14:58:22 -05:00
parent 632f12dd63
commit 26bff4e741
11 changed files with 113 additions and 97 deletions

View File

@ -128,7 +128,7 @@ AppArgList ParseAppArgs(Arena *arena, String args_str)
////////////////////////////////
//~ @hookdef Entry point
void EntryPoint(void)
void Startup(void)
{
__prof;
TempArena scratch = BeginScratchNoConflict();

View File

@ -17,9 +17,7 @@ Arena *AllocArena(u64 reserve)
if (!base)
{
/* Hard fail on memory reserve failure for now */
/* FIXME: Enable this */
//Panic(Lit("Failed to reserve memory"));
(*(volatile int *)0) = 0;
Panic(Lit("Failed to reserve memory"));
}
u64 reserved = reserve;
AddGstat(GSTAT_MEMORY_RESERVED, reserve);
@ -29,9 +27,7 @@ Arena *AllocArena(u64 reserve)
if (!base)
{
/* Hard fail on commit failure */
/* FIXME: Enable this */
//Panic(Lit("Failed to commit initial memory block: System may be out of memory"));
(*(volatile int *)0) = 0;
Panic(Lit("Failed to commit initial memory block: System may be out of memory"));
}
Assert(((u64)base & 0xFFF) == 0); /* Base should be 4k aligned */
@ -88,17 +84,13 @@ void *PushBytesNoZero(Arena *arena, u64 size, u64 align)
if (new_capacity > arena->reserved)
{
/* Hard fail if we overflow reserved memory for now */
/* FIXME: Enable this */
//Panic(Lit("Failed to commit new memory block: Overflow of reserved memory"));
(*(volatile int *)0) = 0;
Panic(Lit("Failed to commit new memory block: Overflow of reserved memory"));
}
void *commit_address = base + arena->committed;
if (!CommitMemory(commit_address, commit_bytes))
{
/* Hard fail on memory allocation failure for now */
/* FIXME: Enable this */
//Panic(Lit("Failed to commit new memory block: System may be out of memory"));
(*(volatile int *)0) = 0;
Panic(Lit("Failed to commit new memory block: System may be out of memory"));
}
arena->committed += commit_bytes;
AddGstat(GSTAT_MEMORY_COMMITTED, commit_bytes);

View File

@ -163,6 +163,7 @@ Inline ArenaCtx *ArenaCtxFromFiberId(i16 fiber_id)
SharedArenaCtx *shared = &shared_arena_ctx;
ArenaCtx *ctx = &shared->arena_contexts[fiber_id];
if (!ctx->scratch_arenas[0]) {
__profn("Initialize fiber arena ctx");
for (i32 i = 0; i < (i32)countof(ctx->scratch_arenas); ++i) {
ctx->scratch_arenas[i] = AllocArena(Gibi(64));
}

View File

@ -12,6 +12,6 @@ void Exit(void);
void Panic(String msg);
////////////////////////////////
//~ @hookdecl Application defined entry point
//~ @hookdecl Application defined hooks
void EntryPoint(void);
void Startup(void);

View File

@ -45,7 +45,7 @@ void FutexWait(volatile void *addr, void *cmp, u32 size, i64 timeout_ns);
void FutexWake(void *addr, i32 count);
////////////////////////////////
//~ @hookdecl Job helpers
//~ @hookdecl Job operations
#define EmptySig { i32 _; }
@ -60,6 +60,10 @@ Struct(GenericJobDesc)
JobPool pool;
JobPriority priority;
Counter *counter;
/* Internal */
Atomic32 num_completed;
GenericJobDesc *next_free;
};
Struct(JobDescParams)

View File

@ -8,7 +8,7 @@ JobDef(W32_AppStartupJob, UNUSED sig, UNUSED id)
W32_SharedEntryCtx *g = &W32_shared_entry_ctx;
TempArena scratch = BeginScratchNoConflict();
{
EntryPoint();
Startup();
SetEvent(g->startup_end_event);
}
EndScratch(scratch);

View File

@ -1,7 +1,5 @@
W32_SharedJobCtx W32_shared_job_ctx = ZI;
/* FIXME: Enable logs, panic, shutdown */
////////////////////////////////
//~ Win32 libs
@ -40,6 +38,9 @@ void StartupBaseJobs(void)
g->num_fibers = 1; /* Fiber at index 0 always nil */
g->fiber_names_arena = AllocArena(Gibi(64));
/* Init job descs */
g->job_descs_arena = AllocArena(Gibi(64));
/* Convert main thread to fiber */
W32_AllocFiber(0);
@ -543,10 +544,8 @@ void W32_WakeLockedFibers(i32 num_fibers, W32_Fiber **fibers)
ZeroStruct(info);
info->count = 1;
info->num_dispatched = fiber->job_id;
info->func = fiber->job_func;
info->sig = fiber->job_sig;
info->counter = fiber->job_counter;
info->fiber_id = fiber->id;
info->desc = fiber->job_desc;
if (queue->first)
{
info->next = queue->first;
@ -798,12 +797,9 @@ W32_Fiber *W32_AllocFiber(W32_JobPool *pool)
fiber->next_addr_waiter = 0;
fiber->prev_time_waiter = 0;
fiber->next_time_waiter = 0;
fiber->job_func = 0;
fiber->job_sig = 0;
fiber->job_id = 0;
fiber->job_pool = 0;
fiber->job_priority = 0;
fiber->job_counter = 0;
fiber->yield_param = 0;
fiber->parent_id = 0;
return fiber;
@ -857,33 +853,48 @@ void W32_YieldFiber(W32_Fiber *fiber, W32_Fiber *parent_fiber)
}
}
//- Fiber entry
////////////////////////////////
//~ Win32 fiber entry
void W32_FiberEntryPoint(void *id_ptr)
{
W32_SharedJobCtx *g = &W32_shared_job_ctx;
i16 id = (i32)(i64)id_ptr;
volatile W32_Fiber *fiber = W32_FiberFromId(id);
__prof_fiber_enter(fiber->name_cstr, PROF_THREAD_GROUP_FIBERS - Mebi(fiber->job_pool) + Kibi(1) + fiber->id);
for (;;)
{
/* Run job */
//- Run job
{
W32_YieldParam *yield_param = fiber->yield_param;
yield_param->kind = W32_YieldKind_None;
{
MemoryBarrier();
GenericJobFunc *job_func = fiber->job_func;
job_func(fiber->job_sig, fiber->job_id);
GenericJobDesc *job_desc = fiber->job_desc;
GenericJobFunc *job_func = job_desc->func;
job_func(job_desc->sig, fiber->job_id);
MemoryBarrier();
}
}
/* Job completed, yield */
//- Job completed
{
/* Decrement job counter */
Counter *job_counter = fiber->job_counter;
if (job_counter)
GenericJobDesc *job_desc = fiber->job_desc;
/* Check if we're the last completed dispatch in the job */
if ((Atomic32FetchAdd(&job_desc->num_completed, 1) + 1) >= job_desc->count)
{
AddCounter(job_counter, -1);
/* Decrement counter */
if (job_desc->counter)
{
AddCounter(job_desc->counter, -1);
}
/* Release job desc */
ResetArena(job_desc->arena);
LockTicketMutex(&g->job_descs_tm);
{
job_desc->next_free = g->first_free_job_desc;
g->first_free_job_desc = job_desc;
}
UnlockTicketMutex(&g->job_descs_tm);
}
/* Yield to worker */
fiber->yield_param->kind = W32_YieldKind_Done;
@ -894,7 +905,7 @@ void W32_FiberEntryPoint(void *id_ptr)
}
////////////////////////////////
//~ Win32 job worker
//~ Win32 job worker entry
W32_ThreadDef(W32_JobWorkerEntryFunc, worker_ctx_arg)
{
@ -958,15 +969,13 @@ W32_ThreadDef(W32_JobWorkerEntryFunc, worker_ctx_arg)
while (!shutdown)
{
//- Pull job from queue
GenericJobDesc *job_desc = 0;
JobPriority job_priority = 0;
i16 job_fiber_id = 0;
i32 job_id = 0;
GenericJobFunc *job_func = 0;
void *job_sig = 0;
Counter *job_counter = 0;
{
//__profnc("Pull job", Rgb32F(0.75, 0.75, 0));
for (JobPriority priority = 0; priority < (i32)countof(pool->job_queues) && !job_func; ++priority)
for (JobPriority priority = 0; priority < (i32)countof(pool->job_queues) && !job_desc; ++priority)
{
W32_JobQueue *queue = &pool->job_queues[priority];
if (queue)
@ -974,7 +983,7 @@ W32_ThreadDef(W32_JobWorkerEntryFunc, worker_ctx_arg)
LockTicketMutex(&queue->lock);
{
W32_JobInfo *info = queue->first;
while (info && !job_func)
while (info && !job_desc)
{
W32_JobInfo *next = info->next;
b32 dequeue = 0;
@ -986,9 +995,7 @@ W32_ThreadDef(W32_JobWorkerEntryFunc, worker_ctx_arg)
/* Pick job */
Atomic64FetchAdd(&pool->num_jobs_in_queue.v, -1);
job_priority = priority;
job_func = info->func;
job_sig = info->sig;
job_counter = info->counter;
job_desc = info->desc;
if (job_id == (info->count - 1))
{
/* We're picking up the last dispatch, so dequeue the job */
@ -1003,9 +1010,7 @@ W32_ThreadDef(W32_JobWorkerEntryFunc, worker_ctx_arg)
job_fiber_id = info->fiber_id;
job_priority = priority;
job_id = info->num_dispatched;
job_func = info->func;
job_sig = info->sig;
job_counter = info->counter;
job_desc = info->desc;
dequeue = 1;
}
if (dequeue)
@ -1037,7 +1042,7 @@ W32_ThreadDef(W32_JobWorkerEntryFunc, worker_ctx_arg)
}
//- Run fiber
if (job_func)
if (job_desc)
{
if (!job_fiber)
{
@ -1049,12 +1054,10 @@ W32_ThreadDef(W32_JobWorkerEntryFunc, worker_ctx_arg)
__profvalue(job_fiber->id);
W32_YieldParam yield = ZI;
job_fiber->parent_id = worker_fiber_id;
job_fiber->job_func = job_func;
job_fiber->job_sig = job_sig;
job_fiber->job_desc = job_desc;
job_fiber->job_id = job_id;
job_fiber->job_pool = pool_kind;
job_fiber->job_priority = job_priority;
job_fiber->job_counter = job_counter;
job_fiber->yield_param = &yield;
b32 done = 0;
while (!done)
@ -1276,7 +1279,7 @@ W32_ThreadDef(W32_JobWorkerEntryFunc, worker_ctx_arg)
}
////////////////////////////////
//~ Win32 job scheduler
//~ Win32 job scheduler entry
W32_ThreadDef(W32_JobSchedulerEntryFunc, UNUSED arg)
{
@ -1398,13 +1401,29 @@ void FutexWake(void *addr, i32 count)
}
////////////////////////////////
//~ @hoodef Job
//~ @hookdef Job operations
GenericJobDesc *PushJobDesc_(u64 sig_size, u64 sig_align, GenericJobFunc *func, JobDescParams params)
{
struct W32_SharedJobCtx *g = &W32_shared_job_ctx;
GenericJobDesc *result = 0;
/* FIXME: Pool arenas */
Arena *arena = AllocArena(Mebi(1));
Arena *arena = 0;
LockTicketMutex(&g->job_descs_tm);
{
if (g->first_free_job_desc)
{
result = g->first_free_job_desc;
g->first_free_job_desc = result->next_free;
arena = result->arena;
}
else
{
result = PushStructNoZero(g->job_descs_arena, GenericJobDesc);
arena = AllocArena(Mebi(4));
}
}
UnlockTicketMutex(&g->job_descs_tm);
ZeroStruct(result);
result = PushStruct(arena, GenericJobDesc);
result->arena = arena;
result->sig = PushBytes(arena, sig_size, sig_align);
@ -1424,13 +1443,11 @@ void RunJobEx(GenericJobDesc *desc)
Counter *counter = desc->counter;
JobPool pool_kind = desc->pool;
JobPriority priority = desc->priority;
GenericJobFunc *func = desc->func;
void *sig = desc->sig;
if (count > 0)
{
if (counter)
{
AddCounter(counter, count);
AddCounter(counter, 1);
}
W32_Fiber *fiber = W32_FiberFromId(FiberId());
priority = ClampI32(priority, fiber->job_priority, JobPriority_Count - 1); /* A job cannot create a job with a higher priority than itself */
@ -1453,10 +1470,8 @@ void RunJobEx(GenericJobDesc *desc)
info = PushStructNoZero(queue->arena, W32_JobInfo);
}
ZeroStruct(info);
info->desc = desc;
info->count = count;
info->func = func;
info->sig = sig;
info->counter = counter;
if (queue->last)
{
queue->last->next = info;

View File

@ -115,25 +115,17 @@ AlignedStruct(W32_Fiber, 64)
i16 next_time_waiter; /* 02 bytes */
i16 prev_time_waiter; /* 02 bytes */
/* ---------------------------------------------------- */
u8 _pad1[8]; /* 08 bytes (padding) */
/* ---------------------------------------------------- */
u8 _pad2[8]; /* 08 bytes (padding) */
GenericJobDesc *job_desc; /* 08 bytes */
/* ---------------------------------------------------- */
/* -------------------- Cache line -------------------- */
/* ---------------------------------------------------- */
GenericJobFunc *job_func; /* 08 bytes */
/* ---------------------------------------------------- */
void *job_sig; /* 08 bytes */
/* ---------------------------------------------------- */
i32 job_id; /* 04 bytes */
i16 job_pool; /* 02 bytes */
i16 job_priority; /* 02 bytes */
/* ---------------------------------------------------- */
struct Counter *job_counter; /* 08 bytes */
/* ---------------------------------------------------- */
W32_YieldParam *yield_param; /* 08 bytes */
/* ---------------------------------------------------- */
u8 _pad3[24]; /* 24 bytes (padding) */
u8 _pad0[48]; /* 48 bytes (padding) */
};
StaticAssert(sizeof(W32_Fiber) == 128); /* Padding validation (increase if necessary) */
@ -153,13 +145,10 @@ AlignedStruct(W32_WorkerCtx, 64)
//- Job info
Struct(W32_JobInfo)
{
i32 num_dispatched;
GenericJobDesc *desc;
i32 count;
GenericJobFunc *func;
void *sig;
struct Counter *counter;
i32 num_dispatched;
i16 fiber_id; /* If the job is being resumed from a yield */
W32_JobInfo *next;
@ -236,6 +225,11 @@ Struct(W32_SharedJobCtx)
Arena *fiber_names_arena;
W32_Fiber fibers[MaxFibers];
//- Job descs
Arena *job_descs_arena;
TicketMutex job_descs_tm;
GenericJobDesc *first_free_job_desc;
//- Wait lists
Atomic64Padded waiter_wake_gen;
TicketMutex wait_lists_arena_tm;
@ -284,10 +278,18 @@ void W32_ReleaseFiber(W32_JobPool *pool, W32_Fiber *fiber);
ForceInline W32_Fiber *W32_FiberFromId(i16 id);
ForceNoInline void W32_FiberResume(W32_Fiber *fiber);
void W32_YieldFiber(W32_Fiber *fiber, W32_Fiber *parent_fiber);
////////////////////////////////
//~ Fiber entry
void W32_FiberEntryPoint(void *id_ptr);
////////////////////////////////
//~ Workers
//~ Job worker entry
W32_ThreadDef(W32_JobWorkerEntryFunc, worker_ctx_arg);
////////////////////////////////
//~ Job scheduler entry
W32_ThreadDef(W32_JobSchedulerEntryFunc, _);

View File

@ -326,6 +326,7 @@ void GPU_D12_InitObjects(void)
Counter counter = ZI;
RunJob(DX12_NUM_QUEUES, GPU_D12_AllocCommandQueueJob, JobPool_Inherit, JobPriority_Low, &counter, .descs_in = params, .cqs_out = g->command_queues);
WaitOnCounter(&counter);
DEBUGBREAKABLE;
}
#if ProfilingIsEnabled
{
@ -3284,27 +3285,30 @@ void GPU_PresentSwapchain(GPU_Swapchain *gp_swapchain, Vec2I32 backbuffer_resolu
#if ProfilingGpu
{
__profframe(0);
__profn("Mark queue frames");
/* Lock because frame marks shouldn't occur while command lists are recording */
Lock lock = LockE(&g->global_command_list_record_mutex);
for (u32 i = 0; i < countof(g->command_queues); ++i)
GPU_D12_SharedState *g = &GPU_D12_shared_state;
{
__profframe(0);
__profn("Mark queue frames");
/* Lock because frame marks shouldn't occur while command lists are recording */
Lock lock = LockE(&g->global_command_list_record_mutex);
for (u32 i = 0; i < countof(g->command_queues); ++i)
{
{
GPU_D12_CommandQueue *cq = g->command_queues[i];
__prof_dx12_new_frame(cq->prof);
}
}
Unlock(&lock);
}
{
__profn("Collect queues");
for (u32 i = 0; i < countof(g->command_queues); ++i)
{
GPU_D12_CommandQueue *cq = g->command_queues[i];
__prof_dx12_new_frame(cq->prof);
__prof_dx12_collect(cq->prof);
}
}
Unlock(&lock);
}
{
__profn("Collect queues");
for (u32 i = 0; i < countof(g->command_queues); ++i)
{
GPU_D12_CommandQueue *cq = g->command_queues[i];
__prof_dx12_collect(cq->prof);
}
}
#endif
}

View File

@ -6,16 +6,13 @@ RES_SharedState RES_shared_state = ZI;
void RES_StartupCore(void)
{
__prof;
RES_SharedState *g = &RES_shared_state;
g->arena = AllocArena(Gibi(64));
#if RESOURCES_EMBEDDED
String embedded_data = INC_GetResTar();
if (embedded_data.len <= 0)
{
Panic(Lit("No embedded resources found"));
}
g->archive = TAR_ArchiveFromString(g->arena, embedded_data, Lit(""));
g->archive = TAR_ArchiveFromString(GetPermArena(), embedded_data, Lit(""));
#else
/* Ensure we have the right working directory */
if (!P_IsDir(Lit("res")))

View File

@ -22,9 +22,10 @@ Struct(RES_Resource)
Struct(RES_SharedState)
{
Arena *arena;
#if RESOURCES_EMBEDDED
TAR_Archive archive;
#else
i32 _;
#endif
};