| name | loom-background-jobs |
| description | Background job processing patterns including job queues, scheduled jobs, worker pools, retry strategies, and delivery guarantees. Use when implementing async processing, ETL pipelines, ML training jobs, cron schedules, or dead letter queues. |
| triggers | ["async processing","job queue","job queues","workers","task queue","task queues","async tasks","delayed jobs","recurring jobs","scheduled tasks","ETL pipelines","data processing","ML training jobs","Celery","Bull","Sidekiq","Resque","cron jobs","retry logic","dead letter queues","DLQ","at-least-once delivery","exactly-once delivery","job monitoring","worker management"] |
Background Jobs
Overview
Background jobs enable asynchronous processing of tasks outside the request-response cycle. This skill covers job queue patterns, scheduling, worker management, retry strategies, and monitoring for reliable task execution across different frameworks and languages.
Key Concepts
Job Queue Patterns
Bull Queue (Node.js/Redis):
import Queue, { Job, JobOptions } from "bull";
import { Redis } from "ioredis";
/** Settings consumed by createQueue to build a Bull queue. */
interface QueueConfig {
  /** Queue name handed to the Bull constructor. */
  name: string;
  /** Base ioredis connection; createQueue duplicates it for every client Bull asks for. */
  redis: Redis;
  /** Optional overrides spread over createQueue's built-in default job options. */
  defaultJobOptions?: JobOptions;
}
/** Payload of a job on the email queue. */
interface EmailJobData {
  /** Recipient passed to the email service. */
  to: string;
  /** Subject passed to the email service. */
  subject: string;
  /** Template identifier passed to renderTemplate. */
  template: string;
  /** Values passed to renderTemplate together with the template. */
  context: Record<string, unknown>;
}
/** Payload of an image-processing job: one image plus a list of operations. */
interface ImageProcessingJobData {
  imageId: string;
  /** Each operation names its kind and carries free-form parameters. */
  operations: Array<{
    type: "resize" | "crop" | "compress";
    params: Record<string, unknown>;
  }>;
}
/**
 * Creates a Bull queue whose Redis clients are duplicates of `config.redis`.
 *
 * Built-in job defaults (keep 100 completed / 1000 failed jobs, 3 attempts,
 * exponential backoff starting at 2000 ms) are applied first; anything in
 * `config.defaultJobOptions` overrides them key by key.
 *
 * @param config Queue name, base Redis connection and optional job defaults.
 * @returns The queue, with an "error" listener that logs to the console.
 */
function createQueue<T>(config: QueueConfig): Queue.Queue<T> {
  const builtInJobOptions: JobOptions = {
    removeOnComplete: 100,
    removeOnFail: 1000,
    attempts: 3,
    backoff: {
      type: "exponential",
      delay: 2000,
    },
  };

  const created = new Queue<T>(config.name, {
    // Whatever role Bull requests, it receives its own duplicate of the base connection.
    createClient: () => config.redis.duplicate(),
    defaultJobOptions: { ...builtInJobOptions, ...config.defaultJobOptions },
  });

  created.on("error", (err) => {
    console.error(`Queue ${config.name} error:`, err);
  });

  return created;
}
// Queue for outbound email jobs, connected through REDIS_URL.
const emailQueue = createQueue<EmailJobData>({
  name: "email",
  redis: new Redis(process.env.REDIS_URL),
});

// Worker: render the template, send the message, reporting progress at 10, 50 and 100.
emailQueue.process(async (job: Job<EmailJobData>) => {
  const payload = job.data;

  await job.progress(10);
  const html = await renderTemplate(payload.template, payload.context);

  await job.progress(50);
  await emailService.send({ to: payload.to, subject: payload.subject, html });

  await job.progress(100);
  const messageId = `msg_${Date.now()}`;
  return { sent: true, messageId };
});
/**
 * Enqueues one email job.
 *
 * `priority` and `delay` fall back to 0 when absent or falsy; every key present
 * in `options` is then spread on top of those fallbacks.
 *
 * @param data Email payload.
 * @param options Optional Bull job options.
 * @returns The job created on the email queue.
 */
async function sendEmail(
  data: EmailJobData,
  options?: JobOptions,
): Promise<Job<EmailJobData>> {
  const fallbacks: JobOptions = {
    priority: options?.priority || 0,
    delay: options?.delay || 0,
    jobId: options?.jobId,
  };
  const merged: JobOptions = { ...fallbacks, ...options };
  return emailQueue.add(data, merged);
}
/**
 * Enqueues many email jobs in a single addBulk call.
 *
 * Each job id has the form `bulk_<current epoch ms>_<index in the input array>`.
 *
 * @param emails Payloads to enqueue, in order.
 * @returns The jobs returned by the queue.
 */
async function sendBulkEmails(
  emails: EmailJobData[],
): Promise<Job<EmailJobData>[]> {
  const entries: Array<{ data: EmailJobData; opts: { jobId: string } }> = [];
  for (const [index, data] of emails.entries()) {
    entries.push({ data, opts: { jobId: `bulk_${Date.now()}_${index}` } });
  }
  return emailQueue.addBulk(entries);
}
Celery (Python):
from celery import Celery, Task
from celery.exceptions import MaxRetriesExceededError
from typing import Any, Dict, Optional
import logging
# Celery application used by the task definitions in this snippet.
app = Celery('tasks')
app.config_from_object({
    # Broker and result backend use different databases (0 and 1) of the same local Redis.
    'broker_url': 'redis://localhost:6379/0',
    'result_backend': 'redis://localhost:6379/1',
    # JSON is the only format produced and accepted.
    'task_serializer': 'json',
    'result_serializer': 'json',
    'accept_content': ['json'],
    'timezone': 'UTC',
    'task_track_started': True,
    # Hard limit and the lower soft limit (units as defined by Celery's time-limit settings).
    'task_time_limit': 300,
    'task_soft_time_limit': 240,
    'worker_prefetch_multiplier': 4,
    # Late acknowledgement combined with reject-on-worker-lost; see Celery's task settings
    # reference for the redelivery implications.
    'task_acks_late': True,
    'task_reject_on_worker_lost': True,
})
# Module-level logger used by BaseTask's lifecycle hooks.
logger = logging.getLogger(__name__)
class BaseTask(Task):
    """Shared task base: automatic retries on any Exception plus lifecycle logging.

    Retries are limited to 3, with backoff enabled (capped at 600) and jitter.
    """

    autoretry_for = (Exception,)
    retry_kwargs = dict(max_retries=3)
    retry_backoff = True
    retry_backoff_max = 600
    retry_jitter = True

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Log the failing task's name, id and exception at ERROR level."""
        message = f'Task {self.name}[{task_id}] failed: {exc}'
        logger.error(message)

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        """Log the retried task's name, id and exception at WARNING level."""
        message = f'Task {self.name}[{task_id}] retrying: {exc}'
        logger.warning(message)

    def on_success(self, retval, task_id, args, kwargs):
        """Log the succeeded task's name and id at INFO level."""
        message = f'Task {self.name}[{task_id}] succeeded'
        logger.info(message)
@app.task(base=BaseTask, bind=True, name='send_email')
def send_email(
    self,
    to: str,
    subject: str,
    template: str,
    context: Dict[str, Any]
) -> Dict[str, Any]:
    """Render a template and send it, publishing PROGRESS state at 10, 50 and 100.

    A ConnectionError triggers an explicit retry with a 60 countdown.

    Returns:
        ``{'sent': True, 'message_id': <value returned by email_service.send>}``.
    """
    def report(percent):
        self.update_state(state='PROGRESS', meta={'progress': percent})

    try:
        report(10)
        html = render_template(template, context)
        report(50)
        message_id = email_service.send(to=to, subject=subject, html=html)
        report(100)
    except ConnectionError as exc:
        raise self.retry(exc=exc, countdown=60)
    return {'sent': True, 'message_id': message_id}
@app.task(base=BaseTask, bind=True, name='process_image')
def process_image(self, image_id: str, operations: list) -> Dict[str, Any]:
    """Apply the listed operations to an image, then save it.

    Before each operation a PROGRESS state is published with the percentage
    ``int(position / total * 100)`` (position counted from 1) and the operation
    type. Operation types other than resize, crop and compress are passed over.

    Returns:
        ``{'url': <value from save_image>, 'operations_count': len(operations)}``.
    """
    image = load_image(image_id)
    for position, operation in enumerate(operations, start=1):
        percent = int(position / len(operations) * 100)
        self.update_state(
            state='PROGRESS',
            meta={'progress': percent, 'operation': operation['type']},
        )
        kind = operation['type']
        if kind == 'resize':
            image = resize_image(image, **operation['params'])
        elif kind == 'crop':
            image = crop_image(image, **operation['params'])
        elif kind == 'compress':
            image = compress_image(image, **operation['params'])
    saved_url = save_image(image, image_id)
    return {'url': saved_url, 'operations_count': len(operations)}
from celery import chain, group, chord
def process_order(order_id: str):
    """Process order with chained tasks.

    Builds a chain validate -> reserve inventory -> payment -> confirmation,
    seeded with ``order_id``, and returns the result of ``apply_async()``.
    """
    steps = [
        validate_order.s(order_id),
        reserve_inventory.s(),
        process_payment.s(),
        send_confirmation.s(),
    ]
    return chain(*steps).apply_async()
