| Field | Value |
|---|---|
| name | async-worker-pattern |
| description | Implement async worker thread patterns with bounded task queues, admission control, and completion posting. Use when adding blocking I/O operations (DNS, file I/O, network calls) that must not block the main event loop. Covers worker thread design, queue management, completion callbacks, and main-loop integration for event-driven architectures. |
Async Worker Pattern Skill
Implement efficient async worker thread patterns to handle blocking I/O operations without stalling the main event loop. This skill covers the complete lifecycle: task queueing, worker execution, bounded queue management, and completion-driven state transitions.
When to Use This Skill
Apply this skill when:
- Adding blocking I/O (DNS, file operations, network calls) to an event-driven driver
- The main loop must remain responsive under load (it never blocks on slow system calls such as DNS lookups or disk reads)
- Worker threads need to post results back without touching interpreter/global state
- You need admission control to prevent resource exhaustion (denial-of-service protection)
- Completion callbacks must transition objects through state machines
Example scenarios:
- Socket DNS resolution (hostname → IPv4)
- File I/O (read file contents, check directory)
- External HTTP calls with timeout
- Certificate validation or crypto operations
- Database queries in async drivers
Prerequisites
- Understanding of event loops and non-blocking I/O
- An async runtime library with these primitives:
  - async_queue_allocate(size, element_size, flags) - create a bounded queue
  - async_queue_enqueue(queue, data, size) - copy an element into a queue; returns nonzero on success
  - async_queue_dequeue(queue, timeout_ms) - remove an element, waiting up to timeout_ms (0 returns immediately); returns NULL if none arrived, otherwise a copy the caller releases with mp_free()
  - async_queue_free(queue) - release a queue
  - async_worker_create(work_fn, arg) - spawn a worker thread
  - async_worker_should_stop(worker) - graceful shutdown check
  - async_worker_destroy(worker) - join and clean up
  - async_runtime_post_completion(runtime, completion_key, data) - wake the main loop (worker → main loop)
- A single-threaded main loop that collects events and dispatches completions
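The runtime's queue implementation is not shown here. The steps below rely on a few queue properties: fixed capacity, elements copied by value, and a timed dequeue where 0 means "don't wait". The sketch below is a rough pthreads model of those semantics. The bq_* names are hypothetical, and the sketch differs from the runtime in two ways: it copies into a caller buffer instead of returning an mp_free()-owned copy, and it rejects a push to a full queue rather than applying a drop-oldest policy.

```c
#define _POSIX_C_SOURCE 200809L
#include <errno.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

/* Hypothetical stand-in for async_queue_*: a fixed-capacity ring buffer of
 * fixed-size elements, copied by value and guarded by one mutex. */
typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t not_empty;
    size_t capacity, elem_size, head, count;
    unsigned char *slots;
} bq_t;

bq_t *bq_create(size_t capacity, size_t elem_size) {
    bq_t *q = calloc(1, sizeof(*q));
    if (!q)
        return NULL;
    q->slots = calloc(capacity, elem_size);
    if (!q->slots) {
        free(q);
        return NULL;
    }
    q->capacity = capacity;
    q->elem_size = elem_size;
    pthread_mutex_init(&q->lock, NULL);
    pthread_cond_init(&q->not_empty, NULL);
    return q;
}

/* Copies one element in. Returns 0 when the queue is full (reject, don't drop). */
int bq_push(bq_t *q, const void *elem) {
    int ok = 0;
    pthread_mutex_lock(&q->lock);
    if (q->count < q->capacity) {
        size_t tail = (q->head + q->count) % q->capacity;
        memcpy(q->slots + tail * q->elem_size, elem, q->elem_size);
        q->count++;
        ok = 1;
        pthread_cond_signal(&q->not_empty);
    }
    pthread_mutex_unlock(&q->lock);
    return ok;
}

/* Waits up to timeout_ms (>= 0; 0 = poll) and copies the oldest element out.
 * Returns 1 on success, 0 on timeout. */
int bq_pop(bq_t *q, void *out, int timeout_ms) {
    struct timespec deadline;
    clock_gettime(CLOCK_REALTIME, &deadline);   /* timedwait uses CLOCK_REALTIME */
    deadline.tv_sec += timeout_ms / 1000;
    deadline.tv_nsec += (long)(timeout_ms % 1000) * 1000000L;
    if (deadline.tv_nsec >= 1000000000L) {
        deadline.tv_sec++;
        deadline.tv_nsec -= 1000000000L;
    }
    pthread_mutex_lock(&q->lock);
    while (q->count == 0) {
        if (pthread_cond_timedwait(&q->not_empty, &q->lock, &deadline) == ETIMEDOUT)
            break;
    }
    int ok = 0;
    if (q->count > 0) {
        memcpy(out, q->slots + q->head * q->elem_size, q->elem_size);
        q->head = (q->head + 1) % q->capacity;
        q->count--;
        ok = 1;
    }
    pthread_mutex_unlock(&q->lock);
    return ok;
}

void bq_free(bq_t *q) {
    if (!q)
        return;
    pthread_cond_destroy(&q->not_empty);
    pthread_mutex_destroy(&q->lock);
    free(q->slots);
    free(q);
}
```

One condition variable is enough because producers never wait: a full queue fails fast, which matches the idea of rejecting excess work at admission time.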
Core Architecture Pattern
```text
┌──────────────────────────────────────────────────────────────┐
│ Main Thread (Non-blocking Event Loop)                        │
│                                                              │
│ 1. Detect blocking work needed (e.g., hostname)              │
│ 2. Queue task to bounded queue + increment counter           │
│ 3. Return to application immediately (EESUCCESS)             │
│ 4. Poll for completions in event dispatch:                   │
│    - Check completion_key in event                           │
│    - Drain results queue                                     │
│    - Transition object state (e.g., RESOLVING → CONNECTING)  │
└──────────────────────────────────────────────────────────────┘
          │                                     ↑
          │ task queue                          │ results queue
          │ (bounded)                           │ (bounded)
          ↓                                     │
┌──────────────────────────────────────────────────────────────┐
│ Worker Thread (Blocking Operations OK)                       │
│                                                              │
│ 1. Dequeue task with timeout                                 │
│ 2. Perform blocking work (getaddrinfo, read, etc.)           │
│ 3. Format result (status, output data)                       │
│ 4. Enqueue result                                            │
│ 5. Post completion to main loop (wakes event loop)           │
│ 6. Loop until async_worker_should_stop() = true              │
└──────────────────────────────────────────────────────────────┘
```
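The runtime decides how async_runtime_post_completion() wakes the loop. One common POSIX technique is the self-pipe:

- the worker publishes its result;
- the worker then writes one byte to a pipe;
- the main loop waits on the pipe's read end with poll().

The sketch below shows that ordering and the wakeup. All names in it are hypothetical, and a single atomic int stands in for the results queue.

```c
#define _POSIX_C_SOURCE 200809L
#include <poll.h>
#include <pthread.h>
#include <stdatomic.h>
#include <unistd.h>

/* Hypothetical self-pipe wakeup: the worker writes wake_pipe[1],
 * the main loop watches wake_pipe[0]. */
static int wake_pipe[2];
static atomic_int published_result;   /* stands in for the results queue */

static void *demo_worker(void *arg) {
    (void)arg;
    atomic_store(&published_result, 42);          /* 1. publish the result */
    char token = 1;
    ssize_t n = write(wake_pipe[1], &token, 1);   /* 2. then post the wakeup */
    (void)n;
    return NULL;
}

/* Main-loop side: sleep in poll() until woken, then drain.
 * Returns the result, or -1 if nothing arrived within timeout_ms. */
static int wait_for_completion(int timeout_ms) {
    struct pollfd pfd = { .fd = wake_pipe[0], .events = POLLIN };
    if (poll(&pfd, 1, timeout_ms) <= 0)
        return -1;
    char token;
    ssize_t n = read(wake_pipe[0], &token, 1);    /* consume the wakeup byte */
    (void)n;
    return atomic_load(&published_result);
}

int run_demo(void) {
    pthread_t tid;
    if (pipe(wake_pipe) != 0)
        return -1;
    if (pthread_create(&tid, NULL, demo_worker, NULL) != 0)
        return -1;
    int result = wait_for_completion(5000);
    pthread_join(tid, NULL);
    close(wake_pipe[0]);
    close(wake_pipe[1]);
    return result;
}
```

While idle, the main loop costs nothing. Each posted byte wakes it at most once, and the drain step does the real work.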
Step-by-Step Workflow
1. Design Task and Result Structures
Define compact message types for data passing (avoid pointers):
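```c
#include <netinet/in.h>   /* struct in_addr */
#include <stdint.h>       /* uint16_t */
#include <time.h>         /* time_t */

/* Main thread → worker: one DNS lookup request. */
typedef struct {
    int object_id;          /* lets the main loop find the object again */
    char hostname[64];      /* fixed-size, NUL-terminated copy */
    uint16_t port;
    time_t deadline;
} dns_task_t;

/* Worker → main thread: outcome of one lookup. */
typedef struct {
    int object_id;
    struct in_addr resolved_addr;   /* valid only when success != 0 */
    uint16_t port;
    int success;
} dns_result_t;
```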
Key principles:
- Structures are plain data only (no pointers, no LPC objects)
- Include enough context for the main loop to find the object again (object_id, socket_id, etc.)
- Keep total size small (<256 bytes) for queue efficiency
- Use fixed-size buffers, never variable-length allocations in queue
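The size and no-pointer rules can be checked at compile time. This sketch repeats the two structs so it compiles standalone and adds _Static_assert guards for the 256-byte budget. It also shows why pointer-free messages matter: a plain byte copy yields a fully independent message. copy_task() is a hypothetical helper for illustration.

```c
#include <netinet/in.h>
#include <stdint.h>
#include <string.h>
#include <time.h>

typedef struct {
    int object_id;
    char hostname[64];
    uint16_t port;
    time_t deadline;
} dns_task_t;

typedef struct {
    int object_id;
    struct in_addr resolved_addr;
    uint16_t port;
    int success;
} dns_result_t;

/* Fail the build if a message outgrows the queue budget. */
_Static_assert(sizeof(dns_task_t) <= 256, "dns_task_t too large for the queue");
_Static_assert(sizeof(dns_result_t) <= 256, "dns_result_t too large for the queue");

/* No pointers inside, so a byte copy shares no memory with the original:
 * the worker's copy cannot alias main-thread data. */
dns_task_t copy_task(const dns_task_t *src) {
    dns_task_t dst;
    memcpy(&dst, src, sizeof(dst));
    return dst;
}
```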
2. Declare Bounded Queues and Worker
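```c
#define WORK_QUEUE_SIZE 256
#define RESULT_QUEUE_SIZE 256
#define WORK_COMPLETION_KEY 0x574F524B   /* must be unique among completion keys */

static async_queue_t *work_queue = NULL;
static async_queue_t *result_queue = NULL;
static async_worker_t *worker = NULL;
static int in_flight_count = 0;          /* main thread only; never touched by the worker */

/* Admission limits enforced by queue_work() */
#define GLOBAL_CAP 64
#define PER_OWNER_CAP 8
```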
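Two relationships in these constants are worth enforcing.

- Every in-flight task occupies at most one slot: either in the work queue or, as a result, in the result queue. Keeping GLOBAL_CAP at or below both queue sizes therefore means neither queue can overflow, and ASYNC_QUEUE_DROP_OLDEST never has to evict anything.
- 0x574F524B is the ASCII bytes "WORK".

A compile-time sketch (FOURCC is a hypothetical helper macro):

```c
#include <stdint.h>

#define WORK_QUEUE_SIZE 256
#define RESULT_QUEUE_SIZE 256
#define GLOBAL_CAP 64
#define PER_OWNER_CAP 8

/* in_flight_count <= GLOBAL_CAP bounds how many entries each queue can hold. */
_Static_assert(GLOBAL_CAP <= WORK_QUEUE_SIZE, "work queue could overflow");
_Static_assert(GLOBAL_CAP <= RESULT_QUEUE_SIZE, "result queue could overflow");
_Static_assert(PER_OWNER_CAP <= GLOBAL_CAP, "per-owner cap exceeds global cap");

/* Packs four ASCII characters into a readable 32-bit key. */
#define FOURCC(a, b, c, d) \
    (((uint32_t)(a) << 24) | ((uint32_t)(b) << 16) | ((uint32_t)(c) << 8) | (uint32_t)(d))
#define WORK_COMPLETION_KEY FOURCC('W', 'O', 'R', 'K')

_Static_assert(WORK_COMPLETION_KEY == 0x574F524B, "key should spell WORK");
```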
3. Implement Worker Thread Function
Worker function executes in worker thread context (blocking is OK):
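```c
#include <netdb.h>        /* getaddrinfo, freeaddrinfo */
#include <netinet/in.h>   /* struct sockaddr_in */
#include <string.h>       /* memset */
#include <sys/socket.h>   /* AF_INET, SOCK_STREAM */

static void worker_main(async_worker_t *self, void *arg) {
    async_queue_t *tasks = (async_queue_t *)arg;   /* work_queue, passed at creation */

    while (!async_worker_should_stop(self)) {
        /* Wait at most 100 ms so the stop flag is re-checked regularly. */
        dns_task_t *task = (dns_task_t *)async_queue_dequeue(tasks, 100);
        if (task == NULL)
            continue;

        dns_result_t result;
        memset(&result, 0, sizeof(result));   /* resolved_addr stays zero on failure */
        result.object_id = task->object_id;
        result.port = task->port;

        struct addrinfo hints, *results = NULL;
        memset(&hints, 0, sizeof(hints));
        hints.ai_family = AF_INET;            /* IPv4 only */
        hints.ai_socktype = SOCK_STREAM;      /* one entry per address, not per socket type */

        /* Blocking call: fine here, never on the main thread. */
        if (getaddrinfo(task->hostname, NULL, &hints, &results) == 0) {
            for (struct addrinfo *entry = results; entry; entry = entry->ai_next) {
                if (entry->ai_family == AF_INET && entry->ai_addr) {
                    result.resolved_addr =
                        ((struct sockaddr_in *)entry->ai_addr)->sin_addr;
                    result.success = 1;
                    break;
                }
            }
            freeaddrinfo(results);
        }

        /* Publish the result first, then wake the main loop. */
        async_queue_enqueue(result_queue, &result, sizeof(result));
        async_runtime_post_completion(get_async_runtime(), WORK_COMPLETION_KEY, 1);
        mp_free(task);   /* the dequeued copy belongs to this thread */
    }
}
```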
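Critical patterns:
- Dequeue with a nonzero timeout so the loop periodically re-checks async_worker_should_stop(); a zero timeout returns immediately and turns the loop into a busy spin
- Never touch shared mutable state (globals, LPC objects, etc.); the thread-safe queues and async_runtime_post_completion() are the only channels back to the main thread
- Always enqueue a result, even on error (use the success flag for status), so the main loop can release the task's admission slot
- Post the completion after enqueueing the result, so the result is already available when the main loop wakes
- Free the dequeued task (mp_free() in this runtime) once the worker is done with it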
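The shutdown contract behind async_worker_should_stop() and async_worker_destroy() can be modeled with an atomic flag and a join. In this hypothetical sketch, worker_t, worker_create and worker_destroy are illustrative names. nanosleep() stands in for the 100 ms dequeue wait, so destroy returns within roughly one timeout.

```c
#define _POSIX_C_SOURCE 200809L
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdlib.h>
#include <time.h>

/* Hypothetical stand-ins for async_worker_create / should_stop / destroy. */
typedef struct {
    pthread_t thread;
    atomic_bool stop;
} worker_t;

static bool worker_should_stop(worker_t *w) { return atomic_load(&w->stop); }

static void *worker_loop(void *arg) {
    worker_t *w = arg;
    const struct timespec tick = { 0, 100L * 1000000L };   /* 100 ms */
    while (!worker_should_stop(w)) {
        /* Stands in for async_queue_dequeue(queue, 100): wait at most one
         * tick, then re-check the stop flag. */
        nanosleep(&tick, NULL);
    }
    return NULL;
}

worker_t *worker_create(void) {
    worker_t *w = malloc(sizeof(*w));
    if (!w)
        return NULL;
    atomic_init(&w->stop, false);
    if (pthread_create(&w->thread, NULL, worker_loop, w) != 0) {
        free(w);
        return NULL;
    }
    return w;
}

/* Request stop, then join; returns after at most about one tick. */
void worker_destroy(worker_t *w) {
    atomic_store(&w->stop, true);
    pthread_join(w->thread, NULL);
    free(w);
}
```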
4. Implement Task Queueing (Main Thread Only)
When the main thread needs blocking work done, queue a task without blocking:
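```c
/* Returns 1 if queued, 0 if rejected (cap reached, name too long, queue full). */
static int queue_work(int object_id, const char *hostname, uint16_t port) {
    if (in_flight_count >= GLOBAL_CAP)
        return 0;
    /* count_pending_for_owner() is a helper (not shown) that counts this
     * owner's outstanding tasks. */
    if (count_pending_for_owner(object_id) >= PER_OWNER_CAP)
        return 0;

    dns_task_t task;
    size_t len = strlen(hostname);
    if (len >= sizeof(task.hostname))
        return 0;   /* reject rather than truncate: a cut-off name resolves a different host */

    memset(&task, 0, sizeof(task));
    task.object_id = object_id;
    memcpy(task.hostname, hostname, len + 1);   /* includes the NUL terminator */
    task.port = port;
    task.deadline = current_time() + DNS_TIMEOUT;

    if (!async_queue_enqueue(work_queue, &task, sizeof(task)))
        return 0;
    in_flight_count++;
    return 1;
}
```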
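count_pending_for_owner() is not shown in the source. One way to implement both caps is to keep per-owner counters next to in_flight_count. A slot is reserved before enqueueing and released when the completion is drained. The sketch below assumes owners are small integer ids; MAX_OWNERS, admit() and release() are hypothetical.

```c
#include <stdbool.h>

#define GLOBAL_CAP 64
#define PER_OWNER_CAP 8
#define MAX_OWNERS 1024   /* hypothetical: owners indexed by small integer ids */

/* Main-thread-only counters; the worker never touches them. */
static int in_flight_count;
static int owner_pending[MAX_OWNERS];

/* Reserve a slot before enqueueing; false means "reject, don't queue". */
bool admit(int owner_id) {
    if (owner_id < 0 || owner_id >= MAX_OWNERS)
        return false;
    if (in_flight_count >= GLOBAL_CAP)
        return false;
    if (owner_pending[owner_id] >= PER_OWNER_CAP)
        return false;
    in_flight_count++;
    owner_pending[owner_id]++;
    return true;
}

/* Release the slot when the completion is drained (or if the enqueue fails). */
void release(int owner_id) {
    in_flight_count--;
    owner_pending[owner_id]--;
}
```

If async_queue_enqueue() fails after admit() succeeds, call release() right away so the slot is not leaked.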
5. Implement Completion Handler (Main Loop Integration)
Called from the main loop when an event carrying WORK_COMPLETION_KEY arrives:
```c
static void process_completions(void) {
    dns_result_t *result;

    /* Timeout 0: drain whatever is ready without blocking the main loop. */
    while ((result = (dns_result_t *)async_queue_dequeue(result_queue, 0)) != NULL) {
        /* Release the admission slot first, so callbacks below may queue new work. */
        in_flight_count--;

        object_t *obj = find_object_by_id(result->object_id);
        if (obj == NULL || (obj->flags & O_DESTRUCTED)) {
            /* Object went away while the lookup ran: drop the result. */
        } else if (!result->success) {
            apply_dns_failed_callback(obj);
        } else {
            obj->resolved_addr = result->resolved_addr;
            obj->port = result->port;
            /* RESOLVING → CONNECTING; report failure if the connect cannot start. */
            if (!attempt_connect(obj, &result->resolved_addr, result->port))
                apply_dns_failed_callback(obj);
        }
        mp_free(result);
    }
}
```
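The drain loop's key invariant: every dequeued result releases exactly one admission slot. This holds whether the object is gone, the lookup failed, or it succeeded. Below is a self-contained miniature; arrays stand in for result_queue and the object table, and all names are hypothetical.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical miniature of the main-thread side. */
typedef struct { int object_id; int success; } result_t;
typedef struct { bool destructed; int state; } object_t;

enum { RESOLVING, CONNECTING, DNS_FAILED };

static object_t objects[4];
static int in_flight_count;

static object_t *find_object_by_id(int id) {
    if (id < 0 || id >= 4)
        return NULL;
    return &objects[id];
}

void drain(const result_t *results, size_t n) {
    for (size_t i = 0; i < n; i++) {
        const result_t *r = &results[i];
        in_flight_count--;                    /* released on every path */
        object_t *obj = find_object_by_id(r->object_id);
        if (obj == NULL || obj->destructed)
            continue;                         /* owner went away: drop the result */
        obj->state = r->success ? CONNECTING : DNS_FAILED;
    }
}
```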
6. Integrate with Main Event Loop
Register the completion handler in the event dispatcher:
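```c
/* event_list, nevents and async_event_t are placeholders for whatever the
 * runtime's wait call returns. */
for (int i = 0; i < nevents; i++) {
    async_event_t *evt = &event_list[i];

    if (evt->completion_key == CONSOLE_KEY) {
        /* ... handle console input ... */
    } else if (evt->completion_key == WORK_COMPLETION_KEY) {
        process_completions();   /* drains every ready result */
    } else if (is_socket_readable(evt)) {
        /* ... handle socket I/O ... */
    }
}
```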
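process_completions() drains the queue until it is empty. Several posts can therefore arrive for results that one call has already consumed; the extra calls simply find nothing to do. Below is a dispatch sketch that uses a switch on the completion key. event_t and the value of CONSOLE_KEY are assumptions.

```c
#include <stddef.h>
#include <stdint.h>

#define CONSOLE_KEY          0x434F4E53u   /* hypothetical value ("CONS") */
#define WORK_COMPLETION_KEY  0x574F524Bu

/* Hypothetical event record; the runtime's real event type is not shown. */
typedef struct { uint32_t completion_key; } event_t;

static int console_events, work_completions, other_events;

static void process_completions(void) { work_completions++; }

void dispatch(const event_t *events, size_t n) {
    for (size_t i = 0; i < n; i++) {
        switch (events[i].completion_key) {
        case CONSOLE_KEY:
            console_events++;
            break;
        case WORK_COMPLETION_KEY:
            process_completions();
            break;
        default:
            other_events++;   /* e.g. socket readiness */
            break;
        }
    }
}
```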
7. Initialize and Cleanup
Call init/deinit symmetrically:
```c
static int init_async_work(void) {
    work_queue = async_queue_allocate(WORK_QUEUE_SIZE, sizeof(dns_task_t),
                                      ASYNC_QUEUE_DROP_OLDEST);
    if (!work_queue)
        return 0;

    result_queue = async_queue_allocate(RESULT_QUEUE_SIZE, sizeof(dns_result_t),
                                        ASYNC_QUEUE_DROP_OLDEST);
    if (!result_queue) {
        async_queue_free(work_queue);
        work_queue = NULL;            /* no dangling global for deinit to free again */
        return 0;
    }

    worker = async_worker_create(worker_main, work_queue);
    if (!worker) {
        async_queue_free(work_queue);
        async_queue_free(result_queue);
        work_queue = NULL;
        result_queue = NULL;
        return 0;
    }
    return 1;
}

static void deinit_async_work(void) {
    /* Join the worker first: it keeps using both queues until it exits. */
    if (worker) {
        async_worker_destroy(worker);
        worker = NULL;
    }
    if (work_queue) {
        async_queue_free(work_queue);
        work_queue = NULL;
    }
    if (result_queue) {
        async_queue_free(result_queue);
        result_queue = NULL;
    }
    in_flight_count = 0;
}
```
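Another common way to structure this in C is a single cleanup label. It frees in reverse order and resets each global to NULL, so deinit can never double-free. This sketch substitutes malloc()/free() for the runtime's allocate, free and destroy calls. fail_at is a hypothetical hook for exercising each failure path.

```c
#include <stdlib.h>

/* Hypothetical resources standing in for the two queues and the worker. */
static void *work_queue, *result_queue, *worker;
static int fail_at;   /* 0 = never fail; 1..3 = fail at that step */

static void *acquire(int step) { return step == fail_at ? NULL : malloc(1); }

int init_async_work(void) {
    if (!(work_queue = acquire(1)))
        goto fail;
    if (!(result_queue = acquire(2)))
        goto fail;
    if (!(worker = acquire(3)))
        goto fail;
    return 1;

fail:
    /* Unwind in reverse order; free(NULL) is a no-op. */
    free(worker);       worker = NULL;
    free(result_queue); result_queue = NULL;
    free(work_queue);   work_queue = NULL;
    return 0;
}

void deinit_async_work(void) {
    free(worker);       worker = NULL;   /* the real code joins the worker here */
    free(result_queue); result_queue = NULL;
    free(work_queue);   work_queue = NULL;
}
```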
Common Patterns & Pitfalls
✅ DO: Keep Worker Pure (No Shared State)
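```c
static void worker_main(async_worker_t *self, void *arg) {
    while (!async_worker_should_stop(self)) {   /* queue arrives via arg, not a global */
        dns_task_t *task = async_queue_dequeue((async_queue_t *)arg, 100);
        if (task == NULL)
            continue;
        struct addrinfo hints = { .ai_family = AF_INET }, *results = NULL;
        if (getaddrinfo(task->hostname, NULL, &hints, &results) == 0)
            freeaddrinfo(results);
        /* ... fill result from the task copy, enqueue it, post completion ... */
        mp_free(task);
    }
}
```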
❌ DON'T: Access Shared Mutable Globals
```c
static void worker_main(async_worker_t *w, void *arg) {
    dns_task_t *task = async_queue_dequeue((async_queue_t *)arg, 100);
    /* Data race: the main thread reads and writes this table concurrently. */
    shared_object_table[task->object_id].status = RESOLVED;
}
```
✅ DO: Always Post Completion After Enqueueing Result
```c
async_queue_enqueue(result_queue, &result, sizeof(result));   /* 1. publish */
async_runtime_post_completion(runtime, KEY, 1);               /* 2. wake */
```
❌ DON'T: Post Completion Before Enqueueing
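```c
/* The main loop may wake, find the queue empty, and leave this result
 * unprocessed until some later wakeup. */
async_runtime_post_completion(runtime, KEY, 1);
async_queue_enqueue(result_queue, &result, sizeof(result));
```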
✅ DO: Check Object Still Valid in Completion Handler
```c
object_t *obj = find_object_by_id(result->object_id);
if (obj == NULL || (obj->flags & O_DESTRUCTED)) {
    in_flight_count--;
    mp_free(result);   /* still release the result buffer */
    continue;
}
```
❌ DON'T: Assume Object Survives Until Completion
```c
object_t *obj = &global_objects[result->object_id];   /* may be freed or reused by now */
obj->status = DNS_RESOLVED;
```
✅ DO: Use Bounded Admission Control
```c
if (in_flight_count >= GLOBAL_CAP) return EESOCKET;
if (per_owner_count >= PER_OWNER_CAP) return EESOCKET;
```
❌ DON'T: Unbounded Task Queueing
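```c
/* No caps: a flood of requests fills the queue, and with
 * ASYNC_QUEUE_DROP_OLDEST earlier tasks are silently evicted. */
async_queue_enqueue(work_queue, task, size);
```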
Troubleshooting
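| Issue | Cause | Solution |
|---|---|---|
| Main loop appears to hang | Worker busy-spinning, or blocking work running on the main thread | Keep blocking calls in the worker only; verify the worker never calls event-loop-unsafe functions and does not call async_queue_dequeue(queue, 0), which returns immediately and spins |
| Tasks never process | Worker not running or dequeue failing | Check that async_worker_create() returned non-NULL; verify the worker's dequeue timeout is > 0 |
| Completions not firing | Main loop not checking completion_key | Register the completion handler in the event dispatcher; verify completion_key is unique |
| Object already destroyed when its completion fires | Object destructed while its task was in flight | Always validate the object with find_object_by_id() and check the O_DESTRUCTED flag |
| Queue fills up and tasks are dropped | Admission caps larger than the queue sizes, or completion handler too slow | Keep GLOBAL_CAP at or below both queue sizes (or increase the queue size); check the completion handler for inefficiencies; verify it makes no recursive calls |
| Resource leak on shutdown | Worker thread not joined | Call async_worker_destroy() in deinit; verify every mp_alloc() reaching the worker is matched by an mp_free() |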
| Resource leak on shutdown | Worker thread not joining | Call async_worker_destroy() in deinit; verify all mp_alloc() in worker match mp_free() |
Real-World Example: Socket DNS Resolution
See lib/socket/socket_efuns.c (Stage 4A implementation) for a complete example:
- Task structure: dns_task_t (socket_id, hostname, port)
- Result structure: dns_result_t (socket_id, resolved_addr, port, success)
- Worker function: dns_worker_main() calls getaddrinfo() on the worker thread
- Admission control: global cap (64), per-owner cap (8)
- Completion handler: process_dns_completions() drains results and transitions sockets
- Main-loop integration: DNS_COMPLETION_KEY check in src/comm.c
Test validation: All 26 socket tests pass; SOCK_DNS_001/002 enabled, SOCK_DNS_003 validates async path.
References