async signal

This commit is contained in:
jacob 2025-12-16 13:50:46 -06:00
parent cbeafdb8fe
commit a2226a2fb8
5 changed files with 51 additions and 25 deletions

View File

@ -4,6 +4,12 @@
#define ArenaHeaderSize 256
#define ArenaBlockSize 16384
Struct(ThreadLocalArenaCtx)
{
Arena *perm;
Arena *scratch[2];
};
////////////////////////////////////////////////////////////
//~ Arena management

View File

@ -25,34 +25,49 @@ void OnAsyncTick(AsyncTickCallbackFunc *func)
Unlock(&lock);
}
void SignalAsyncTick(void)
{
Atomic64FetchAdd(&Base.async.signal.v, 1);
FutexWakeNeq(&Base.async.signal.v);
}
////////////////////////////////////////////////////////////
//~ Async worker
void AsyncWorkerEntryPoint(WaveLaneCtx *lane)
{
AsyncTickCtx *tick = &Base_tl.async.tick_ctx;
tick->arena = AcquireArena(Gibi(64));
AsyncTickCtx tick = ZI;
tick.arena = AcquireArena(Gibi(64));
i64 last_signal = 0;
/* Tick forever */
for (;;)
{
AsyncWorkerCtx *w = &Base.async.worker_ctx;
/* FIXME: Remove this */
SleepSeconds(0.001);
AsyncWorkerCtx *w = &Base.async.worker;
//////////////////////////////
//- Begin tick
//- Wait for signal
WaveSync(lane);
ResetArena(tick->arena);
/* TODO: Passive ticks that run every N seconds */
i64 cur_signal = Atomic64Fetch(&Base.async.signal.v);
while (cur_signal <= last_signal)
{
FutexYieldNeq(&Base.async.signal.v, &cur_signal, sizeof(cur_signal));
cur_signal = Atomic64Fetch(&Base.async.signal.v);
}
last_signal = cur_signal;
//////////////////////////////
//- Collect async callbacks
if (lane->idx == 0)
{
Lock lock = LockE(&Base.async.mutex);
{
w->callbacks_count = Base.async.callback_nodes_count;
w->callbacks = PushStructsNoZero(tick->arena, AsyncTickCallback, w->callbacks_count);
w->callbacks = PushStructsNoZero(tick.arena, AsyncTickCallback, w->callbacks_count);
u64 callback_idx = 0;
for (AsyncTickCallbackNode *n = Base.async.first_callback_node; n; n = n->next)
{
@ -62,16 +77,26 @@ void AsyncWorkerEntryPoint(WaveLaneCtx *lane)
}
Unlock(&lock);
}
WaveSync(lane);
//////////////////////////////
//- Run tick funcs
//- Run async callbacks
for (u64 callback_idx = 0; callback_idx < w->callbacks_count; ++callback_idx)
{
AsyncTickCallback *callback = &w->callbacks[callback_idx];
callback->func(lane, tick);
callback->func(lane, &tick);
}
//////////////////////////////
//- End tick
WaveSync(lane);
ResetArena(tick.arena);
{
Arena *tick_arena = tick.arena;
ZeroStruct(&tick);
tick.arena = tick_arena;
}
}
}

View File

@ -22,7 +22,6 @@ Struct(AsyncTickCtx)
Struct(AsyncWorkerCtx)
{
AsyncTickCtx tick;
u64 callbacks_count;
AsyncTickCallback *callbacks;
};
@ -33,7 +32,10 @@ Struct(AsyncCtx)
u64 callback_nodes_count;
AsyncTickCallbackNode *first_callback_node;
AsyncTickCallbackNode *last_callback_node;
AsyncWorkerCtx worker_ctx;
AsyncWorkerCtx worker;
Atomic64Padded signal;
};
////////////////////////////////////////////////////////////
@ -45,6 +47,7 @@ void BootstrapAsync(void);
//~ Async ops
void OnAsyncTick(AsyncTickCallbackFunc *func);
void SignalAsyncTick(void);
////////////////////////////////////////////////////////////
//~ Async worker

View File

@ -16,16 +16,7 @@ extern BaseCtx Base;
Struct(BaseThreadLocalCtx)
{
struct
{
Arena *perm;
Arena *scratch[2];
} arenas;
struct
{
AsyncTickCtx tick_ctx;
} async;
ThreadLocalArenaCtx arenas;
};
extern ThreadLocal BaseThreadLocalCtx Base_tl;

View File

@ -163,6 +163,7 @@ GC_Run GC_RunFromString(Arena *arena, String str, GC_FontKey font, f32 font_size
SllQueuePush(GC.submit.first, GC.submit.last, n);
}
Unlock(&lock);
SignalAsyncTick();
}
//////////////////////////////