def process_bulk_images(image_ids: list):
    """Process multiple images in parallel, then aggregate results.

    Every image gets one resize-to-width-800 ``process_image`` signature; the
    group is the chord header and ``aggregate_results`` is its callback.
    """
    def resize_signature(img_id):
        return process_image.s(img_id, [{'type': 'resize', 'params': {'width': 800}}])

    header = group(resize_signature(img_id) for img_id in image_ids)
    callback = aggregate_results.s()
    return chord(header, callback).apply_async()
Sidekiq (Ruby):
# Server-side Sidekiq configuration: Redis from REDIS_URL with a network timeout of 5.
Sidekiq.configure_server do |config|
  config.redis = { url: ENV['REDIS_URL'], network_timeout: 5 }
  # Handler appended to Sidekiq's death_handlers; forwards the exception and job to ErrorReporter.
  config.death_handlers << ->(job, ex) do
    ErrorReporter.report(ex, job: job)
  end
end
# Client-side configuration uses the same Redis settings.
Sidekiq.configure_client do |config|
  config.redis = { url: ENV['REDIS_URL'], network_timeout: 5 }
end
# Renders an email template and hands the HTML to EmailService.
# Runs on the :default queue with 5 retries, backtraces enabled and dead: true.
class EmailWorker
  include Sidekiq::Worker

  sidekiq_options queue: :default, retry: 5, backtrace: true, dead: true

  # Retry delay is the cube of (retry count + 1).
  sidekiq_retry_in { |count, _exception| (count + 1)**3 }

  # Once retries are exhausted: log the job id and message, then notify.
  sidekiq_retries_exhausted do |msg, exception|
    Rails.logger.error "Job #{msg['jid']} exhausted retries: #{exception.message}"
    DeadJobNotifier.notify(msg, exception)
  end

  # context keys are symbolized before being used as template locals.
  def perform(to, subject, template, context)
    locals = context.symbolize_keys
    html = ApplicationController.render(template: template, locals: locals)
    EmailService.send(to: to, subject: subject, html: html)
  end
end
# Fans a batch out into one ItemProcessor job per item.
class BatchWorker
  include Sidekiq::Worker

  # Looks the batch up by id and enqueues each item's id, iterating with find_each.
  def perform(batch_id)
    items = Batch.find(batch_id).items
    items.find_each { |item| ItemProcessor.perform_async(item.id) }
  end
end
# Splits an import into one ImportRowWorker job per row, grouped in a Sidekiq::Batch.
class ImportWorker
  include Sidekiq::Worker

  # The batch's :complete callback is ImportCallbacks, which receives the import id.
  def perform(import_id)
    record = Import.find(import_id)

    row_batch = Sidekiq::Batch.new
    row_batch.description = "Import #{import_id}"
    row_batch.on(:complete, ImportCallbacks, import_id: import_id)

    row_batch.jobs do
      record.rows.each_with_index { |row, index| ImportRowWorker.perform_async(import_id, index, row) }
    end
  end
end
# Batch callback that records the final status of an import.
class ImportCallbacks
  # Marks the import 'completed' when the batch had no failures; otherwise
  # 'completed_with_errors' together with the failure count.
  def on_complete(status, options)
    import = Import.find(options['import_id'])
    failure_count = status.failures
    return import.update!(status: 'completed') if failure_count.zero?

    import.update!(status: 'completed_with_errors', error_count: failure_count)
  end
end
Scheduled Jobs and Cron Patterns
import Queue from "bull";
// Queue that carries the repeatable (cron and fixed-interval) jobs registered below.
const scheduledQueue = new Queue("scheduled-tasks", process.env.REDIS_URL);
/**
 * Registers the repeatable jobs on scheduledQueue, one after another:
 * an hourly cleanup, a 09:00 daily report, a health check every five
 * minutes and a weekly cleanup (cron "0 0 * * 0").
 */
async function setupScheduledJobs(): Promise<void> {
  const definitions = [
    { name: "cleanup", jobId: "cleanup-hourly", repeat: { cron: "0 * * * *" } },
    { name: "daily-report", jobId: "daily-report", repeat: { cron: "0 9 * * *" } },
    { name: "health-check", jobId: "health-check", repeat: { every: 5 * 60 * 1000 } },
    { name: "weekly-cleanup", jobId: "weekly-cleanup", repeat: { cron: "0 0 * * 0" } },
  ];

  // Sequential on purpose: each registration is awaited before the next starts.
  for (const { name, repeat, jobId } of definitions) {
    await scheduledQueue.add(name, {}, { repeat, jobId });
  }
}
// Processor for the "cleanup" job: purge old records and report success.
scheduledQueue.process("cleanup", async (_job) => {
  await cleanupOldRecords();
  return { cleaned: true };
});

// Processor for the "daily-report" job: generate the report, email it, return its id.
// NOTE(review): no processors for "health-check" or "weekly-cleanup" appear in this
// snippet; confirm they are registered elsewhere.
scheduledQueue.process("daily-report", async (_job) => {
  const generated = await generateDailyReport();
  await sendReportEmail(generated);
  return { reportId: generated.id };
});
/**
 * Lists the repeatable jobs registered on scheduledQueue.
 *
 * @returns For each job: its name, next run as a Date, and either its cron
 *          expression or the text `Every <n>ms` when no cron is set.
 */
async function getScheduledJobs(): Promise<
  Array<{ name: string; next: Date; cron: string }>
> {
  const summaries: Array<{ name: string; next: Date; cron: string }> = [];
  for (const entry of await scheduledQueue.getRepeatableJobs()) {
    const cadence = entry.cron || `Every ${entry.every}ms`;
    summaries.push({ name: entry.name, next: new Date(entry.next), cron: cadence });
  }
  return summaries;
}
/**
 * Removes the repeatable job whose `id` equals `jobId`; does nothing when no
 * such job is registered.
 */
async function removeScheduledJob(jobId: string): Promise<void> {
  const repeatables = await scheduledQueue.getRepeatableJobs();
  const match = repeatables.find(({ id }) => id === jobId);
  if (!match) {
    return;
  }
  await scheduledQueue.removeRepeatableByKey(match.key);
}
from celery import Celery
from celery.schedules import crontab
app = Celery('tasks')
# Static periodic-task table: entry name -> task path, schedule and optional arguments.
app.conf.beat_schedule = {
    'cleanup-hourly': {
        'task': 'tasks.cleanup',
        'schedule': crontab(minute=0),
    },
    'daily-report': {
        'task': 'tasks.daily_report',
        'schedule': crontab(hour=9, minute=0),
    },
    'health-check': {
        'task': 'tasks.health_check',
        # Plain number rather than a crontab; presumably seconds (five minutes) — confirm.
        'schedule': 300.0,
    },
    'weekly-cleanup': {
        'task': 'tasks.weekly_cleanup',
        'schedule': crontab(hour=0, minute=0, day_of_week=0),
    },
    'monthly-report': {
        'task': 'tasks.monthly_report',
        'schedule': crontab(hour=6, minute=0, day_of_month=1),
    },
    'check-expiring-subscriptions': {
        'task': 'tasks.check_subscriptions',
        'schedule': crontab(hour=8, minute=0),
        # This entry also passes one positional and one keyword argument to the task.
        'args': ('expiring',),
        'kwargs': {'days_ahead': 7},
    },
}
from django_celery_beat.models import PeriodicTask, CrontabSchedule
import json
def create_scheduled_task(name: str, task: str, cron: str, args: list = None, kwargs: dict = None):
    """Create a scheduled task dynamically.

    ``cron`` is split on whitespace into exactly five fields (minute, hour,
    day of month, month, day of week); any other field count raises from the
    unpacking. The matching CrontabSchedule is fetched or created, then the
    PeriodicTask named ``name`` is created or updated and enabled, with
    ``args`` / ``kwargs`` stored as JSON (empty list / dict when omitted).
    """
    minute, hour, day_of_month, month, day_of_week = cron.split()
    crontab_fields = {
        'minute': minute,
        'hour': hour,
        'day_of_month': day_of_month,
        'month_of_year': month,
        'day_of_week': day_of_week,
    }
    schedule, _created = CrontabSchedule.objects.get_or_create(**crontab_fields)

    task_fields = {
        'task': task,
        'crontab': schedule,
        'args': json.dumps(args or []),
        'kwargs': json.dumps(kwargs or {}),
        'enabled': True,
    }
    PeriodicTask.objects.update_or_create(name=name, defaults=task_fields)
Worker Pool Management
import Queue, { Job } from "bull";
import os from "os";
/** Settings shared by every queue registered on a WorkerPool. */
interface WorkerPoolConfig {
  /** Concurrency passed to each queue's process() call. */
  concurrency: number;
  /** Optional limiter forwarded unchanged to the Bull queue constructor. */
  limiter?: {
    max: number;
    duration: number;
  };
}
/**
 * Owns a set of Bull queues that share one concurrency/limiter configuration
 * and shuts them down together on SIGTERM or SIGINT.
 */
