| name | airflow |
| description | Queries, manages, and troubleshoots Apache Airflow using the af CLI. Covers listing DAGs, triggering runs, reading task logs, diagnosing failures, debugging DAG import errors, checking connections, variables, pools, and monitoring health. Also routes to sub-skills for writing DAGs, debugging, deploying, and migrating Airflow 2 to 3. Use when user mentions "Airflow", "DAG", "DAG run", "task log", "import error", "parse error", "broken DAG", or asks to "trigger a pipeline", "debug import errors", "check Airflow health", "list connections", "retry a run", or any Airflow operation. Do NOT use for warehouse/SQL analytics on Airflow metadata tables — use analyzing-data instead. |
Airflow Operations
Use af commands to query, manage, and troubleshoot Airflow workflows.
Astro CLI
The Astro CLI is the recommended way to run Airflow locally and deploy to production. It provides a containerized Airflow environment that works out of the box:
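astro dev init       # Initialize a new Astro project
astro dev start      # Start the local Airflow environment
astro dev parse      # Parse DAGs to catch import errors
astro dev pytest     # Run the project's pytest tests
astro deploy         # Deploy the full project
astro deploy --dags  # DAG-only deploy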
For more details:
- New project? See the setting-up-astro-project skill
- Local environment? See the managing-astro-local-env skill
- Deploying? See the deploying-airflow skill
Running the CLI
These commands assume af is on PATH. Run via astro otto to get it automatically, or install standalone with uv tool install astro-airflow-mcp.
Instance Configuration
Manage multiple Airflow instances with persistent configuration:
af instance add prod --url https://airflow.example.com --token "$API_TOKEN"
af instance add staging --url https://staging.example.com --username admin --password admin
af instance list
af instance use prod
af instance current
af instance delete old-instance
af instance discover --dry-run
af instance discover
af instance discover astro
af instance discover astro --all-workspaces
af instance discover local
af instance discover local --scan
af instance show prod
AIRFLOW_API_URL=https://staging.example.com AIRFLOW_AUTH_TOKEN=$STG af dags list
af instance use staging
Config layout (mirrors git config system/global/local):
| Scope | File | Committed? |
|---|---|---|
| Global | ~/.astro/config.yaml | n/a (per-user) |
| Project shared | <root>/.astro/config.yaml | yes |
| Project local | <root>/.astro/config.local.yaml | no (gitignored) |
<root> is found by walking up from the current working directory until a directory containing .astro/ is reached. Default write routing inside a project: add and discover write to the project-shared file, and use writes to the project-local file. Override with --global / --project / --local. Set AF_CONFIG=<path> to bypass layering and use a single file.
Migrate from the legacy ~/.af/config.yaml with af migrate (idempotent; renames the old file to .bak).
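As an illustration of the lookup described above, the following Python sketch walks up from a starting directory to find the nearest .astro/ directory and lists the three config files by scope. It is not the af implementation: the function names find_project_root and config_files are hypothetical, and the sketch does not model the --global / --project / --local overrides or AF_CONFIG.

```python
from pathlib import Path
from typing import List, Optional


def find_project_root(start: Path) -> Optional[Path]:
    """Return the nearest directory at or above `start` that contains .astro/."""
    current = start.resolve()
    for candidate in (current, *current.parents):
        if (candidate / ".astro").is_dir():
            return candidate
    return None


def config_files(start: Path, home: Path) -> List[Path]:
    """List config files from the broadest scope to the narrowest.

    Assumption: the global file always lives under `home`; the two project
    files are only listed when a project root is found.
    """
    files = [home / ".astro" / "config.yaml"]  # global (per-user)
    root = find_project_root(start)
    if root is not None:
        files.append(root / ".astro" / "config.yaml")        # project shared
        files.append(root / ".astro" / "config.local.yaml")  # project local
    return files
```

Calling config_files(Path.cwd(), Path.home()) from inside a project would list all three files; outside any project it would list only the global one.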
Tokens in config can reference environment variables using ${VAR} syntax:
instances:
- name: prod
url: https://airflow.example.com
auth:
token: ${AIRFLOW_API_TOKEN}
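To make the ${VAR} substitution concrete, here is a minimal Python sketch of how such references could be expanded from the environment. The function name expand_env_refs is hypothetical, and raising an error for an unset variable is an assumption; the source does not say how af treats missing variables.

```python
import re
from typing import Mapping

# Matches ${NAME} where NAME is a conventional environment variable name.
_VAR_PATTERN = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def expand_env_refs(value: str, env: Mapping[str, str]) -> str:
    """Replace every ${NAME} in `value` with env[NAME].

    Assumption: an unset variable is treated as an error rather than
    being replaced by an empty string.
    """
    def replace(match):
        name = match.group(1)
        if name not in env:
            raise KeyError(f"environment variable {name} is not set")
        return env[name]

    return _VAR_PATTERN.sub(replace, value)
```

In practice the mapping would be os.environ, so a config value of ${AIRFLOW_API_TOKEN} resolves to whatever the shell exported under that name.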
Or use environment variables directly (no config file needed):
export AIRFLOW_API_URL=http://localhost:8080
export AIRFLOW_AUTH_TOKEN=your-token-here
export AIRFLOW_USERNAME=admin
export AIRFLOW_PASSWORD=admin
Or CLI flags: af --airflow-url http://localhost:8080 --token "$TOKEN" <command>
Quick Reference
| Command | Description |
|---|---|
| af health | System health check |
| af dags list | List all DAGs |
| af dags get <dag_id> | Get DAG details |
| af dags explore <dag_id> | Full DAG investigation |
| af dags source <dag_id> | Get DAG source code |
| af dags pause <dag_id> | Pause DAG scheduling |
| af dags unpause <dag_id> | Resume DAG scheduling |
| af dags errors | List import errors |
| af dags warnings | List DAG warnings |
| af dags stats | DAG run statistics |
| af runs list | List DAG runs |
| af runs get <dag_id> <run_id> | Get run details |
| af runs trigger <dag_id> | Trigger a DAG run |
| af runs trigger-wait <dag_id> | Trigger and wait for completion |
| af runs delete <dag_id> <run_id> | Permanently delete a DAG run |
| af runs clear <dag_id> <run_id> | Clear a run for re-execution |
| af runs diagnose <dag_id> <run_id> | Diagnose failed run |
| af tasks list <dag_id> | List tasks in DAG |
| af tasks get <dag_id> <task_id> | Get task definition |
| af tasks instance <dag_id> <run_id> <task_id> | Get task instance |
| af tasks logs <dag_id> <run_id> <task_id> | Get task logs |
| af config version | Airflow version |
| af config show | Full configuration |
| af config connections | List connections |
| af config variables | List variables |
| af config variable <key> | Get specific variable |
| af config pools | List pools |
| af config pool <name> | Get pool details |
| af config plugins | List plugins |
| af config providers | List providers |
| af config assets | List assets/datasets |
| af api <endpoint> | Direct REST API access |
| af api ls | List available API endpoints |
| af api ls --filter X | List endpoints matching pattern |
| af registry providers | List providers in the Airflow Registry |
| af registry modules <provider> | List operators/hooks/sensors/transfers in a provider |
| af registry parameters <provider> | Constructor signatures (name, type, default, required) for a provider's classes |
| af registry connections <provider> | Connection types a provider exposes |
User Intent Patterns
Getting Started
- "How do I run Airflow locally?" / "Set up Airflow" -> use the managing-astro-local-env skill (uses Astro CLI)
- "Create a new Airflow project" / "Initialize project" -> use the setting-up-astro-project skill (uses Astro CLI)
- "How do I install Airflow?" / "Get started with Airflow" -> use the setting-up-astro-project skill
DAG Operations
- "What DAGs exist?" / "List all DAGs" ->
af dags list
- "Tell me about DAG X" / "What is DAG Y?" ->
af dags explore <dag_id>
- "What's the schedule for DAG X?" ->
af dags get <dag_id>
- "Show me the code for DAG X" ->
af dags source <dag_id>
- "Stop DAG X" / "Pause this workflow" ->
af dags pause <dag_id>
- "Resume DAG X" ->
af dags unpause <dag_id>
- "Are there any DAG errors?" ->
af dags errors
- "Create a new DAG" / "Write a pipeline" -> use the authoring-dags skill
Run Operations
- "What runs have executed?" ->
af runs list
- "Run DAG X" / "Trigger the pipeline" ->
af runs trigger <dag_id>
- "Run DAG X and wait" ->
af runs trigger-wait <dag_id>
- "Why did this run fail?" ->
af runs diagnose <dag_id> <run_id>
- "Delete this run" / "Remove stuck run" ->
af runs delete <dag_id> <run_id>
- "Clear this run" / "Retry this run" / "Re-run this" ->
af runs clear <dag_id> <run_id>
- "Test this DAG and fix if it fails" -> use the testing-dags skill
Task Operations
- "What tasks are in DAG X?" ->
af tasks list <dag_id>
- "Get task logs" / "Why did task fail?" ->
af tasks logs <dag_id> <run_id> <task_id>
- "Full root cause analysis" / "Diagnose and fix" -> use the debugging-dags skill
Data Operations
- "Is the data fresh?" / "When was this table last updated?" -> use the checking-freshness skill
- "Where does this data come from?" -> use the tracing-upstream-lineage skill
- "What depends on this table?" / "What breaks if I change this?" -> use the tracing-downstream-lineage skill
Deployment Operations
- "Deploy my DAGs" / "Push to production" -> use the deploying-airflow skill
- "Set up CI/CD" / "Automate deploys" -> use the deploying-airflow skill
- "Deploy to Kubernetes" / "Set up Helm" -> use the deploying-airflow skill
- "astro deploy" / "DAG-only deploy" -> use the deploying-airflow skill
System Operations
- "What version of Airflow?" ->
af config version
- "What connections exist?" ->
af config connections
- "Are pools full?" ->
af config pools
- "Is Airflow healthy?" ->
af health
API Exploration
- "What API endpoints are available?" ->
af api ls
- "Find variable endpoints" ->
af api ls --filter variable
- "Access XCom values" / "Get XCom" ->
af api xcom-entries -F dag_id=X -F task_id=Y
- "Get event logs" / "Audit trail" ->
af api event-logs -F dag_id=X
- "Create connection via API" ->
af api connections -X POST --body '{...}'
- "Create variable via API" ->
af api variables -X POST -F key=name -f value=val
Registry Discovery
- "What operators does provider X have?" ->
af registry modules <provider>
- "What are the constructor params for operator Y?" ->
af registry parameters <provider>
- "What providers exist?" / "Is there a provider for Z?" ->
af registry providers
- "What connection types does provider X expose?" ->
af registry connections <provider>
- "Writing a DAG with a specific operator" -> use the af registry commands to verify the operator's current constructor signature before copying examples
Common Workflows
Validate DAGs Before Deploying
If you're using the Astro CLI, you can validate DAGs without a running Airflow instance:
astro dev parse
astro dev pytest
Otherwise, validate against a running instance:
af dags errors
af dags warnings
Discover Operator Signatures Before Writing Code
The Airflow Registry at airflow.apache.org/registry is the authoritative source for provider classes and their current constructor signatures. Prefer it over memory or stale documentation when authoring DAGs, because the registry reflects the live provider release.
af registry providers | jq '.providers[] | {id, name, version}'
af registry modules standard \
| jq '.modules[] | {name, type, import_path, docs_url}'
af registry parameters standard \
| jq '.classes["airflow.providers.standard.operators.hitl.ApprovalOperator"].parameters'
af registry modules standard \
| jq '.modules[] | select(.import_path | test("hitl"))'
Results are cached locally: 1 hour for the latest version, 30 days for pinned versions (which are immutable). Add --version X.Y.Z to any modules / parameters / connections call to target a specific release.
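The caching rule above can be sketched as a small freshness check. This is an illustrative Python sketch only: the names is_cache_fresh, LATEST_TTL, and PINNED_TTL are hypothetical, and the source does not describe how af actually stores or timestamps its cache.

```python
from datetime import datetime, timedelta
from typing import Optional

LATEST_TTL = timedelta(hours=1)   # "latest" can change, so it expires quickly
PINNED_TTL = timedelta(days=30)   # pinned versions are immutable


def is_cache_fresh(fetched_at: datetime,
                   pinned_version: Optional[str],
                   now: datetime) -> bool:
    """Return True if a cached registry response is still within its TTL.

    `pinned_version` is None when the call targeted the latest version,
    or a string such as "X.Y.Z" when --version was given.
    """
    ttl = PINNED_TTL if pinned_version else LATEST_TTL
    return now - fetched_at < ttl
```

The asymmetry follows from the text: a pinned release never changes, so a long TTL is safe, while the latest version must be rechecked often.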
Investigate a Failed Run
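af runs list --dag-id my_dag                                          # 1. List runs to find the failed run ID
af runs diagnose my_dag manual__2024-01-15T10:00:00+00:00             # 2. Diagnose the failed run
af tasks logs my_dag manual__2024-01-15T10:00:00+00:00 extract_data   # 3. Read the logs of the failing task
af runs clear my_dag manual__2024-01-15T10:00:00+00:00                # 4. After fixing the cause, clear the run for re-execution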
Morning Health Check
af health
af dags errors
af config pools
Understand a DAG
af dags explore my_dag
Check Why DAG Isn't Running
af dags get my_dag
af dags errors
af runs list --dag-id my_dag
Trigger and Monitor
af runs trigger-wait my_dag --timeout 1800   # Option 1: trigger and wait for completion
af runs trigger my_dag                       # Option 2: trigger now, then check the run later
af runs get my_dag <run_id>
Output Format
All commands output JSON, except instance commands, which use human-readable tables:
af dags list
Use jq for filtering:
af runs list | jq '.dag_runs[] | select(.state == "failed")'
af dags list | jq '.dags[].dag_id'
af dags list | jq '[.dags[] | select(.is_paused == true)]'
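When jq is not available, the same filters can be applied in a script. The Python sketch below mirrors the jq expressions above on already-parsed JSON. The top-level keys dag_runs and dags and the fields state, dag_id, and is_paused come from those jq filters; the function names and the sample payloads (including the dag_run_id field) are hypothetical.

```python
import json
from typing import Any, Dict, List


def failed_runs(payload: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Equivalent of: jq '.dag_runs[] | select(.state == "failed")'."""
    return [run for run in payload.get("dag_runs", [])
            if run.get("state") == "failed"]


def paused_dag_ids(payload: Dict[str, Any]) -> List[str]:
    """IDs of the DAGs selected by: jq '[.dags[] | select(.is_paused == true)]'."""
    return [dag["dag_id"] for dag in payload.get("dags", [])
            if dag.get("is_paused") is True]


# Hypothetical sample payloads standing in for real command output.
sample_runs = json.loads(
    '{"dag_runs": [{"dag_run_id": "r1", "state": "failed"},'
    ' {"dag_run_id": "r2", "state": "success"}]}'
)
sample_dags = json.loads(
    '{"dags": [{"dag_id": "etl", "is_paused": true},'
    ' {"dag_id": "report", "is_paused": false}]}'
)

print(failed_runs(sample_runs))
print(paused_dag_ids(sample_dags))
```

In a real script the payload would come from parsing the standard output of af runs list or af dags list with json.loads.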
Task Logs Options
af tasks logs my_dag run_id task_id --try 2          # Logs for a specific try (here, attempt 2)
af tasks logs my_dag run_id task_id --map-index 5    # Logs for a mapped task instance (here, map index 5)
Direct API Access with af api
Use af api for endpoints not covered by the high-level commands (XCom, event logs, backfills, etc.).
af api ls
af api ls --filter variable
af api dags
af api dags -F limit=10 -F only_active=true
af api variables -X POST -F key=my_var -f value="my value"
af api variables/old_var -X DELETE
Field syntax: -F key=value auto-converts the value's type; -f key=value keeps the value as a string.
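The difference between the two field flags can be illustrated with a short Python sketch. The exact conversion rules are an assumption here (true, false, null, and integers are converted; everything else stays a string); the source only states that -F converts types and -f does not. The names coerce_typed and build_fields are hypothetical.

```python
import json
import re
from typing import Any, Dict, Iterable

_INT = re.compile(r"-?[0-9]+")


def coerce_typed(raw: str) -> Any:
    """Convert a -F style value (assumed rules: true/false/null/integers)."""
    literals = {"true": True, "false": False, "null": None}
    if raw in literals:
        return literals[raw]
    if _INT.fullmatch(raw):
        return int(raw)
    return raw


def build_fields(typed: Iterable[str] = (),
                 raw: Iterable[str] = ()) -> Dict[str, Any]:
    """Merge key=value pairs: `typed` models -F, `raw` models -f."""
    fields: Dict[str, Any] = {}
    for pair in typed:
        key, _, value = pair.partition("=")
        fields[key] = coerce_typed(value)
    for pair in raw:
        key, _, value = pair.partition("=")
        fields[key] = value  # kept verbatim as a string
    return fields


# A variable whose value must stay the string "123" rather than the number 123.
body = json.dumps(build_fields(typed=["key=my_var"], raw=["value=123"]))
print(body)
```

This is why the variable examples pass the value with -f: a value such as 123 or true would otherwise be sent as a number or boolean instead of a string.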
Full reference: See api-reference.md for all options, common endpoints (XCom, event-logs, backfills), and examples.
Related Skills
| Skill | Use when... |
|---|---|
| authoring-dags | Creating or editing DAG files with best practices |
| testing-dags | Iterative test -> debug -> fix -> retest cycles |
| debugging-dags | Deep root cause analysis and failure diagnosis |
| checking-freshness | Checking if data is up to date or stale |
| tracing-upstream-lineage | Finding where data comes from |
| tracing-downstream-lineage | Impact analysis -- what breaks if something changes |
| deploying-airflow | Deploying DAGs to production (Astro, Docker Compose, Kubernetes) |
| migrating-airflow-2-to-3 | Upgrading DAGs from Airflow 2.x to 3.x |
| managing-astro-local-env | Starting, stopping, or troubleshooting local Airflow |
| setting-up-astro-project | Initializing a new Astro/Airflow project |