---
name: python-background-jobs
description: Python background job patterns including task queues, workers, and event-driven architecture. Use when implementing async task processing, job queues, long-running operations, or decoupling work from request/response cycles.
---
# Python Background Jobs & Task Queues
Decouple long-running or unreliable work from request/response cycles. Return immediately to the user while background workers handle the heavy lifting asynchronously.
## When to Use This Skill
- Processing tasks that take longer than a few seconds
- Sending emails, notifications, or webhooks
- Generating reports or exporting data
- Processing uploads or media transformations
- Integrating with unreliable external services
- Building event-driven architectures
## Core Concepts
### 1. Task Queue Pattern
API accepts request, enqueues a job, returns immediately with a job ID. Workers process jobs asynchronously.
### 2. Idempotency
Tasks may be retried on failure. Design for safe re-execution.
### 3. Job State Machine
Jobs transition through states: pending → running → succeeded/failed.
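Enforcing the state machine in code catches bugs such as a retried task flipping a finished job back to running. A minimal sketch, assuming a hypothetical `transition()` helper and `InvalidTransition` exception that are not part of any library:

```python
from enum import Enum


class JobStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCEEDED = "succeeded"
    FAILED = "failed"


# Legal moves: pending -> running -> succeeded/failed.
# Terminal states have no outgoing transitions.
ALLOWED_TRANSITIONS: dict[JobStatus, frozenset[JobStatus]] = {
    JobStatus.PENDING: frozenset({JobStatus.RUNNING}),
    JobStatus.RUNNING: frozenset({JobStatus.SUCCEEDED, JobStatus.FAILED}),
    JobStatus.SUCCEEDED: frozenset(),
    JobStatus.FAILED: frozenset(),
}


class InvalidTransition(Exception):
    """Raised when a job tries to move to a state it cannot reach."""


def transition(current: JobStatus, new: JobStatus) -> JobStatus:
    """Return the new status, or raise if the move is not allowed."""
    if new not in ALLOWED_TRANSITIONS[current]:
        raise InvalidTransition(f"{current.value} -> {new.value}")
    return new
```

If your retries re-queue a job, you might add a running → pending edge; that is a design choice layered on top of the basic model.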
### 4. At-Least-Once Delivery
Most queues guarantee at-least-once delivery. Your code must handle duplicates.
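Duplicates usually come from acknowledging a message only after the work is done: if a worker finishes the task but dies before sending the ack, the broker delivers the message again. The toy simulation below illustrates this; `ToyBroker` and `run_worker` are hypothetical in-memory stand-ins, not a real broker API.

```python
from dataclasses import dataclass, field


@dataclass
class ToyBroker:
    """In-memory stand-in for a broker with acknowledgements (hypothetical)."""
    pending: dict[str, str] = field(default_factory=dict)  # message_id -> payload

    def publish(self, message_id: str, payload: str) -> None:
        self.pending[message_id] = payload

    def deliver(self) -> list[tuple[str, str]]:
        # Every message that has not been acked is (re)delivered
        return list(self.pending.items())

    def ack(self, message_id: str) -> None:
        self.pending.pop(message_id, None)


def run_worker(broker: ToyBroker, handler, crash_before_ack: bool = False) -> None:
    for message_id, payload in broker.deliver():
        handler(payload)
        if crash_before_ack:
            return  # simulated crash: work done, ack never sent
        broker.ack(message_id)


emails_sent: list[str] = []
broker = ToyBroker()
broker.publish("msg-1", "user@example.com")

run_worker(broker, emails_sent.append, crash_before_ack=True)
run_worker(broker, emails_sent.append)  # redelivery after the worker restarts

print(emails_sent)  # ['user@example.com', 'user@example.com']
```

The naive handler sends the same email twice, which is exactly the case the idempotency patterns below guard against.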
## Quick Start
This skill uses Celery, a widely adopted task queue, for its examples. Alternatives such as RQ, Dramatiq, and cloud-native services (AWS SQS, Google Cloud Tasks) are equally valid choices.
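```python
from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379")

@app.task
def send_email(to: str, subject: str, body: str) -> None:
    # email_client is a placeholder for your mail provider's client
    email_client.send(to, subject, body)

# .delay() enqueues the task and returns immediately; a worker runs it
send_email.delay("user@example.com", "Welcome!", "Thanks for signing up")
```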
## Fundamental Patterns
### Pattern 1: Return Job ID Immediately
For operations exceeding a few seconds, return a job ID and process asynchronously.
```python
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from uuid import uuid4

# ExportRequest (a Pydantic model), JobResponse, jobs_repo and task_queue
# are application-specific and assumed to be defined elsewhere.


class JobStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCEEDED = "succeeded"
    FAILED = "failed"


@dataclass
class Job:
    id: str
    status: JobStatus
    created_at: datetime
    started_at: datetime | None = None
    completed_at: datetime | None = None
    result: dict | None = None
    error: str | None = None


async def start_export(request: ExportRequest) -> JobResponse:
    """Start export job and return job ID."""
    job_id = str(uuid4())

    # Persist the job before enqueueing so the worker and status polls can find it
    await jobs_repo.create(Job(
        id=job_id,
        status=JobStatus.PENDING,
        created_at=datetime.now(timezone.utc),
    ))

    # Hand the work to a background worker; only the ID and params travel
    await task_queue.enqueue(
        "export_data",
        job_id=job_id,
        params=request.model_dump(),
    )

    # Respond immediately; the client polls poll_url for progress
    return JobResponse(
        job_id=job_id,
        status=JobStatus.PENDING.value,
        poll_url=f"/jobs/{job_id}",
    )
```
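To see the whole round trip without a database or broker, here is a minimal single-process sketch. It swaps in a plain dict for `jobs_repo` and an `asyncio.Queue` for `task_queue` (both hypothetical substitutions), and simulates the slow export with `asyncio.sleep`.

```python
import asyncio
from uuid import uuid4


async def start_export(jobs: dict, queue: asyncio.Queue, params: dict) -> dict:
    """Record a pending job, enqueue it, and return at once."""
    job_id = str(uuid4())
    jobs[job_id] = {"status": "pending", "result": None}
    await queue.put((job_id, params))
    return {"job_id": job_id, "status": "pending", "poll_url": f"/jobs/{job_id}"}


async def worker(jobs: dict, queue: asyncio.Queue) -> None:
    """Consume jobs forever, updating their state as they run."""
    while True:
        job_id, params = await queue.get()
        jobs[job_id]["status"] = "running"
        try:
            await asyncio.sleep(0.01)  # stand-in for the slow export
            jobs[job_id]["result"] = {"rows": params["rows"]}
            jobs[job_id]["status"] = "succeeded"
        except Exception as exc:
            jobs[job_id]["status"] = "failed"
            jobs[job_id]["error"] = str(exc)
        finally:
            queue.task_done()


async def main() -> tuple[dict, dict]:
    jobs: dict = {}
    queue: asyncio.Queue = asyncio.Queue()
    worker_task = asyncio.create_task(worker(jobs, queue))

    response = await start_export(jobs, queue, {"rows": 42})
    print(response["status"])  # the caller got an answer without waiting
    await queue.join()  # a real client would poll poll_url instead
    worker_task.cancel()
    return response, jobs[response["job_id"]]


response, job = asyncio.run(main())
print(job)  # {'status': 'succeeded', 'result': {'rows': 42}}
```

In production the worker runs in a separate process and the job record lives in shared storage, but the shape of the interaction is the same.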
### Pattern 2: Celery Task Configuration
Configure Celery tasks with proper retry and timeout settings.
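```python
from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379")

app.conf.update(
    task_time_limit=3600,             # hard limit: worker process killed after 1 hour
    task_soft_time_limit=3000,        # soft limit: SoftTimeLimitExceeded raised in the task
    task_acks_late=True,              # ack after the task finishes, not on receipt
    task_reject_on_worker_lost=True,  # requeue the message if the worker process dies
    worker_prefetch_multiplier=1,     # each worker process reserves one task at a time
)

# payment_gateway, PaymentDeclinedError and TransientError are
# application-specific placeholders.


@app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    autoretry_for=(ConnectionError, TimeoutError),
)
def process_payment(self, payment_id: str) -> dict:
    """Process payment with automatic retry on transient errors."""
    # With acks_late a task can run twice, so the charge should be idempotent (Pattern 3)
    try:
        result = payment_gateway.charge(payment_id)
        return {"status": "success", "transaction_id": result.id}
    except PaymentDeclinedError as e:
        # Permanent failure: retrying will not help, so report it instead
        return {"status": "declined", "reason": str(e)}
    except TransientError as e:
        # Exponential backoff: 60s, 120s, 240s
        raise self.retry(exc=e, countdown=2 ** self.request.retries * 60)
```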
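The countdown above doubles with each retry. When many tasks fail at once (say, a downstream outage), identical delays make them all retry at the same instant. A common refinement is to cap the delay and add "full jitter", drawing the wait uniformly from zero up to the exponential value. A minimal sketch; `backoff_delay` is a hypothetical helper, not a Celery function:

```python
from __future__ import annotations

import random


def backoff_delay(
    retries: int,
    base: float = 60.0,
    cap: float = 3600.0,
    jitter: bool = True,
    rng: random.Random | None = None,
) -> float:
    """Seconds to wait before retry number `retries` (0-based).

    Without jitter this matches the countdown above, base * 2**retries,
    clamped to `cap`. With full jitter the delay is drawn uniformly from
    [0, that value] so failing tasks do not retry in lockstep.
    """
    delay = min(cap, base * 2 ** retries)
    if not jitter:
        return delay
    return (rng or random).uniform(0, delay)


print([backoff_delay(n, jitter=False) for n in range(4)])  # [60.0, 120.0, 240.0, 480.0]
```

Inside a bound task you might pass `countdown=backoff_delay(self.request.retries)`. Celery also offers the task options `retry_backoff`, `retry_backoff_max` and `retry_jitter` for tasks that use `autoretry_for`; check the documentation for your Celery version.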
### Pattern 3: Make Tasks Idempotent
Workers may retry on crash or timeout. Design for safe re-execution.
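```python
@app.task(bind=True)
def process_order(self, order_id: str) -> None:
    """Process order idempotently."""
    # orders_repo, payment_provider, OrderStatus and the structlog-style
    # logger are application-specific placeholders.
    order = orders_repo.get(order_id)

    # Already done: a retry or duplicate delivery becomes a no-op
    if order.status == OrderStatus.COMPLETED:
        logger.info("Order already processed", order_id=order_id)
        return

    if order.status == OrderStatus.PROCESSING:
        # A previous attempt crashed mid-way; resume and rely on the
        # idempotency key below to avoid a double charge
        logger.info("Resuming interrupted order", order_id=order_id)
    else:
        orders_repo.update(order_id, status=OrderStatus.PROCESSING)

    # Providers that support idempotency keys deduplicate repeated requests
    payment_provider.charge(
        amount=order.total,
        idempotency_key=f"order-{order_id}",
    )
    orders_repo.update(order_id, status=OrderStatus.COMPLETED)
```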
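The same flow can be exercised end to end with in-memory fakes. This is a sketch under stated assumptions: `Order`, `OrderStatus` and `FakePaymentProvider` are hypothetical stand-ins, and the fake provider simply remembers keys the way a real idempotency-key API is expected to. A crash after charging, a retry, and a duplicate delivery still produce exactly one charge.

```python
from dataclasses import dataclass, field
from enum import Enum


class OrderStatus(Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"


@dataclass
class Order:
    id: str
    total: int
    status: OrderStatus = OrderStatus.PENDING


@dataclass
class FakePaymentProvider:
    """Hypothetical provider that honors idempotency keys."""
    charges: dict[str, int] = field(default_factory=dict)
    calls: int = 0

    def charge(self, amount: int, idempotency_key: str) -> str:
        self.calls += 1
        # A repeated key keeps the original charge instead of charging again
        self.charges.setdefault(idempotency_key, amount)
        return idempotency_key


def process_order(order: Order, provider: FakePaymentProvider,
                  crash_after_charge: bool = False) -> None:
    if order.status == OrderStatus.COMPLETED:
        return  # duplicate delivery: nothing to do
    order.status = OrderStatus.PROCESSING
    provider.charge(order.total, idempotency_key=f"order-{order.id}")
    if crash_after_charge:
        raise RuntimeError("worker died before marking the order complete")
    order.status = OrderStatus.COMPLETED


provider = FakePaymentProvider()
order = Order(id="42", total=1999)

try:
    process_order(order, provider, crash_after_charge=True)  # first attempt dies
except RuntimeError:
    pass
process_order(order, provider)  # retry
process_order(order, provider)  # duplicate delivery

print(provider.calls, provider.charges)  # 2 {'order-42': 1999}
```

The provider was called twice but recorded one charge; the third delivery never reached it because of the status check.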
Idempotency Strategies:
- Check-before-write: Verify state before acting
- Idempotency keys: Pass unique tokens to external services that support them
- Upsert patterns: `INSERT ... ON CONFLICT ... DO UPDATE`
- Deduplication window: Track processed IDs for N hours
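A deduplication window only needs to remember each message ID until it expires. The sketch below keeps that state in process memory with an injectable clock so it can be tested; `DedupWindow` is a hypothetical helper. With several workers the state has to live in a shared store instead, for example a Redis `SET` with the `NX` and `EX` options.

```python
from __future__ import annotations

import time
from collections.abc import Callable


class DedupWindow:
    """Remember processed message IDs for `ttl_seconds` (hypothetical helper)."""

    def __init__(self, ttl_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._ttl = ttl_seconds
        self._clock = clock
        self._seen: dict[str, float] = {}  # message_id -> expiry time

    def first_time(self, message_id: str) -> bool:
        """Return True and record the ID if it was not seen within the window."""
        now = self._clock()
        # Drop expired entries so memory stays bounded by the window size
        self._seen = {k: exp for k, exp in self._seen.items() if exp > now}
        if message_id in self._seen:
            return False
        self._seen[message_id] = now + self._ttl
        return True


now = [0.0]
window = DedupWindow(ttl_seconds=3600, clock=lambda: now[0])
results = [window.first_time("msg-1"), window.first_time("msg-1")]
now[0] = 3601.0  # just past the one-hour window
results.append(window.first_time("msg-1"))
print(results)  # [True, False, True]
```

A duplicate that arrives after the window closes is processed again, so choose N to comfortably exceed your broker's redelivery horizon.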
### Pattern 4: Job State Management
Persist job state transitions for visibility and debugging.
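```python
from datetime import datetime, timezone

import structlog

logger = structlog.get_logger()

# Job and JobStatus are the types from Pattern 1.


class JobRepository:
    """Repository for managing job state."""

    # Columns update_status may touch; keeps the dynamic SET clause safe
    UPDATABLE_COLUMNS = frozenset(
        {"status", "started_at", "completed_at", "result", "error"}
    )

    def __init__(self, db) -> None:
        # Assumes an asyncpg-style pool: execute(query, *args) with $n placeholders
        self._db = db

    async def create(self, job: Job) -> Job:
        """Create new job record."""
        await self._db.execute(
            """INSERT INTO jobs (id, status, created_at)
               VALUES ($1, $2, $3)""",
            job.id, job.status.value, job.created_at,
        )
        return job

    async def update_status(
        self,
        job_id: str,
        status: JobStatus,
        **fields,
    ) -> None:
        """Update job status and stamp started_at/completed_at."""
        updates = {"status": status.value, **fields}
        now = datetime.now(timezone.utc)
        if status == JobStatus.RUNNING:
            updates["started_at"] = now
        elif status in (JobStatus.SUCCEEDED, JobStatus.FAILED):
            updates["completed_at"] = now

        unknown = updates.keys() - self.UPDATABLE_COLUMNS
        if unknown:
            raise ValueError(f"Unknown job columns: {sorted(unknown)}")

        # e.g. "status = $1, completed_at = $2", with job_id bound last
        set_clause = ", ".join(
            f"{column} = ${i}" for i, column in enumerate(updates, start=1)
        )
        await self._db.execute(
            f"UPDATE jobs SET {set_clause} WHERE id = ${len(updates) + 1}",
            *updates.values(), job_id,
        )
        logger.info(
            "Job status updated",
            job_id=job_id,
            status=status.value,
        )
```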
## Detailed worked examples and patterns
Detailed sections (starting with ## Advanced Patterns) live in references/details.md. Read that file when the navigation summary above is insufficient.
## Best Practices Summary
- Return immediately - Don't block requests for long operations
- Persist job state - Enable status polling and debugging
- Make tasks idempotent - Safe to retry on any failure
- Use idempotency keys - For external service calls
- Set timeouts - Both soft and hard limits
- Implement a dead-letter queue (DLQ) - Capture permanently failed tasks
- Log transitions - Track job state changes
- Retry appropriately - Exponential backoff for transient errors
- Don't retry permanent failures - Validation errors, invalid credentials
- Monitor queue depth - Alert on backlog growth
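Three of these practices (dead-lettering, retrying transient errors, and not retrying permanent ones) fit in one small loop. This is a minimal in-process sketch: `Executor`, `TransientError`, `PermanentError` and the in-memory `dead_letters` list are hypothetical stand-ins for what a real queue provides, such as an SQS redrive policy or a dedicated dead-letter queue.

```python
from __future__ import annotations

from collections.abc import Callable
from dataclasses import dataclass, field


class TransientError(Exception):
    """Worth retrying: timeouts, connection resets, 503s."""


class PermanentError(Exception):
    """Retrying cannot help: validation errors, invalid credentials."""


@dataclass
class Executor:
    max_retries: int = 3
    dead_letters: list[tuple[str, str]] = field(default_factory=list)

    def run(self, task_id: str, fn: Callable[[], object]) -> object | None:
        attempts = 0
        while True:
            try:
                return fn()
            except PermanentError as exc:
                # Don't retry permanent failures; park them for inspection
                self.dead_letters.append((task_id, repr(exc)))
                return None
            except TransientError as exc:
                attempts += 1
                if attempts > self.max_retries:
                    # Retries exhausted: dead-letter instead of looping forever
                    self.dead_letters.append((task_id, repr(exc)))
                    return None
                # A real worker would sleep with exponential backoff here


calls = {"n": 0}


def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise TransientError("timeout")
    return "ok"


def invalid() -> str:
    raise PermanentError("missing field: email")


def always_down() -> str:
    raise TransientError("connection refused")


executor = Executor(max_retries=3)
print(executor.run("job-1", flaky))        # ok
print(executor.run("job-2", invalid))      # None
print(executor.run("job-3", always_down))  # None
print([task_id for task_id, _ in executor.dead_letters])  # ['job-2', 'job-3']
```

Alerting on the size of the dead-letter list, like alerting on queue depth, turns silent failures into something an operator can act on.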