class WorkerPool {
  private queues: Map<string, Queue.Queue> = new Map();
  private isShuttingDown = false;
  /** In-flight shutdown, so repeated signals share a single close sequence. */
  private shutdownPromise: Promise<void> | null = null;

  /**
   * @param config Concurrency and optional limiter applied to every registered queue.
   */
  constructor(private config: WorkerPoolConfig) {
    const onSignal = (): void => {
      // shutdown() handles its own failures and always ends in process.exit.
      void this.shutdown();
    };
    process.on("SIGTERM", onSignal);
    process.on("SIGINT", onSignal);
  }

  /**
   * Creates a queue on REDIS_URL, attaches `processor` with the pool's
   * concurrency, wires console logging for completed/failed/stalled jobs and
   * remembers the queue for shutdown and stats.
   *
   * @param name Queue name; also the key under which the queue is stored.
   * @param processor Job handler; its result becomes the job's return value.
   * @returns The created queue.
   */
  registerQueue<T>(
    name: string,
    processor: (job: Job<T>) => Promise<unknown>,
  ): Queue.Queue<T> {
    const queue = new Queue<T>(name, process.env.REDIS_URL!, {
      limiter: this.config.limiter,
    });

    queue.process(this.config.concurrency, async (job: Job<T>) => {
      // Refuse work that is picked up after shutdown has begun.
      if (this.isShuttingDown) {
        throw new Error("Worker shutting down");
      }
      return processor(job);
    });

    queue.on("completed", (job, result) => {
      console.log(`Job ${job.id} completed:`, result);
    });
    queue.on("failed", (job, err) => {
      console.error(`Job ${job?.id} failed:`, err);
    });
    queue.on("stalled", (job) => {
      // The local "stalled" event carries the Job itself; log its id, not the object.
      console.warn(`Job ${job?.id} stalled`);
    });

    this.queues.set(name, queue);
    return queue;
  }

  /**
   * Pauses and closes every registered queue, then exits the process.
   * Safe to call more than once: later calls return the first call's promise.
   * Exits with code 0 when every queue closed, 1 when any close failed.
   */
  async shutdown(): Promise<void> {
    if (!this.shutdownPromise) {
      this.shutdownPromise = this.closeAllQueues();
    }
    return this.shutdownPromise;
  }

  /** Runs the actual close sequence exactly once. */
  private async closeAllQueues(): Promise<void> {
    console.log("Initiating graceful shutdown...");
    this.isShuttingDown = true;

    const outcomes = await Promise.allSettled(
      Array.from(this.queues.values()).map(async (queue) => {
        await queue.pause(true);
        await queue.close();
      }),
    );

    let exitCode = 0;
    for (const outcome of outcomes) {
      if (outcome.status === "rejected") {
        exitCode = 1;
        console.error("Failed to close queue:", outcome.reason);
      }
    }
    if (exitCode === 0) {
      console.log("All queues closed");
    }
    process.exit(exitCode);
  }

  /**
   * Collects waiting/active/completed/failed/delayed counts for every
   * registered queue, keyed by queue name.
   */
  async getStats(): Promise<Record<string, QueueStats>> {
    const stats: Record<string, QueueStats> = {};
    for (const [name, queue] of this.queues) {
      const [waiting, active, completed, failed, delayed] = await Promise.all([
        queue.getWaitingCount(),
        queue.getActiveCount(),
        queue.getCompletedCount(),
        queue.getFailedCount(),
        queue.getDelayedCount(),
      ]);
      stats[name] = { waiting, active, completed, failed, delayed };
    }
    return stats;
  }
}
/** Per-queue job counts returned by WorkerPool.getStats, one field per Bull job state queried. */
interface QueueStats {
  waiting: number;
  active: number;
  completed: number;
  failed: number;
  delayed: number;
}
// Pool sized to the number of CPUs, limited to 100 jobs per 1000 ms per queue.
const pool = new WorkerPool({
  concurrency: os.cpus().length,
  limiter: {
    max: 100,
    duration: 1000,
  },
});
// NOTE(review): sendEmail as defined earlier in this file adds a job to the "email"
// queue rather than delivering mail; if that is the function in scope here, this
// worker re-enqueues every job it processes — confirm the intended callee.
pool.registerQueue<EmailJobData>("email", async (job) => {
  await sendEmail(job.data);
});
// processImage is not defined in this snippet.
pool.registerQueue<ImageProcessingJobData>("images", async (job) => {
  await processImage(job.data);
});
from celery import Celery
from celery.signals import worker_process_init, worker_shutdown
import multiprocessing
app = Celery('tasks')
# Worker pool sizing, recycling and acknowledgement settings.
app.conf.update(
    # One worker process per CPU reported by multiprocessing.
    worker_concurrency=multiprocessing.cpu_count(),
    worker_prefetch_multiplier=2,
    # Child recycling thresholds: task count and memory
    # (memory unit presumably KiB per Celery's docs — confirm).
    worker_max_tasks_per_child=1000,
    worker_max_memory_per_child=200000,
    task_acks_late=True,
    task_reject_on_worker_lost=True,
)
@worker_process_init.connect
def init_worker(**kwargs):
    """Initialize resources for each worker process."""
    # Connected to the worker_process_init signal: open the db connection, then warm the cache.
    # `db` and `cache` are not defined in this snippet.
    db.connect()
    cache.warm_up()
@worker_shutdown.connect
def cleanup_worker(**kwargs):
    """Clean up resources on worker shutdown."""
    # NOTE(review): setup is connected to worker_process_init but teardown to
    # worker_shutdown; confirm both signals fire in the same process, otherwise the
    # per-process connections opened at init are not the ones closed here.
    db.close()
    cache.flush()
# Route specific tasks to dedicated queues; the 'tasks.*' pattern is listed last as the catch-all.
# NOTE(review): tasks registered with an explicit name (e.g. name='send_email') would not
# match the 'tasks.'-prefixed keys — confirm the registered task names.
app.conf.task_routes = {
    'tasks.send_email': {'queue': 'email'},
    'tasks.process_image': {'queue': 'images'},
    'tasks.heavy_computation': {'queue': 'compute'},
    'tasks.*': {'queue': 'default'},
}
# Autoscaler class and bounds (2 to 10).
# NOTE(review): verify that worker_autoscale_max / worker_autoscale_min are settings Celery
# actually reads; autoscale bounds are commonly given via the --autoscale CLI option.
app.conf.worker_autoscaler = 'celery.worker.autoscale:Autoscaler'
app.conf.worker_autoscale_max = 10
app.conf.worker_autoscale_min = 2
Job Priorities and Fairness
/** Payload of a prioritised job. */
interface PriorityJobData {
  type: string;
  payload: unknown;
  /** Symbolic priority; converted to a number through priorityMap when the job is added. */
  priority: "critical" | "high" | "normal" | "low";
}
// Numeric priority handed to the queue for each symbolic level; "critical" maps to the
// smallest number and "low" to the largest.
const priorityMap = {
  critical: 1,
  high: 5,
  normal: 10,
  low: 20,
};
/**
 * Adds a job whose numeric priority is looked up in priorityMap.
 * Critical jobs get an explicit delay of 0; every other level leaves `delay` undefined.
 *
 * @param data Job payload including its symbolic priority.
 * @returns The created job.
 */
async function addPriorityJob(
  data: PriorityJobData,
): Promise<Job<PriorityJobData>> {
  const level = data.priority;
  const numericPriority = priorityMap[level];
  let delay: number | undefined;
  if (level === "critical") {
    delay = 0;
  } else {
    delay = undefined;
  }
  return queue.add(data, { priority: numericPriority, delay });
}
/**
 * Splits a fixed concurrency budget across several queues in proportion to
 * their weights, so that heavier-weighted queues get more parallel workers.
 */
class FairScheduler {
  private queues: Map<string, Queue.Queue> = new Map();
  private weights: Map<string, number> = new Map();

  /**
   * @param queueConfigs One entry per queue: its name and relative weight.
   * @param totalConcurrency Budget distributed across the queues (default 10,
   *        the value that was previously hard-coded).
   */
  constructor(
    queueConfigs: Array<{ name: string; weight: number }>,
    private readonly totalConcurrency: number = 10,
  ) {
    for (const config of queueConfigs) {
      const queue = new Queue(config.name, process.env.REDIS_URL!);
      this.queues.set(config.name, queue);
      this.weights.set(config.name, config.weight);
    }
  }

