con un clic
apscheduler
Advanced Python Scheduler - Task scheduling and job queue system
Menú
Advanced Python Scheduler - Task scheduling and job queue system
Database migration management for SQLAlchemy projects using Alembic
Build async APIs with FastAPI, including endpoints, dependency injection, validation, and testing. Use when creating REST APIs, web backends, or microservices.
Data validation and settings management using Python type annotations with Pydantic v2
Structured logging for Python applications with context support and powerful processors
ASGI server for Python web applications - Fast, production-ready server for async frameworks
Promise-based HTTP client for making requests from browser and Node.js
| name | apscheduler |
| description | Advanced Python Scheduler - Task scheduling and job queue system |
| when_to_use | Background task execution, periodic jobs, cron scheduling, distributed task processing |
| tags | ["scheduling","background-jobs","cron","async","task-queue"] |
APScheduler is a flexible task scheduling and job queue system for Python applications. It supports both synchronous and asynchronous execution with multiple scheduling mechanisms including cron-style, interval-based, and one-off scheduling.
from datetime import datetime
from apscheduler import Scheduler
from apscheduler.triggers.interval import IntervalTrigger
def tick():
print(f"Tick: {datetime.now()}")
# Create and start scheduler with memory datastore
with Scheduler() as scheduler:
scheduler.add_schedule(tick, IntervalTrigger(seconds=1))
scheduler.run_until_stopped()
from contextlib import asynccontextmanager
from fastapi import FastAPI
from apscheduler import AsyncScheduler
from apscheduler.triggers.interval import IntervalTrigger
def cleanup_task():
print("Running cleanup task...")
@asynccontextmanager
async def lifespan(app: FastAPI):
scheduler = AsyncScheduler()
async with scheduler:
await scheduler.add_schedule(
cleanup_task,
IntervalTrigger(hours=1),
id="cleanup"
)
await scheduler.start_in_background()
yield
app = FastAPI(lifespan=lifespan)
In-memory scheduler (development):
from apscheduler import AsyncScheduler
async def main():
async with AsyncScheduler() as scheduler:
# Jobs lost on restart
await scheduler.add_schedule(my_task, trigger)
await scheduler.run_until_stopped()
Persistent scheduler (production):
from sqlalchemy.ext.asyncio import create_async_engine
from apscheduler import AsyncScheduler
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
async def main():
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
data_store = SQLAlchemyDataStore(engine)
async with AsyncScheduler(data_store) as scheduler:
# Jobs survive restarts
await scheduler.add_schedule(my_task, trigger)
await scheduler.run_until_stopped()
Distributed scheduler:
from apscheduler import AsyncScheduler, SchedulerRole
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker
# Scheduler node - creates jobs from schedules
async def scheduler_node():
async with AsyncScheduler(
data_store,
event_broker,
role=SchedulerRole.scheduler
) as scheduler:
await scheduler.add_schedule(task, trigger)
await scheduler.run_until_stopped()
# Worker node - executes jobs only
async def worker_node():
async with AsyncScheduler(
data_store,
event_broker,
role=SchedulerRole.worker
) as scheduler:
await scheduler.run_until_stopped()
Simple function jobs:
def send_daily_report():
generate_report()
email_report("admin@example.com")
scheduler.add_schedule(
send_daily_report,
CronTrigger(hour=9, minute=0) # 9 AM daily
)
Jobs with arguments:
def process_data(source: str, destination: str, batch_size: int):
# Data processing logic
pass
scheduler.add_schedule(
process_data,
IntervalTrigger(hours=1),
kwargs={
'source': 's3://incoming',
'destination': 's3://processed',
'batch_size': 1000
}
)
Async jobs:
async def fetch_external_api():
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com/data') as resp:
data = await resp.json()
await save_to_database(data)
scheduler.add_schedule(
fetch_external_api,
IntervalTrigger(minutes=5)
)
Interval trigger:
from apscheduler.triggers.interval import IntervalTrigger
# Every 30 seconds
IntervalTrigger(seconds=30)
# Every 2 hours and 15 minutes
IntervalTrigger(hours=2, minutes=15)
# Every 3 days
IntervalTrigger(days=3)
Cron trigger:
from apscheduler.triggers.cron import CronTrigger
# 9:00 AM Monday-Friday
CronTrigger(hour=9, minute=0, day_of_week='mon-fri')
# Every 15 minutes
CronTrigger(minute='*/15')
# Last day of month at midnight
CronTrigger(day='last', hour=0, minute=0)
# Using crontab syntax
CronTrigger.from_crontab('0 9 * * 1-5') # 9 AM weekdays
Date trigger (one-time):
from datetime import datetime, timedelta
from apscheduler.triggers.date import DateTrigger
# 5 minutes from now
run_time = datetime.now() + timedelta(minutes=5)
DateTrigger(run_time=run_time)
# Specific datetime
DateTrigger(run_time=datetime(2024, 12, 31, 23, 59, 59))
Calendar interval:
from apscheduler.triggers.calendarinterval import CalendarIntervalTrigger
# First day of every month at 9 AM
CalendarIntervalTrigger(months=1, hour=9, minute=0)
# Every Monday at 10 AM
CalendarIntervalTrigger(weeks=1, day_of_week='mon', hour=10, minute=0)
SQLite:
engine = create_async_engine("sqlite+aiosqlite:///scheduler.db")
data_store = SQLAlchemyDataStore(engine)
PostgreSQL:
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
data_store = SQLAlchemyDataStore(engine)
event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine)
Redis (event broker):
from apscheduler.eventbrokers.redis import RedisEventBroker
event_broker = RedisEventBroker.from_url("redis://localhost:6379")
Get job results:
async def main():
async with AsyncScheduler() as scheduler:
await scheduler.start_in_background()
# Add job with result retention
job_id = await scheduler.add_job(
calculate_result,
args=(10, 20),
result_expiration_time=timedelta(hours=1)
)
# Wait for result
result = await scheduler.get_job_result(job_id, wait=True)
print(f"Result: {result.return_value}")
Schedule management:
# Pause schedule
await scheduler.pause_schedule("my_schedule")
# Resume schedule
await scheduler.unpause_schedule("my_schedule")
# Remove schedule
await scheduler.remove_schedule("my_schedule")
# Get schedule info
schedule = await scheduler.get_schedule("my_schedule")
print(f"Next run: {schedule.next_fire_time}")
Event handling:
from apscheduler import JobAdded, JobReleased
def on_job_completed(event: JobReleased):
if event.outcome == Outcome.success:
print(f"Job {event.job_id} completed successfully")
else:
print(f"Job {event.job_id} failed: {event.exception}")
scheduler.subscribe(on_job_completed, JobReleased)
Task defaults:
from apscheduler import TaskDefaults
task_defaults = TaskDefaults(
job_executor='threadpool',
max_running_jobs=3,
misfire_grace_time=timedelta(minutes=5)
)
scheduler = AsyncScheduler(task_defaults=task_defaults)
Job execution options:
# Configure task behavior
await scheduler.configure_task(
my_function,
job_executor='processpool',
max_running_jobs=5,
misfire_grace_time=timedelta(minutes=10)
)
# Override per schedule
await scheduler.add_schedule(
my_function,
trigger,
job_executor='threadpool', # Override default
coalesce=CoalescePolicy.latest
)
# Core package
pip install apscheduler
# Database backends
pip install "apscheduler[postgresql]" # PostgreSQL
pip install "apscheduler[mongodb]" # MongoDB
pip install "apscheduler[sqlite]" # SQLite
# Event brokers
pip install "apscheduler[redis]" # Redis
pip install "apscheduler[mqtt]" # MQTT
Dependencies by use case:
apschedulerasyncpg, sqlalchemyredismotoraiosqlite