con un clic
celery-tasks
// Create or modify Celery tasks and periodic task configuration. Use when: adding new background tasks, setting up periodic/scheduled tasks, configuring Celery workers, or understanding the Celery app setup.
// Create or modify Celery tasks and periodic task configuration. Use when: adding new background tasks, setting up periodic/scheduled tasks, configuring Celery workers, or understanding the Celery app setup.
Configure or use the aiocache caching layer. Use when: adding cache reads/writes, configuring cache backends, working with TTLs, enabling/disabling caching, or understanding the NoOpCache fallback pattern.
Work with the Docker Compose development environment. Use when: starting or stopping services, inspecting logs, opening a shell in a container, resetting the database, or understanding the service topology.
Create or modify FastAPI routes. Use when: adding new API endpoints, creating Pydantic request/response models, registering routers, designing REST APIs, or following route conventions for this project.
Create or modify Jinja2 templates. Use when: adding new HTML templates, configuring the Jinja2 environment, adding custom filters or globals, rendering templates outside FastAPI, or working with template inheritance.
Complete reference for all make targets in the project. Use when: looking up the right make command for any task — setup, testing, linting, formatting, database, packaging, or cleanup.
Add or modify application configuration settings. Use when: adding new environment variables, settings fields, understanding settings conventions, working with secrets, or configuring optional vs required settings.
| name | celery-tasks |
| description | Create or modify Celery tasks and periodic task configuration. Use when: adding new background tasks, setting up periodic/scheduled tasks, configuring Celery workers, or understanding the Celery app setup. |
context7: If the
mcp_context7tool is available, resolve and load the fullcelerydocumentation before making any changes to the task system:mcp_context7_resolve-library-id: "celery" mcp_context7_get-library-docs: <resolved-id>
The Celery application is defined in full/celery.py. Tasks are exposed via the @celery.task decorator.
Import the celery app instance and decorate functions:
from logging import getLogger
from full.celery import celery
logger = getLogger(__name__)
@celery.task
def send_email(to: str, subject: str, body: str) -> dict[str, str]:
"""Send an email asynchronously."""
logger.info(f"Sending email to {to}: {subject}")
return {"status": "sent", "to": to}
Rules:
logger (never print) for all outputOrganize tasks in separate modules under full/tasks/:
full/
├── celery.py # Celery app configuration
└── tasks/
├── __init__.py
├── email.py # Email-related tasks
└── reports.py # Report generation tasks
Import task modules in full/celery.py to ensure registration:
from full.tasks import email, reports
# Fire and forget
send_email.delay("user@example.com", "Welcome", "Thanks for signing up!")
# With options
send_email.apply_async(
args=["user@example.com", "Welcome", "Body"],
countdown=60, # Execute after 60 seconds
queue='emails', # Route to specific queue
)
# Get result (blocking)
result = send_email.delay("user@example.com", "Hello", "Body")
output = result.get(timeout=10)
Use the on_after_configure signal in full/celery.py (Celery's documented pattern for periodic task registration):
from celery import Celery
from celery.schedules import crontab
@celery.on_after_configure.connect
def setup_periodic_tasks(sender: Celery, **kwargs) -> None:
logger.info("Setting up periodic tasks")
sender.add_periodic_task(300.0, cleanup_old_data.s(), name="Cleanup every 5 min")
sender.add_periodic_task(
crontab(hour=2, minute=0),
generate_report.s(),
name="Daily report at 2 AM"
)
Note: Use
on_after_finalizeinstead ofon_after_configureonly when periodic tasks reference tasks defined in external modules, to ensure the task registry is fully populated before registration.
Use the project's get_session with an async context manager. Wrap async work in a single asyncio.run() call:
import asyncio
from full.services.db import get_session
@celery.task
def process_user_sync(user_id: int) -> dict[str, str]:
return asyncio.run(process_user(user_id))
async def process_user(user_id: int) -> dict[str, str]:
async with get_session() as session:
# ... async DB operations
pass
return {"status": "done"}
Warning:
asyncio.run()creates a new event loop. It will raiseRuntimeErrorif called from within an existing event loop (e.g., when using Celery'sgeventoreventletworker pool). In those cases, useasyncio.get_event_loop().run_until_complete()or configure Celery with theasynpoolworker.
Caches are automatically initialized on on_after_configure. Use them directly in tasks:
import asyncio
from full.services.cache import get_cached, set_cached
@celery.task
def cached_task(key: str) -> str | None:
async def _run() -> str | None:
value = await get_cached(key)
if value is None:
value = compute_expensive_value()
await set_cached(key, value, alias="persistent")
return value
return asyncio.run(_run())
Note: When calling multiple async functions, batch them in a single
async defand wrap with oneasyncio.run()call. Callingasyncio.run()multiple times in the same function will raiseRuntimeError.
from logging import getLogger
logger = getLogger(__name__)
@celery.task(bind=True, max_retries=3, default_retry_delay=60)
def fragile_task(self) -> None:
try:
do_risky_thing()
except Exception as exc:
logger.exception("Task failed, retrying")
raise self.retry(exc=exc, countdown=2 ** self.request.retries)
Note: Always pass the caught exception to
self.retry(exc=exc)so the original traceback is preserved in logs and raised whenmax_retriesis exceeded.
logger (not print) for all outputfull/celery.py for registration@celery.task decorator used (not standalone @app.task)asyncio.run() wrapper (batch multiple awaits in one async def)bind=True with self.retry(exc=exc)on_after_configure signal (not on_after_finalize)