  /**
   * Attaches `handler` to every queue. Each queue's concurrency is its weight
   * share of the budget, rounded down, but never below 1.
   *
   * @param handler Called with the queue name and the job.
   */
  async process(
    handler: (queueName: string, job: Job) => Promise<void>,
  ): Promise<void> {
    const totalWeight = Array.from(this.weights.values()).reduce(
      (a, b) => a + b,
      0,
    );
    for (const [name, queue] of this.queues) {
      const weight = this.weights.get(name) ?? 0;
      // A non-positive weight sum would make the share NaN/Infinity; fall back to the minimum.
      const share = totalWeight > 0 ? weight / totalWeight : 0;
      const concurrency = Math.max(1, Math.floor(share * this.totalConcurrency));
      queue.process(concurrency, async (job) => {
        await handler(name, job);
      });
    }
  }
}
// Three tiers weighted 5 : 3 : 2.
const scheduler = new FairScheduler([
  { name: "premium", weight: 5 },
  { name: "standard", weight: 3 },
  { name: "free", weight: 2 },
]);
// Same handler for every tier; processJob is not defined in this snippet.
await scheduler.process(async (queueName, job) => {
  console.log(`Processing ${queueName} job:`, job.id);
  await processJob(job);
});
Idempotency and Retry Strategies
import Queue, { Job, JobOptions } from "bull";
import { createHash } from "crypto";
/**
 * Derives a SHA-256 hex key from a job payload.
 *
 * Object keys are sorted before hashing, so two payloads that differ only in
 * property order produce the same key. Array order still matters. Values
 * that JSON cannot represent at the top level (e.g. `undefined`) hash the
 * literal text "undefined" instead of throwing.
 *
 * @param data Any JSON-serialisable payload.
 * @returns 64-character lowercase hex digest.
 */
function generateIdempotencyKey(data: unknown): string {
  // The replacer runs after toJSON, so it only has to reorder plain objects.
  const canonical = JSON.stringify(data, (_key, value: unknown) => {
    if (value === null || typeof value !== "object" || Array.isArray(value)) {
      return value;
    }
    const source = value as Record<string, unknown>;
    const ordered: Record<string, unknown> = {};
    for (const name of Object.keys(source).sort()) {
      ordered[name] = source[name];
    }
    return ordered;
  });

  const hash = createHash("sha256");
  // JSON.stringify returns undefined for unserialisable top-level values.
  hash.update(canonical ?? "undefined");
  return hash.digest("hex");
}
/**
 * Wraps a queue processor so that a job whose idempotency key was already
 * handled is skipped and the stored result is returned instead.
 *
 * The key is the job's `jobId` option when set, otherwise a hash of the job
 * data. Results are kept in Redis under `processed:<key>` for 86400 seconds.
 */
class IdempotentProcessor<T> {
  private redis: Redis;

  /**
   * @param queue Queue whose jobs are processed.
   * @param redis Connection used to store processed markers.
   */
  constructor(
    private queue: Queue.Queue<T>,
    redis: Redis,
  ) {
    this.redis = redis;
  }

  /**
   * Registers `handler` on the queue behind the idempotency check.
   *
   * A handler that resolves to `undefined` is recorded as `null`, so a
   * skipped replay of such a job returns `null`.
   *
   * @param handler Performs the actual work; its result must be JSON-serialisable.
   */
  async process(handler: (job: Job<T>) => Promise<unknown>): Promise<void> {
    this.queue.process(async (job: Job<T>) => {
      const idempotencyKey = job.opts.jobId || generateIdempotencyKey(job.data);
      const markerKey = `processed:${idempotencyKey}`;

      const existing = await this.redis.get(markerKey);
      if (existing) {
        console.log(`Job ${job.id} already processed, skipping`);
        return JSON.parse(existing);
      }

      const result = await handler(job);
      // JSON.stringify(undefined) is undefined, which is not a storable marker;
      // normalising to null guarantees a valid JSON string is always written.
      await this.redis.setex(markerKey, 86400, JSON.stringify(result ?? null));
      return result;
    });
  }
}
/** Describes how failed jobs are retried. */
interface RetryStrategy {
  /** Growth curve of the delay; "custom" currently behaves like "fixed". */
  type: "exponential" | "linear" | "fixed" | "custom";
  /** Delay used for the first retry. */
  baseDelay: number;
  /** Upper bound for the delay; a missing or zero value means no cap. */
  maxDelay?: number;
  /** Number of retries allowed before a job is given up on. */
  maxRetries: number;
  /** When true, the delay is scaled by a random factor in [0.8, 1.2). */
  jitter?: boolean;
  /** Optional predicate; returning false marks an error as not retryable. */
  retryOn?: (error: Error) => boolean;
}

/**
 * Computes the delay before retry number `attempt` (1-based).
 *
 * - exponential: baseDelay * 2^(attempt - 1)
 * - linear:      baseDelay * attempt
 * - fixed / custom / anything else: baseDelay
 *
 * The result is capped at `maxDelay` when one is set, and the cap is
 * re-applied after jitter so the returned delay never exceeds it.
 *
 * @param strategy Retry configuration.
 * @param attempt Retry number, starting at 1.
 * @returns Delay in the same unit as `baseDelay`.
 */
function calculateDelay(strategy: RetryStrategy, attempt: number): number {
  const cap = strategy.maxDelay;
  const applyCap = (value: number): number =>
    cap ? Math.min(value, cap) : value;

  let delay: number;
  switch (strategy.type) {
    case "exponential":
      delay = strategy.baseDelay * Math.pow(2, attempt - 1);
      break;
    case "linear":
      delay = strategy.baseDelay * attempt;
      break;
    case "fixed":
    default:
      delay = strategy.baseDelay;
  }

  delay = applyCap(delay);

  if (strategy.jitter) {
    const jitterFactor = 0.8 + Math.random() * 0.4;
    // Jitter can scale the value upwards, so the cap has to be enforced again.
    delay = applyCap(Math.floor(delay * jitterFactor));
  }

  return delay;
}
/** Envelope stored on the dead-letter queue for a job that was given up on. */
interface DeadLetter<T> {
  originalJob: T;
  error: string;
  failedAt: string;
  attempts: number;
}

/**
 * A queue whose retries follow a RetryStrategy and whose permanently failed
 * jobs are copied to a companion `<name>-dlq` queue.
 *
 * The strategy is installed as a custom Bull backoff strategy and as the
 * queue's default job options, so the delay computed by calculateDelay is
 * the delay Bull actually waits before the next attempt.
 */
class RetryableQueue<T> {
  private mainQueue: Queue.Queue<T>;
  private dlq: Queue.Queue<DeadLetter<T>>;
  private strategy: RetryStrategy;

  /**
   * @param name Name of the main queue; the dead-letter queue is `<name>-dlq`.
   * @param strategy Retry configuration applied to jobs added through this instance.
   */
  constructor(name: string, strategy: RetryStrategy) {
    this.mainQueue = new Queue<T>(name, process.env.REDIS_URL!, {
      settings: {
        backoffStrategies: {
          // Bull passes the number of attempts made so far (1 after the first failure).
          retryable: (attemptsMade: number) =>
            calculateDelay(strategy, attemptsMade),
        },
      },
      defaultJobOptions: {
        // One initial attempt plus maxRetries retries.
        attempts: strategy.maxRetries + 1,
        backoff: { type: "retryable" },
      },
    });
    this.dlq = new Queue<DeadLetter<T>>(`${name}-dlq`, process.env.REDIS_URL!);
    this.strategy = strategy;
  }

  /**
   * Enqueues a job on the main queue; the strategy's attempts/backoff
   * defaults apply unless `opts` overrides them.
   */
  async add(data: T, opts?: JobOptions): Promise<Job<T>> {
    return this.mainQueue.add(data, opts);
  }

  /**
   * Registers `handler` on the main queue.
   *
   * When the handler throws, the job is dead-lettered and discarded if the
   * error is rejected by `retryOn` or if `maxRetries` retries have already
   * been used; otherwise the original error is rethrown and Bull schedules
   * the next attempt using the strategy's delay.
   */
  async process(handler: (job: Job<T>) => Promise<unknown>): Promise<void> {
    this.mainQueue.process(async (job: Job<T>) => {
      try {
        return await handler(job);
      } catch (error) {
        const err = error instanceof Error ? error : new Error(String(error));
        const retryable = !this.strategy.retryOn || this.strategy.retryOn(err);
        const exhausted = job.attemptsMade >= this.strategy.maxRetries;
        if (!retryable || exhausted) {
          await this.moveToDLQ(job, err);
          // Without discard() Bull would still retry while attempts remain.
          await job.discard();
        }
        throw err;
      }
    });
  }

  /** Copies the job's data and failure details onto the dead-letter queue. */
  private async moveToDLQ(job: Job<T>, error: Error): Promise<void> {
    await this.dlq.add({
      originalJob: job.data,
      error: error.message,
      failedAt: new Date().toISOString(),
      attempts: job.attemptsMade,
    });
  }

  /**
   * Re-enqueues the original payload of a dead-lettered job on the main queue
   * and removes the dead-letter entry. Does nothing when the id is unknown.
   */
  async retryFromDLQ(jobId: string): Promise<void> {
    const job = await this.dlq.getJob(jobId);
    if (!job) return;
    await this.mainQueue.add(job.data.originalJob);
    await job.remove();
  }
}
Job Monitoring and Dead Jobs
import Queue, { Job, JobCounts, JobStatus } from "bull";
import { EventEmitter } from "events";
/** Snapshot returned by JobMonitor.getMetrics for one queue. */
interface JobMetrics {
  /** Queue name. */
  queue: string;
  /** Job counts as returned by the queue's getJobCounts(). */
  counts: JobCounts;
  /** Statistics over the recorded completed-job durations (job.timestamp to completion). */
  latency: {
    avg: number;
    p50: number;
    p95: number;
    p99: number;
  };
  throughput: number;
  /** failed / (completed + failed) over the recorded history; 0 when nothing is recorded. */
  errorRate: number;
}
/**
 * Observes Bull queues, keeps a rolling history of job durations and exposes
 * metrics plus maintenance operations for failed ("dead") jobs.
 *
 * Emits: "job:completed", "job:failed", "job:stalled" and
 * "alert:high_error_rate".
 */
