// Build production Apache Airflow DAGs with best practices for operators, sensors, testing, and deployment. Use when creating data pipelines, orchestrating workflows, or scheduling batch jobs.
| name | airflow-dag-patterns |
| description | Build production Apache Airflow DAGs with best practices for operators, sensors, testing, and deployment. Use when creating data pipelines, orchestrating workflows, or scheduling batch jobs. |
Production-ready patterns for Apache Airflow including DAG design, operators, sensors, testing, and deployment strategies.
| Principle | Description |
|---|---|
| Idempotent | Running twice produces same result |
| Atomic | Tasks succeed or fail completely |
| Incremental | Process only new/changed data |
| Observable | Logs, metrics, alerts at every step |
# Linear
task1 >> task2 >> task3
# Fan-out
task1 >> [task2, task3, task4]
# Fan-in
[task1, task2, task3] >> task4
# Complex
task1 >> task2 >> task4
task1 >> task3 >> task4
# dags/example_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=5),
'retry_exponential_backoff': True,
'max_retry_delay': timedelta(hours=1),
}
with DAG(
dag_id='example_etl',
default_args=default_args,
description='Example ETL pipeline',
schedule='0 6 * * *', # Daily at 6 AM
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['etl', 'example'],
max_active_runs=1,
) as dag:
start = EmptyOperator(task_id='start')
def extract_data(**context):
execution_date = context['ds']
# Extract logic here
return {'records': 1000}
extract = PythonOperator(
task_id='extract',
python_callable=extract_data,
)
end = EmptyOperator(task_id='end')
start >> extract >> end
# dags/taskflow_example.py
from datetime import datetime
from airflow.decorators import dag, task
from airflow.models import Variable
@dag(
dag_id='taskflow_etl',
schedule='@daily',
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['etl', 'taskflow'],
)
def taskflow_etl():
"""ETL pipeline using TaskFlow API"""
@task()
def extract(source: str) -> dict:
"""Extract data from source"""
import pandas as pd
df = pd.read_csv(f's3://bucket/{source}/{{ ds }}.csv')
return {'data': df.to_dict(), 'rows': len(df)}
@task()
def transform(extracted: dict) -> dict:
"""Transform extracted data"""
import pandas as pd
df = pd.DataFrame(extracted['data'])
df['processed_at'] = datetime.now()
df = df.dropna()
return {'data': df.to_dict(), 'rows': len(df)}
@task()
def load(transformed: dict, target: str):
"""Load data to target"""
import pandas as pd
df = pd.DataFrame(transformed['data'])
df.to_parquet(f's3://bucket/{target}/{{ ds }}.parquet')
return transformed['rows']
@task()
def notify(rows_loaded: int):
"""Send notification"""
print(f'Loaded {rows_loaded} rows')
# Define dependencies with XCom passing
extracted = extract(source='raw_data')
transformed = transform(extracted)
loaded = load(transformed, target='processed_data')
notify(loaded)
# Instantiate the DAG
taskflow_etl()
# dags/dynamic_dag_factory.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
import json
# Configuration for multiple similar pipelines
PIPELINE_CONFIGS = [
{'name': 'customers', 'schedule': '@daily', 'source': 's3://raw/customers'},
{'name': 'orders', 'schedule': '@hourly', 'source': 's3://raw/orders'},
{'name': 'products', 'schedule': '@weekly', 'source': 's3://raw/products'},
]
def create_dag(config: dict) -> DAG:
"""Factory function to create DAGs from config"""
dag_id = f"etl_{config['name']}"
default_args = {
'owner': 'data-team',
'retries': 3,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
dag_id=dag_id,
default_args=default_args,
schedule=config['schedule'],
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['etl', 'dynamic', config['name']],
)
with dag:
def extract_fn(source, **context):
print(f"Extracting from {source} for {context['ds']}")
def transform_fn(**context):
print(f"Transforming data for {context['ds']}")
def load_fn(table_name, **context):
print(f"Loading to {table_name} for {context['ds']}")
extract = PythonOperator(
task_id='extract',
python_callable=extract_fn,
op_kwargs={'source': config['source']},
)
transform = PythonOperator(
task_id='transform',
python_callable=transform_fn,
)
load = PythonOperator(
task_id='load',
python_callable=load_fn,
op_kwargs={'table_name': config['name']},
)
extract >> transform >> load
return dag
# Generate DAGs
for config in PIPELINE_CONFIGS:
globals()[f"dag_{config['name']}"] = create_dag(config)
# dags/branching_example.py
from airflow.decorators import dag, task
from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule
@dag(
dag_id='branching_pipeline',
schedule='@daily',
start_date=datetime(2024, 1, 1),
catchup=False,
)
def branching_pipeline():
@task()
def check_data_quality() -> dict:
"""Check data quality and return metrics"""
quality_score = 0.95 # Simulated
return {'score': quality_score, 'rows': 10000}
def choose_branch(**context) -> str:
"""Determine which branch to execute"""
ti = context['ti']
metrics = ti.xcom_pull(task_ids='check_data_quality')
if metrics['score'] >= 0.9:
return 'high_quality_path'
elif metrics['score'] >= 0.7:
return 'medium_quality_path'
else:
return 'low_quality_path'
quality_check = check_data_quality()
branch = BranchPythonOperator(
task_id='branch',
python_callable=choose_branch,
)
high_quality = EmptyOperator(task_id='high_quality_path')
medium_quality = EmptyOperator(task_id='medium_quality_path')
low_quality = EmptyOperator(task_id='low_quality_path')
# Join point - runs after any branch completes
join = EmptyOperator(
task_id='join',
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
)
quality_check >> branch >> [high_quality, medium_quality, low_quality] >> join
branching_pipeline()
# dags/sensor_patterns.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.operators.python import PythonOperator
with DAG(
dag_id='sensor_example',
schedule='@daily',
start_date=datetime(2024, 1, 1),
catchup=False,
) as dag:
# Wait for file on S3
wait_for_file = S3KeySensor(
task_id='wait_for_s3_file',
bucket_name='data-lake',
bucket_key='raw/{{ ds }}/data.parquet',
aws_conn_id='aws_default',
timeout=60 * 60 * 2, # 2 hours
poke_interval=60 * 5, # Check every 5 minutes
mode='reschedule', # Free up worker slot while waiting
)
# Wait for another DAG to complete
wait_for_upstream = ExternalTaskSensor(
task_id='wait_for_upstream_dag',
external_dag_id='upstream_etl',
external_task_id='final_task',
execution_date_fn=lambda dt: dt, # Same execution date
timeout=60 * 60 * 3,
mode='reschedule',
)
# Custom sensor using @task.sensor decorator
@task.sensor(poke_interval=60, timeout=3600, mode='reschedule')
def wait_for_api() -> PokeReturnValue:
"""Custom sensor for API availability"""
import requests
response = requests.get('https://api.example.com/health')
is_done = response.status_code == 200
return PokeReturnValue(is_done=is_done, xcom_value=response.json())
api_ready = wait_for_api()
def process_data(**context):
api_result = context['ti'].xcom_pull(task_ids='wait_for_api')
print(f"API returned: {api_result}")
process = PythonOperator(
task_id='process',
python_callable=process_data,
)
[wait_for_file, wait_for_upstream, api_ready] >> process
# dags/error_handling.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.models import Variable
def task_failure_callback(context):
"""Callback on task failure"""
task_instance = context['task_instance']
exception = context.get('exception')
# Send to Slack/PagerDuty/etc
message = f"""
Task Failed!
DAG: {task_instance.dag_id}
Task: {task_instance.task_id}
Execution Date: {context['ds']}
Error: {exception}
Log URL: {task_instance.log_url}
"""
# send_slack_alert(message)
print(message)
def dag_failure_callback(context):
"""Callback on DAG failure"""
# Aggregate failures, send summary
pass
with DAG(
dag_id='error_handling_example',
schedule='@daily',
start_date=datetime(2024, 1, 1),
catchup=False,
on_failure_callback=dag_failure_callback,
default_args={
'on_failure_callback': task_failure_callback,
'retries': 3,
'retry_delay': timedelta(minutes=5),
},
) as dag:
def might_fail(**context):
import random
if random.random() < 0.3:
raise ValueError("Random failure!")
return "Success"
risky_task = PythonOperator(
task_id='risky_task',
python_callable=might_fail,
)
def cleanup(**context):
"""Cleanup runs regardless of upstream failures"""
print("Cleaning up...")
cleanup_task = PythonOperator(
task_id='cleanup',
python_callable=cleanup,
trigger_rule=TriggerRule.ALL_DONE, # Run even if upstream fails
)
def notify_success(**context):
"""Only runs if all upstream succeeded"""
print("All tasks succeeded!")
success_notification = PythonOperator(
task_id='notify_success',
python_callable=notify_success,
trigger_rule=TriggerRule.ALL_SUCCESS,
)
risky_task >> [cleanup_task, success_notification]
# tests/test_dags.py
import pytest
from datetime import datetime
from airflow.models import DagBag
@pytest.fixture
def dagbag():
return DagBag(dag_folder='dags/', include_examples=False)
def test_dag_loaded(dagbag):
"""Test that all DAGs load without errors"""
assert len(dagbag.import_errors) == 0, f"DAG import errors: {dagbag.import_errors}"
def test_dag_structure(dagbag):
"""Test specific DAG structure"""
dag = dagbag.get_dag('example_etl')
assert dag is not None
assert len(dag.tasks) == 3
assert dag.schedule_interval == '0 6 * * *'
def test_task_dependencies(dagbag):
"""Test task dependencies are correct"""
dag = dagbag.get_dag('example_etl')
extract_task = dag.get_task('extract')
assert 'start' in [t.task_id for t in extract_task.upstream_list]
assert 'end' in [t.task_id for t in extract_task.downstream_list]
def test_dag_integrity(dagbag):
"""Test DAG has no cycles and is valid"""
for dag_id, dag in dagbag.dags.items():
assert dag.test_cycle() is None, f"Cycle detected in {dag_id}"
# Test individual task logic
def test_extract_function():
"""Unit test for extract function"""
from dags.example_dag import extract_data
result = extract_data(ds='2024-01-01')
assert 'records' in result
assert isinstance(result['records'], int)
airflow/
├── dags/
│ ├── __init__.py
│ ├── common/
│ │ ├── __init__.py
│ │ ├── operators.py # Custom operators
│ │ ├── sensors.py # Custom sensors
│ │ └── callbacks.py # Alert callbacks
│ ├── etl/
│ │ ├── customers.py
│ │ └── orders.py
│ └── ml/
│ └── training.py
├── plugins/
│ └── custom_plugin.py
├── tests/
│ ├── __init__.py
│ ├── test_dags.py
│ └── test_operators.py
├── docker-compose.yml
└── requirements.txt
mode='reschedule' - For sensors, free up workersdepends_on_past=True - Creates bottlenecks{{ ds }} macros