class JobMonitor extends EventEmitter {
  /** Window used by the throughput figure, in milliseconds. */
  private static readonly THROUGHPUT_WINDOW_MS = 60000;

  private queues: Queue.Queue[] = [];
  /** Rolling durations (max 1000 entries) keyed by `<queue>:<type>:duration`. */
  private metricsHistory: Map<string, number[]> = new Map();
  /** Completion times (epoch ms) per queue, pruned to the throughput window. */
  private completionTimes: Map<string, number[]> = new Map();

  /** Starts observing `queue` by subscribing to its completed/failed/stalled events. */
  addQueue(queue: Queue.Queue): void {
    this.queues.push(queue);

    queue.on("completed", (job, result) => {
      this.recordMetric(queue.name, "completed", job);
      this.emit("job:completed", { queue: queue.name, job, result });
    });

    queue.on("failed", (job, err) => {
      this.recordMetric(queue.name, "failed", job);
      this.emit("job:failed", { queue: queue.name, job, error: err });
      this.checkErrorRate(queue.name);
    });

    queue.on("stalled", (job) => {
      // The local "stalled" event carries the Job itself; expose only its id.
      this.emit("job:stalled", { queue: queue.name, jobId: job?.id });
    });
  }

  /**
   * Appends the time elapsed since the job's creation timestamp to the
   * rolling history and, for completions, notes the completion time for the
   * throughput window. A missing job is recorded with duration 0 so that the
   * event still counts.
   */
  private recordMetric(
    queueName: string,
    type: string,
    job: Job | undefined,
  ): void {
    const now = Date.now();
    const duration = job ? now - job.timestamp : 0;
    const key = `${queueName}:${type}:duration`;
    const history = this.metricsHistory.get(key) ?? [];
    history.push(duration);
    if (history.length > 1000) {
      history.shift();
    }
    this.metricsHistory.set(key, history);

    if (type === "completed") {
      const cutoff = now - JobMonitor.THROUGHPUT_WINDOW_MS;
      const times = this.completionTimes.get(queueName) ?? [];
      while (times.length > 0 && times[0] <= cutoff) {
        times.shift();
      }
      times.push(now);
      this.completionTimes.set(queueName, times);
    }
  }

  /**
   * Emits "alert:high_error_rate" when more than 10 samples are recorded and
   * more than 10% of them are failures.
   */
  private checkErrorRate(queueName: string): void {
    const completed =
      this.metricsHistory.get(`${queueName}:completed:duration`)?.length || 0;
    const failed =
      this.metricsHistory.get(`${queueName}:failed:duration`)?.length || 0;
    if (completed + failed > 10) {
      const errorRate = this.calculateErrorRate(queueName);
      if (errorRate > 0.1) {
        this.emit("alert:high_error_rate", { queue: queueName, errorRate });
      }
    }
  }

  /**
   * Returns counts, latency statistics, throughput (completions observed in
   * the last 60 seconds) and error rate for one queue.
   *
   * @throws Error when the queue was never added to the monitor.
   */
  async getMetrics(queueName: string): Promise<JobMetrics> {
    const queue = this.requireQueue(queueName);
    const counts = await queue.getJobCounts();
    const durations =
      this.metricsHistory.get(`${queueName}:completed:duration`) || [];
    return {
      queue: queueName,
      counts,
      latency: this.calculateLatencyPercentiles(durations),
      throughput: this.calculateThroughput(queueName),
      errorRate: this.calculateErrorRate(queueName),
    };
  }

  /** Looks a monitored queue up by name or throws. */
  private requireQueue(queueName: string): Queue.Queue {
    const queue = this.queues.find((q) => q.name === queueName);
    if (!queue) throw new Error(`Queue ${queueName} not found`);
    return queue;
  }

  /** Average and p50/p95/p99 of the given durations; all zeros for an empty list. */
  private calculateLatencyPercentiles(
    durations: number[],
  ): JobMetrics["latency"] {
    if (durations.length === 0) {
      return { avg: 0, p50: 0, p95: 0, p99: 0 };
    }
    const sorted = [...durations].sort((a, b) => a - b);
    const avg = sorted.reduce((a, b) => a + b, 0) / sorted.length;
    return {
      avg: Math.round(avg),
      p50: sorted[Math.floor(sorted.length * 0.5)],
      p95: sorted[Math.floor(sorted.length * 0.95)],
      p99: sorted[Math.floor(sorted.length * 0.99)],
    };
  }

  /** Number of completions observed for the queue within the last 60 seconds. */
  private calculateThroughput(queueName: string): number {
    const cutoff = Date.now() - JobMonitor.THROUGHPUT_WINDOW_MS;
    const times = this.completionTimes.get(queueName) ?? [];
    return times.filter((t) => t > cutoff).length;
  }

  /** failed / (completed + failed) over the rolling history; 0 when empty. */
  private calculateErrorRate(queueName: string): number {
    const completed =
      this.metricsHistory.get(`${queueName}:completed:duration`)?.length || 0;
    const failed =
      this.metricsHistory.get(`${queueName}:failed:duration`)?.length || 0;
    const total = completed + failed;
    return total > 0 ? failed / total : 0;
  }

  /**
   * Returns at most `limit` failed jobs.
   *
   * @param limit Maximum number of jobs; values below 1 yield an empty list.
   * @throws Error when the queue is not monitored.
   */
  async getDeadJobs(queueName: string, limit: number = 100): Promise<Job[]> {
    const queue = this.requireQueue(queueName);
    const count = Math.floor(limit);
    if (!(count > 0)) {
      return [];
    }
    // getFailed takes an inclusive end index, hence count - 1.
    return queue.getFailed(0, count - 1);
  }

  /**
   * Retries one failed job.
   *
   * @throws Error when the queue is not monitored or the job does not exist.
   */
  async retryDeadJob(queueName: string, jobId: string): Promise<void> {
    const queue = this.requireQueue(queueName);
    const job = await queue.getJob(jobId);
    if (!job) throw new Error(`Job ${jobId} not found`);
    await job.retry();
  }

  /**
   * Retries every job currently in the failed set (not just the first page).
   * Individual retry failures are logged and skipped.
   *
   * @returns Number of jobs successfully retried.
   */
  async retryAllDeadJobs(queueName: string): Promise<number> {
    const queue = this.requireQueue(queueName);
    const failedCount = await queue.getFailedCount();
    const deadJobs = await this.getDeadJobs(queueName, failedCount);
    let retried = 0;
    for (const job of deadJobs) {
      try {
        await job.retry();
        retried++;
      } catch (error) {
        console.error(`Failed to retry job ${job.id}:`, error);
      }
    }
    return retried;
  }

  /**
   * Removes failed jobs older than `olderThan` milliseconds (default 24 h).
   *
   * @returns Number of jobs removed.
   */
  async cleanDeadJobs(
    queueName: string,
    olderThan: number = 86400000,
  ): Promise<number> {
    const queue = this.requireQueue(queueName);
    const cleaned = await queue.clean(olderThan, "failed");
    return cleaned.length;
  }
}
import express from "express";
/**
 * Builds an Express router exposing queue metrics and dead-job maintenance:
 *
 * - GET    /queues/:name/metrics
 * - GET    /queues/:name/dead?limit=N
 * - POST   /queues/:name/dead/:jobId/retry
 * - POST   /queues/:name/dead/retry-all
 * - DELETE /queues/:name/dead?olderThan=MS
 *
 * Every handler catches its own errors so that a rejected promise always
 * produces a JSON error response instead of leaving the request hanging.
 *
 * @param monitor Monitor that owns the queues being exposed.
 */
function createMonitoringRouter(monitor: JobMonitor): express.Router {
  const router = express.Router();

  // Parses a positive base-10 integer query value, or returns the fallback.
  const positiveInt = (raw: unknown, fallback: number): number => {
    const parsed = typeof raw === "string" ? Number.parseInt(raw, 10) : NaN;
    return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback;
  };

  // Used by the routes that previously had no error handling: 404 for the
  // monitor's "... not found" errors, 500 for anything else.
  const sendError = (res: express.Response, error: unknown): void => {
    const message = error instanceof Error ? error.message : String(error);
    const status = / not found$/.test(message) ? 404 : 500;
    res.status(status).json({ error: message });
  };

  router.get("/queues/:name/metrics", async (req, res) => {
    try {
      const metrics = await monitor.getMetrics(req.params.name);
      res.json(metrics);
    } catch (error) {
      res.status(404).json({ error: (error as Error).message });
    }
  });

  router.get("/queues/:name/dead", async (req, res) => {
    try {
      const limit = positiveInt(req.query.limit, 100);
      const jobs = await monitor.getDeadJobs(req.params.name, limit);
      res.json(
        jobs.map((j) => ({
          id: j.id,
          data: j.data,
          failedReason: j.failedReason,
          attemptsMade: j.attemptsMade,
          timestamp: j.timestamp,
        })),
      );
    } catch (error) {
      sendError(res, error);
    }
  });

  router.post("/queues/:name/dead/:jobId/retry", async (req, res) => {
    try {
      await monitor.retryDeadJob(req.params.name, req.params.jobId);
      res.json({ success: true });
    } catch (error) {
      res.status(400).json({ error: (error as Error).message });
    }
  });

  router.post("/queues/:name/dead/retry-all", async (req, res) => {
    try {
      const retried = await monitor.retryAllDeadJobs(req.params.name);
      res.json({ retried });
    } catch (error) {
      sendError(res, error);
    }
  });

  router.delete("/queues/:name/dead", async (req, res) => {
    try {
      const olderThan = positiveInt(req.query.olderThan, 86400000);
      const cleaned = await monitor.cleanDeadJobs(req.params.name, olderThan);
      res.json({ cleaned });
    } catch (error) {
      sendError(res, error);
    }
  });

  return router;
}
Data Pipeline Jobs
ETL scheduling and orchestration:
from celery import chain, group
@app.task
def extract_from_source(source_id: str):
    """Extract data from source system.

    Returns a dict with the ``source_id`` and the fetched ``records``.
    """
    fetched = fetch_from_api(source_id)
    return dict(source_id=source_id, records=fetched)
@app.task
def transform_data(extract_result: dict):
    """Transform extracted data.

    Applies ``normalize_record`` to every record and returns a dict with the
    same ``source_id`` and the normalized ``records``.
    """
    normalized = list(map(normalize_record, extract_result['records']))
    return {'source_id': extract_result['source_id'], 'records': normalized}
@app.task
def load_to_warehouse(transform_result: dict):
    """Load transformed data to warehouse.

    Bulk-inserts the records and returns ``{'loaded': <record count>}``.
    """
    rows = transform_result['records']
    warehouse.bulk_insert(rows)
    return {'loaded': len(rows)}
def run_etl_pipeline(source_id: str):
    """Run extract -> transform -> load for one source as a chain.

    Returns the result of ``apply_async()`` on that chain.
    """
    steps = (
        extract_from_source.s(source_id),
        transform_data.s(),
        load_to_warehouse.s(),
    )
    return chain(*steps).apply_async()
def run_multi_source_etl(source_ids: list):
    """Extract and transform every source in parallel, then aggregate and load once.

    Each source gets its own extract -> transform chain, so ``transform_data``
    receives exactly one extract result (a dict), which is what it indexes
    into. Chaining the group with ``aggregate_and_load`` makes that task the
    callback, which receives the list of all transform results.

    Args:
        source_ids: Identifiers of the sources to process.

    Returns:
        The result of ``apply_async()`` on the workflow.
    """
    per_source = group(
        chain(extract_from_source.s(sid), transform_data.s())
        for sid in source_ids
    )
    pipeline = chain(per_source, aggregate_and_load.s())
    return pipeline.apply_async()
import Queue from "bull";
/** One step of a DataPipeline: reads `T` from its queue and produces `U`. */
interface PipelineStage<T, U> {
  /** Unique stage name; also used as prefix for downstream job ids. */
  name: string;
  /** Queue this stage consumes. */
  queue: Queue.Queue<T>;
  /** Transforms one job's data. */
  process: (data: T) => Promise<U>;
  /** Stage that receives this stage's output, if any. */
  nextStage?: PipelineStage<U, any>;
}

/**
 * Chains queue-backed stages: each stage's result is enqueued on the next
 * stage's queue with the job id `<next stage name>-<current job id>`.
 */
class DataPipeline {
  private stages: Map<string, PipelineStage<any, any>> = new Map();

  /**
   * Registers `stage` and every stage reachable through `nextStage`, so a
   * nested definition wires up processors for the whole chain. Stages whose
   * name is already registered are left untouched, which also stops the walk
   * on cyclic definitions.
   */
  addStage<T, U>(stage: PipelineStage<T, U>): void {
    let current: PipelineStage<any, any> | undefined = stage;
    while (current && !this.stages.has(current.name)) {
      this.register(current);
      current = current.nextStage;
    }
  }

  /** Stores one stage and attaches its processor to its queue. */
  private register(stage: PipelineStage<any, any>): void {
    this.stages.set(stage.name, stage);
    stage.queue.process(async (job) => {
      const result = await stage.process(job.data);
      if (stage.nextStage) {
        await stage.nextStage.queue.add(result, {
          jobId: `${stage.nextStage.name}-${job.id}`,
        });
      }
      return result;
    });
  }

  /**
   * Enqueues `initialData` on the named stage's queue.
   *
   * @throws Error when no stage with that name is registered.
   */
  async start(initialData: any, startStage: string): Promise<void> {
    const stage = this.stages.get(startStage);
    if (!stage) throw new Error(`Stage ${startStage} not found`);
    await stage.queue.add(initialData);
  }
}
// One queue per stage; redisUrl is not defined in this snippet.
const extractQueue = new Queue("extract", redisUrl);
const transformQueue = new Queue("transform", redisUrl);
const loadQueue = new Queue("load", redisUrl);
const pipeline = new DataPipeline();
// extract -> transform -> load, declared as nested nextStage entries.
// fetchFromSource, transformData and loadToWarehouse are not defined in this snippet.
pipeline.addStage({
  name: "extract",
  queue: extractQueue,
  process: async (sourceId) => fetchFromSource(sourceId),
  nextStage: {
    name: "transform",
    queue: transformQueue,
    process: async (data) => transformData(data),
    nextStage: {
      name: "load",
      queue: loadQueue,
      process: async (data) => loadToWarehouse(data),
    },
  },
});
// Kick the pipeline off by enqueuing a source id on the extract stage.
await pipeline.start("source-123", "extract");
ML Training Jobs
Long-running model training with checkpointing:
from celery import Task
import torch
from pathlib import Path
class TrainingTask(Task):
    """Task base for training jobs: retries on RuntimeError (at most 3 times)
    and keeps a checkpoint directory at /checkpoints.
    """

    autoretry_for = (RuntimeError,)
    max_retries = 3

    def __init__(self):
        checkpoint_root = Path('/checkpoints')
        self.checkpoint_dir = checkpoint_root
        # Create the directory if it is missing; an existing one is fine.
        checkpoint_root.mkdir(exist_ok=True)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Save checkpoint on failure for recovery.

        Only acts when the task was called with a truthy ``model_state``
        keyword argument; that value is written to ``<task_id>.pt``.
        NOTE(review): train_model in this file declares no ``model_state``
        parameter — confirm which callers supply it.
        """
        state = kwargs.get('model_state')
        if not state:
            return
        torch.save(state, self.checkpoint_dir / f'{task_id}.pt')
@app.task(base=TrainingTask, bind=True, time_limit=7200)
def train_model(
    self,
    model_id: str,
    dataset_id: str,
    config: dict,
    resume_from: str = None
):
    """Train ML model with checkpointing.

    Args:
        model_id: Identifier used in checkpoint file names and for the saved model.
        dataset_id: Identifier passed to ``load_dataset``.
        config: Training configuration; must contain ``epochs`` and may contain
            ``checkpoint_interval`` (default 10).
        resume_from: Optional path of a checkpoint written by this task.

    Returns:
        ``{'model_path': ..., 'final_metrics': ...}``. ``final_metrics`` is the
        last epoch's metrics, the checkpoint's metrics when resuming past the
        final epoch, or ``None`` when no epoch ran and nothing was restored.
    """
    # Checkpoints store state_dicts, so the objects are always built first
    # and the saved state is loaded into them.
    model = create_model(config)
    optimizer = create_optimizer(model, config)
    start_epoch = 0
    metrics = None

    if resume_from:
        # NOTE: torch.load unpickles the file; only load checkpoints from a trusted location.
        checkpoint = torch.load(resume_from)
        model.load_state_dict(checkpoint['model'])
        optimizer.load_state_dict(checkpoint['optimizer'])
        # The stored epoch had already finished when the checkpoint was written.
        start_epoch = checkpoint['epoch'] + 1
        metrics = checkpoint.get('metrics')

    dataset = load_dataset(dataset_id)
    total_epochs = config['epochs']
    checkpoint_interval = config.get('checkpoint_interval', 10)

    for epoch in range(start_epoch, total_epochs):
        progress = (epoch / total_epochs) * 100
        self.update_state(
            state='PROGRESS',
            meta={'epoch': epoch, 'progress': progress}
        )
        metrics = train_epoch(model, dataset, optimizer)
        if epoch % checkpoint_interval == 0:
            checkpoint_path = self.checkpoint_dir / f'{model_id}_epoch_{epoch}.pt'
            torch.save({
                'model': model.state_dict(),
                'optimizer': optimizer.state_dict(),
                'epoch': epoch,
                'metrics': metrics,
            }, checkpoint_path)

    model_path = save_model(model, model_id)
    return {'model_path': model_path, 'final_metrics': metrics}
def hyperparameter_search(model_id: str, param_grid: dict):
    """Run parallel hyperparameter search.

    Builds one ``train_model`` signature per combination in the Cartesian
    product of ``param_grid``'s value lists (model id ``<model_id>_trial_<i>``,
    dataset ``train_dataset``), groups them, and chains the group with
    ``select_best_model``.
    """
    from itertools import product

    names = param_grid.keys()
    choices = param_grid.values()
    combinations = []
    for picked in product(*choices):
        combinations.append(dict(zip(names, picked)))

    trials = []
    for index, params in enumerate(combinations):
        trials.append(
            train_model.s(
                model_id=f'{model_id}_trial_{index}',
                dataset_id='train_dataset',
                config=params
            )
        )

    workflow = chain(group(trials), select_best_model.s())
    return workflow.apply_async()
import Queue, { Job } from "bull";
/** Payload of one batch-inference job. */
interface InferenceJob {
  /** Model handed to loadModel by the processor. */
  modelId: string;
  /** Batch identifier echoed back in the result. */
  batchId: string;
  /** Inputs predicted one after another; `id` is copied onto the matching prediction. */
  inputs: Array<{ id: string; data: any }>;
}
/** Result of one batch-inference job. */
interface InferenceResult {
  batchId: string;
  /** One entry per input, carrying the input's id plus the model's result and confidence. */
  predictions: Array<{ id: string; prediction: any; confidence: number }>;
}
// Inference queue: 2 attempts with a fixed 5000 backoff, limited to 10 jobs per 1000 ms.
// redisUrl is not defined in this snippet.
const inferenceQueue = new Queue<InferenceJob>("ml-inference", redisUrl, {
  defaultJobOptions: {
    attempts: 2,
    backoff: { type: "fixed", delay: 5000 },
  },
  limiter: {
    max: 10,
    duration: 1000,
  },
});
// Processor (concurrency 4): load the model once, predict every input in order
// and report progress after each prediction so that a finished job reads 100.
inferenceQueue.process(4, async (job: Job<InferenceJob>) => {
  const { modelId, batchId, inputs } = job.data;
  const model = await loadModel(modelId);
  const predictions: InferenceResult["predictions"] = [];

  if (inputs.length === 0) {
    // Nothing to predict; the job is complete as soon as it starts.
    await job.progress(100);
  }

  for (const [index, input] of inputs.entries()) {
    const prediction = await model.predict(input.data);
    predictions.push({
      id: input.id,
      prediction: prediction.result,
      confidence: prediction.confidence,
    });
    await job.progress(((index + 1) / inputs.length) * 100);
  }

  return { batchId, predictions };
});
/**
 * Splits a dataset into batches and enqueues one inference job per batch.
 *
 * Batches are named `batch_<n>` and jobs get the id
 * `inference_<modelId>_<n>`, both counted from 0.
 *
 * @param modelId Model every batch is run against.
 * @param dataset Inputs to predict.
 * @param batchSize Maximum inputs per job; must be a positive integer.
 * @returns Ids of the enqueued jobs, in batch order.
 * @throws RangeError when `batchSize` is not a positive integer (a zero or
 *         negative size would otherwise never advance through the dataset).
 */
async function runBatchInference(
  modelId: string,
  dataset: Array<{ id: string; data: any }>,
  batchSize: number = 100,
): Promise<string[]> {
  if (!Number.isInteger(batchSize) || batchSize <= 0) {
    throw new RangeError(
      `batchSize must be a positive integer, got ${batchSize}`,
    );
  }

  const batches: InferenceJob[] = [];
  for (let i = 0; i < dataset.length; i += batchSize) {
    batches.push({
      modelId,
      batchId: `batch_${i / batchSize}`,
      inputs: dataset.slice(i, i + batchSize),
    });
  }

  const jobs = await inferenceQueue.addBulk(
    batches.map((batch, idx) => ({
      data: batch,
      opts: { jobId: `inference_${modelId}_${idx}` },
    })),
  );
  return jobs.map((j) => String(j.id));
}
Job Monitoring and Observability
Metrics, tracing, and alerting:
import { EventEmitter } from "events";
import Queue, { Job } from "bull";
/** Settings for JobObserver. */
interface ObservabilityConfig {
  /** Period, in milliseconds, of the timer that emits "metrics" events. */
  metricsInterval: number;
  /** Values above which the observer emits "alert" events. */
  alertThresholds: {
    /** Compared against failed / (completed + failed) for the current interval. */
    errorRate: number;
    /** Compared against the queue's waiting count. */
    queueDepth: number;
    /** Compared against the p99 of the interval's recorded latencies. */
    latencyP99: number;
  };
}
class JobObserver extends EventEmitter {
private queues: Map<string, Queue.Queue> = new Map();
private metrics: Map<string, QueueMetrics> = new Map();
private metricsInterval: NodeJS.Timeout | null = null;
constructor(private config: ObservabilityConfig) {
super();
}
observe(queue: Queue.Queue): void {
this.queues.set(queue.name, queue);
this.metrics.set(queue.name, this.emptyMetrics());
queue.on("completed", (job, result) => {
this.recordCompletion(queue.name, job, result);
});
queue.on("failed", (job, err) => {
this.recordFailure(queue.name, job!, err);
});
queue.on("stalled", (job) => {
this.recordStalled(queue.name, job);
});
queue.on("waiting", (jobId) => {
this.recordWaiting(queue.name, jobId);
});
if (!this.metricsInterval) {
this.startMetricsEmission();
}
}
private emptyMetrics(): QueueMetrics {
return {
completed: 0,
failed: 0,
stalled: 0,
waiting: 0,
processing: 0,
latencies: [],
errors: [],
};
}
private recordCompletion(queueName: string, job: Job, result: any): void {
const metrics = this.metrics.get(queueName)!;
metrics.completed++;
const latency = Date.now() - job.timestamp;
metrics.latencies.push(latency);
this.emit("trace", {
queue: queueName,
jobId: job.id,
operation: "job.completed",
duration: latency,
result,
});
}
private recordFailure(queueName: string, job: Job, error: Error): void {
const metrics = this.metrics.get(queueName)!;
metrics.failed++;
metrics.errors.push({
jobId: job.id!,
error: error.message,
timestamp: Date.now(),
});
const errorRate =
metrics.failed / (metrics.completed + metrics.failed || 1);
if (errorRate > this.config.alertThresholds.errorRate) {
this.emit("alert", {
type: "high_error_rate",
queue: queueName,
errorRate,
threshold: this.config.alertThresholds.errorRate,
});
}
this.emit("trace", {
queue: queueName,
jobId: job.id,
operation: "job.failed",
error: error.message,
stack: error.stack,
});
}
private recordStalled(queueName: string, jobId: string): void {
const metrics = this.metrics.get(queueName)!;
metrics.stalled++;
this.emit("alert", {
type: "job_stalled",
queue: queueName,
jobId,
});
}
/**
 * Counts one "waiting" event for the queue.
 *
 * @param queueName Key of the queue's metrics record.
 * @param jobId Identifier of the waiting job; not used by this method.
 */
private recordWaiting(queueName: string, jobId: string): void {
  const stats = this.metrics.get(queueName)!;
  stats.waiting += 1;
}
/**
 * Starts the periodic reporting timer. On every tick, for each observed queue:
 * fetches the queue's job counts, computes p50/p95/p99 over the latency
 * samples of the current window, emits a "metrics" event, emits
 * "high_queue_depth" / "high_latency" alerts when thresholds are exceeded,
 * and finally resets the queue's metrics window.
 *
 * A failure while reporting one queue (for example a rejected
 * `getJobCounts()` call or a throwing listener) is logged and does not stop
 * reporting for the remaining queues. That queue's window is left untouched
 * so its samples are included in the next tick.
 */
private startMetricsEmission(): void {
  // Looks up a percentile in an ascending array; 0 when there are no samples.
  const percentile = (sorted: number[], fraction: number): number =>
    sorted[Math.floor(sorted.length * fraction)] || 0;

  this.metricsInterval = setInterval(async () => {
    for (const [queueName, queue] of this.queues) {
      // The timer callback is async: anything that escapes here would surface
      // as an unhandled promise rejection, so contain it per queue.
      try {
        const metrics = this.metrics.get(queueName)!;
        const counts = await queue.getJobCounts();

        // Sort a copy so the live sample array is not reordered.
        const sorted = [...metrics.latencies].sort((a, b) => a - b);
        const p50 = percentile(sorted, 0.5);
        const p95 = percentile(sorted, 0.95);
        const p99 = percentile(sorted, 0.99);

        this.emit("metrics", {
          queue: queueName,
          timestamp: Date.now(),
          counts,
          latency: {
            p50,
            p95,
            p99,
          },
          throughput: metrics.completed,
          // `|| 1` avoids 0/0 when the window saw no finished jobs.
          errorRate: metrics.failed / (metrics.completed + metrics.failed || 1),
        });

        if (counts.waiting > this.config.alertThresholds.queueDepth) {
          this.emit("alert", {
            type: "high_queue_depth",
            queue: queueName,
            depth: counts.waiting,
            threshold: this.config.alertThresholds.queueDepth,
          });
        }

        if (p99 > this.config.alertThresholds.latencyP99) {
          this.emit("alert", {
            type: "high_latency",
            queue: queueName,
            p99,
            threshold: this.config.alertThresholds.latencyP99,
          });
        }

        // Begin a new window for this queue.
        this.metrics.set(queueName, this.emptyMetrics());
      } catch (error: unknown) {
        console.error(
          `Failed to emit metrics for queue ${queueName}:`,
          error,
        );
      }
    }
  }, this.config.metricsInterval);
}
/**
 * Cancels the periodic metrics timer if one is running. Queue event
 * listeners are left attached.
 */
stop(): void {
  const timer = this.metricsInterval;
  if (!timer) {
    return;
  }
  clearInterval(timer);
  this.metricsInterval = null;
}
}
/**
 * Counters and samples collected for one queue during a single reporting
 * window; the observer replaces the record with a zeroed one after each
 * metrics emission.
 */
interface QueueMetrics {
/** Number of "completed" events seen in this window. */
completed: number;
/** Number of "failed" events seen in this window. */
failed: number;
/** Number of "stalled" events seen in this window. */
stalled: number;
/** Number of "waiting" events seen in this window. */
waiting: number;
/** Initialised to 0; no code in this section updates it. */
processing: number;
/** One sample per completed job: milliseconds from job.timestamp to completion. */
latencies: number[];
/** One record per failed job: job id, error message, and time recorded (ms since epoch). */
errors: Array<{ jobId: string; error: string; timestamp: number }>;
}
// Example observer configuration: report every 10 seconds and alert on the
// thresholds below.
const observer = new JobObserver({
// Passed to setInterval, so this is in milliseconds.
metricsInterval: 10000,
alertThresholds: {
// Fraction of finished jobs that failed (0.05 = 5%).
errorRate: 0.05,
// Compared against the queue's waiting job count.
queueDepth: 1000,
// Compared against the p99 latency sample, in milliseconds.
latencyP99: 5000,
},
});
// Forward each periodic metrics snapshot to the Prometheus client: waiting
// depth as a gauge, p99 latency as a histogram observation, and the window's
// completed count as a counter.
observer.on("metrics", (snapshot) => {
  const { queue, counts, latency, throughput } = snapshot;

  prometheusClient.gauge("queue_depth", counts.waiting, { queue });
  prometheusClient.histogram("job_latency", latency.p99, {
    queue,
    percentile: "p99",
  });
  prometheusClient.counter("jobs_completed", throughput, { queue });
});
// Record each job trace event as a span on the tracer.
// NOTE(review): assumes an OpenTelemetry-style tracer whose startSpan()
// returns a span exposing end() — confirm against the tracer in use.
observer.on("trace", (span) => {
  const recorded = tracer.startSpan(span.operation, {
    attributes: {
      queue: span.queue,
      jobId: span.jobId,
      duration: span.duration,
    },
  });
  // The job has already finished when this event fires; a span that is never
  // ended is never exported, so close it immediately.
  recorded.end();
});
// Page on-call for high error rate alerts; every other alert type is ignored
// by this handler.
observer.on("alert", (alert) => {
  if (alert.type !== "high_error_rate") {
    return;
  }

  const percent = (alert.errorRate * 100).toFixed(1);
  pagerduty.trigger({
    severity: "error",
    summary: `High error rate in ${alert.queue}: ${percent}%`,
  });
});
Best Practices
- **Idempotency**
  - Design jobs so they can be safely re-executed
  - Use unique job IDs for deduplication
  - Store processed-job state externally (e.g., in a database or Redis) so a retry can detect work that already completed
- **Retry Strategies**
  - Use exponential backoff with jitter
  - Set maximum retry limits
  - Distinguish between retryable and non-retryable errors
- **Monitoring and Observability**
  - Track queue depths, processing latency, and throughput
  - Alert on high error rates or growing queues
  - Monitor worker health and memory usage
  - Use distributed tracing for complex pipelines
  - Export metrics to Prometheus, Datadog, or CloudWatch
- **Graceful Shutdown**
  - Complete in-progress jobs before shutdown
  - Handle termination signals (SIGTERM, SIGINT) by stopping job intake and draining active jobs
  - Set reasonable timeouts for job completion
- **Resource Management**
  - Set appropriate concurrency limits
  - Use worker pools for CPU-bound tasks
  - Implement rate limiting for external APIs
  - Configure memory and time limits per job
- **Data Pipeline Design**
  - Break pipelines into stages with clear boundaries
  - Use fan-out/fan-in patterns for parallel processing
  - Checkpoint long-running jobs for recovery
  - Store intermediate results for debugging
- **ML Training Jobs**
  - Save checkpoints frequently for crash recovery
  - Use separate queues for training vs. inference
  - Implement resource quotas to prevent starvation
  - Track experiment metadata and hyperparameters
Examples
Complete Worker Service
import Queue, { Job } from "bull";
import { Redis } from "ioredis";
/** Configuration for WorkerService. */
interface WorkerConfig {
/** Queues to process; one Bull queue is created per entry. */
queues: Array<{
/** Queue name passed to the Bull Queue constructor. */
name: string;
/** Concurrency value passed to queue.process for this queue. */
concurrency: number;
/** Handler invoked for each job; its resolved value is returned to Bull. */
processor: (job: Job) => Promise<unknown>;
}>;
/** Base connection; duplicate() is called on it for every client Bull requests. */
redis: Redis;
/** Maximum time, in milliseconds, to wait for active jobs during shutdown. */
shutdownTimeout: number;
}
/**
 * Runs Bull processors for a configured set of queues and shuts them down
 * gracefully on SIGTERM/SIGINT: intake is paused, active jobs get up to
 * `shutdownTimeout` milliseconds to finish, then the queues are closed and
 * the process exits.
 */
class WorkerService {
  /** Queues created by start(), keyed by queue name. */
  private queues: Map<string, Queue.Queue> = new Map();
  /** Set once shutdown begins; makes shutdown() idempotent. */
  private isShuttingDown = false;
  /** Number of processor invocations currently in flight across all queues. */
  private activeJobs = 0;

  /**
   * @param config Queue definitions, Redis connection, and shutdown timeout.
   */
  constructor(private config: WorkerConfig) {}

  /**
   * Creates one Bull queue per configured entry, registers its processor,
   * and installs SIGTERM/SIGINT handlers that trigger a graceful shutdown.
   */
  async start(): Promise<void> {
    for (const queueConfig of this.config.queues) {
      const queue = new Queue(queueConfig.name, {
        createClient: () => this.config.redis.duplicate(),
      });

      queue.process(queueConfig.concurrency, async (job) => {
        // Refuse work handed over after shutdown has begun.
        if (this.isShuttingDown) {
          throw new Error("Worker shutting down");
        }
        this.activeJobs++;
        try {
          return await queueConfig.processor(job);
        } finally {
          this.activeJobs--;
        }
      });

      this.queues.set(queueConfig.name, queue);
    }

    // shutdown() is async; without a catch, a failure while pausing or closing
    // would become an unhandled rejection and could leave the process hanging.
    const onSignal = (): void => {
      this.shutdown().catch((error: unknown) => {
        console.error("Worker shutdown failed:", error);
        process.exit(1);
      });
    };
    process.on("SIGTERM", onSignal);
    process.on("SIGINT", onSignal);

    console.log("Worker service started");
  }

  /**
   * Pauses job intake, waits up to `shutdownTimeout` ms for active jobs to
   * finish, closes all queues, and exits the process. Repeated calls after
   * the first are no-ops.
   */
  private async shutdown(): Promise<void> {
    if (this.isShuttingDown) return;
    this.isShuttingDown = true;
    console.log("Shutting down worker service...");

    const queues = Array.from(this.queues.values());

    // Local pause with doNotWaitActive=true: stop fetching new jobs but return
    // immediately. With the default, pause() itself blocks until every active
    // job finishes, which would make the timeout below unenforceable.
    await Promise.all(queues.map((q) => q.pause(true, true)));

    const deadline = Date.now() + this.config.shutdownTimeout;
    while (this.activeJobs > 0 && Date.now() < deadline) {
      await new Promise((r) => setTimeout(r, 100));
    }

    const forced = this.activeJobs > 0;
    if (forced) {
      console.warn(`Forcing shutdown with ${this.activeJobs} active jobs`);
    }

    // close() also waits for running jobs unless doNotWaitJobs is true; skip
    // that wait only when the timeout has already expired.
    await Promise.all(queues.map((q) => q.close(forced)));

    console.log("Worker service stopped");
    process.exit(0);
  }
}
// Example bootstrap: one worker process serving the "email" and "images"
// queues, with a 30-second drain window on shutdown.
const worker = new WorkerService({
  // Make ioredis's implicit default (127.0.0.1:6379) explicit so the argument
  // is never undefined when REDIS_URL is not set.
  redis: new Redis(process.env.REDIS_URL ?? "redis://127.0.0.1:6379"),
  shutdownTimeout: 30000,
  queues: [
    {
      name: "email",
      concurrency: 5,
      processor: async (job) => {
        await sendEmail(job.data);
      },
    },
    {
      name: "images",
      concurrency: 2,
      processor: async (job) => {
        await processImage(job.data);
      },
    },
  ],
});

// start() is async; surface startup failures instead of leaving a floating
// promise that would reject unhandled.
worker.start().catch((error: unknown) => {
  console.error("Worker service failed to start:", error);
  process.exit(1);
});