data-engineering-pipeline-designer
// Design data pipelines with quality checks, orchestration, and governance using modern data stack patterns for robust ELT/ETL workflows.
| name | Data Engineering Pipeline Designer |
| slug | data-pipeline-designer |
| description | Design data pipelines with quality checks, orchestration, and governance using modern data stack patterns for robust ELT/ETL workflows. |
| capabilities | ["Design DAGs for batch and streaming pipelines","Embed data quality checks with Great Expectations","Configure Airflow orchestration best practices","Model transformations with dbt patterns","Implement Kafka streaming architectures","Define data lineage and governance controls"] |
| inputs | [{"pipeline_type":"batch | streaming | hybrid"},{"source_systems":"array of data sources (databases, APIs, files, streams)"},{"transformation_requirements":"business logic, aggregations, joins"},{"quality_requirements":"data validation rules, SLAs, monitoring needs"},{"orchestration_platform":"Airflow | Prefect | Dagster | custom"},{"target_systems":"data warehouse, lake, lakehouse destinations"}] |
| outputs | [{"pipeline_architecture":"JSON schema with components and flow"},{"dag_template":"orchestration code (Airflow DAG, dbt project)"},{"quality_checks":"Great Expectations suite configuration"},{"monitoring_config":"alerts, SLAs, data lineage tracking"},{"implementation_guide":"step-by-step deployment instructions"}] |
| keywords | ["data-engineering","airflow","dbt","great-expectations","kafka","data-quality","orchestration","ETL","ELT","data-pipeline"] |
| version | 1.0.0 |
| owner | cognitive-toolworks |
| license | MIT |
| security | No secrets or PII in examples; use environment variables for credentials |
| links | ["https://airflow.apache.org/docs/","https://docs.getdbt.com/","https://greatexpectations.io/","https://kafka.apache.org/documentation/"] |
Trigger this skill when:
Do NOT use for:
Time normalization:
NOW_ET = 2025-10-25T21:30:36-04:00 (NIST/time.gov, America/New_York)

Input validation:
- pipeline_type must be one of: batch, streaming, hybrid
- source_systems must contain at least one valid source
- transformation_requirements must specify business logic or be empty for raw ingestion
- quality_requirements must define at least one validation rule or SLA
- orchestration_platform must be specified (default: Airflow if omitted)
- target_systems must contain at least one destination

Abort conditions:
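The input validation rules above could be checked along these lines. This is a minimal sketch, not part of the skill itself: the function name `validate_inputs`, the plain-dict input shape, and the choice to abort by raising `ValueError` are all assumptions made for illustration.

```python
from typing import Any, Dict

VALID_PIPELINE_TYPES = {"batch", "streaming", "hybrid"}
DEFAULT_ORCHESTRATOR = "airflow"  # documented default when the platform is omitted


def validate_inputs(raw: Dict[str, Any]) -> Dict[str, Any]:
    """Return a normalized copy of the inputs, or raise ValueError (hypothetical helper)."""
    inputs = dict(raw)

    if inputs.get("pipeline_type") not in VALID_PIPELINE_TYPES:
        raise ValueError("pipeline_type must be one of: batch, streaming, hybrid")
    if not inputs.get("source_systems"):
        raise ValueError("source_systems must contain at least one source")
    if not inputs.get("quality_requirements"):
        raise ValueError("quality_requirements must define at least one rule or SLA")
    if not inputs.get("target_systems"):
        raise ValueError("target_systems must contain at least one destination")

    # An empty list of transformation requirements means raw ingestion.
    inputs.setdefault("transformation_requirements", [])
    # Fall back to the default orchestrator when none is given.
    if not inputs.get("orchestration_platform"):
        inputs["orchestration_platform"] = DEFAULT_ORCHESTRATOR
    return inputs
```

Raising on the first failed rule keeps the sketch short; collecting all failures before aborting would be an equally reasonable design.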
Use when: 80% of cases; standard batch pipeline with known patterns
Steps:
Analyze inputs and classify pipeline pattern:
Select orchestration approach:
Generate pipeline architecture JSON:
{
  "pipeline_id": "<slug>",
  "type": "batch|streaming|hybrid",
  "orchestration": "airflow|kafka",
  "layers": {
    "ingestion": {"sources": [], "method": "full|incremental"},
    "transformation": {"tool": "dbt", "models": []},
    "quality": {"framework": "great_expectations", "checkpoints": []},
    "storage": {"targets": [], "format": "parquet|delta"}
  },
  "schedule": "cron|event-driven"
}
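As an illustration of how the architecture document above might be assembled from validated inputs, here is a small sketch. The function name `build_quick_architecture` is hypothetical, and the defaults it picks (Kafka for streaming and Airflow otherwise, incremental ingestion, Parquet storage) are assumptions chosen for the example rather than rules stated by the skill.

```python
import json


def build_quick_architecture(pipeline_id, pipeline_type, sources, targets,
                             schedule="event-driven"):
    """Assemble the T1 architecture document as a plain dict (hypothetical helper)."""
    # Assumption: streaming pipelines are coordinated through Kafka,
    # batch and hybrid pipelines through Airflow.
    orchestration = "kafka" if pipeline_type == "streaming" else "airflow"
    return {
        "pipeline_id": pipeline_id,
        "type": pipeline_type,
        "orchestration": orchestration,
        "layers": {
            # Assumption: incremental ingestion and Parquet are illustrative defaults.
            "ingestion": {"sources": list(sources), "method": "incremental"},
            "transformation": {"tool": "dbt", "models": []},
            "quality": {"framework": "great_expectations", "checkpoints": []},
            "storage": {"targets": list(targets), "format": "parquet"},
        },
        "schedule": schedule,
    }


arch = build_quick_architecture(
    "ecommerce-orders-elt",
    "batch",
    sources=["orders_db.orders"],
    targets=["analytics"],
    schedule="0 2 * * *",
)
print(json.dumps(arch, indent=2))
```

Keeping the document a plain dict means it can be serialized with `json.dumps` and checked against the output contract without extra dependencies.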
Output quick-start template:
Define monitoring:
Token budget: T1 ≤ 2000 tokens
Use when: Production deployment, complex transformations, strict SLAs
Prerequisites: T1 completed OR inputs indicate production requirements
Steps:
Deep-dive on data quality (accessed 2025-10-25T21:30:36-04:00: https://greatexpectations.io/):
Optimize Airflow DAG design (accessed 2025-10-25T21:30:36-04:00: https://medium.com/@datasmiles/mastering-apache-airflow-myessential-best-practices-for-robust-data-orchestration-095460505843):
Implement dbt best practices (accessed 2025-10-25T21:30:36-04:00: https://www.getdbt.com/blog/data-transformation-best-practices):
Configure streaming (if applicable):
Data lineage and governance:
Monitoring and alerting:
Token budget: T2 ≤ 6000 tokens
Use when: Handling PB-scale data, multi-region, complex event-driven patterns
Note: This skill is scoped to T2. For T3 scenarios:
Not implemented in v1.0.0.
When to choose batch vs streaming:
When to use incremental vs full refresh:
When to fail vs warn on data quality issues:
Orchestration platform selection:
Abort conditions:
Required fields:
{
  "pipeline_architecture": {
    "pipeline_id": "string (slug format)",
    "type": "batch|streaming|hybrid",
    "orchestration": {
      "platform": "airflow|prefect|dagster|kafka",
      "schedule": "cron expression | event-driven",
      "parallelism": "integer (max concurrent tasks)"
    },
    "layers": {
      "ingestion": {
        "sources": ["array of source configs"],
        "method": "full|incremental",
        "connector": "native|fivetran|airbyte|custom"
      },
      "transformation": {
        "tool": "dbt|spark|custom",
        "models": ["array of model names"],
        "materialization": "view|table|incremental"
      },
      "quality": {
        "framework": "great_expectations|dbt_tests|custom",
        "checkpoints": ["array of checkpoint configs"],
        "action_on_failure": "block|warn|quarantine"
      },
      "storage": {
        "targets": ["array of target configs"],
        "format": "parquet|delta|iceberg|avro"
      }
    },
    "monitoring": {
      "slas": ["array of SLA definitions"],
      "alerts": ["array of alert configs"],
      "lineage": "openlineage|datahub|custom"
    }
  },
  "dag_template": "string (executable code or path to resource)",
  "quality_checks": "string (Great Expectations suite YAML or dbt test SQL)",
  "monitoring_config": "string (alert rules, dashboard JSON)",
  "implementation_guide": "array of step-by-step instructions"
}
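A consumer of this contract might want to confirm that the required fields are present before using the output. The sketch below is one way to do that; `contract_errors` is a hypothetical helper, and the slug regular expression (lowercase letters and digits separated by single hyphens) is an assumed reading of "slug format".

```python
import re
from typing import List

REQUIRED_TOP_LEVEL = (
    "pipeline_architecture",
    "dag_template",
    "quality_checks",
    "monitoring_config",
    "implementation_guide",
)
REQUIRED_ARCHITECTURE = ("pipeline_id", "type", "orchestration", "layers", "monitoring")

# Assumption: a slug is lowercase alphanumerics joined by single hyphens.
SLUG_RE = re.compile(r"^[a-z0-9]+(?:-[a-z0-9]+)*$")


def contract_errors(output: dict) -> List[str]:
    """Return a list of contract violations; an empty list means the output passes."""
    errors = [f"missing field: {key}" for key in REQUIRED_TOP_LEVEL if key not in output]

    arch = output.get("pipeline_architecture")
    if not isinstance(arch, dict):
        arch = {}
    errors += [
        f"missing field: pipeline_architecture.{key}"
        for key in REQUIRED_ARCHITECTURE
        if key not in arch
    ]

    pipeline_id = arch.get("pipeline_id", "")
    if pipeline_id and not SLUG_RE.match(pipeline_id):
        errors.append(f"pipeline_id is not a slug: {pipeline_id!r}")
    return errors
```

Returning every violation at once, instead of raising on the first, makes the check easier to use in a review step where all problems should be reported together.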
Optional fields:
- cost_estimate: projected monthly cost (warehouse + orchestration + storage)
- performance_benchmarks: expected throughput, latency targets
- rollback_plan: how to revert if pipeline fails in production

Validation:
- pipeline_id must be unique, slug format (lowercase, hyphens)
- schedule must be valid cron OR event trigger definition
- sources and targets must have valid connection info (no credentials in output)
- models must exist in transformation layer

Example 1: Batch ELT Pipeline (E-commerce Orders)
# Input
pipeline_type: batch
source_systems: [{type: postgres, name: orders_db, tables: [orders, customers]}]
transformation_requirements: [Join orders+customers, Calculate daily revenue]
quality_requirements: [order_id unique, order_total > 0]
orchestration_platform: airflow
target_systems: [{type: snowflake, schema: analytics}]
schedule: 0 2 * * *
# Output (abbreviated)
pipeline_architecture:
  pipeline_id: ecommerce-orders-elt
  type: batch
  orchestration: {platform: airflow, schedule: "0 2 * * *"}
  layers:
    ingestion:
      sources: [orders_db.orders, orders_db.customers]
      method: incremental
    transformation:
      tool: dbt
      models: [stg_orders, int_order_metrics, fct_daily_revenue]
    quality:
      framework: great_expectations
      checkpoints: [staging_check, marts_check]
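To make the two quality requirements in this example concrete (`order_id unique`, `order_total > 0`), the following plain-Python sketch applies them to a handful of in-memory rows. It is a stand-in for illustration only, not Great Expectations code; the function name `check_orders` and the sample rows are invented for the example.

```python
from collections import Counter


def check_orders(rows):
    """Return human-readable failures for the two example rules (hypothetical helper)."""
    failures = []

    # Rule 1: order_id unique
    counts = Counter(row["order_id"] for row in rows)
    duplicates = sorted(order_id for order_id, n in counts.items() if n > 1)
    if duplicates:
        failures.append(f"duplicate order_id values: {duplicates}")

    # Rule 2: order_total > 0
    non_positive = [row["order_id"] for row in rows if row["order_total"] <= 0]
    if non_positive:
        failures.append(f"order_total not > 0 for order_id: {non_positive}")

    return failures


# Invented sample data: order 2 appears twice, once with a zero total.
sample = [
    {"order_id": 1, "order_total": 25.0},
    {"order_id": 2, "order_total": 0.0},
    {"order_id": 2, "order_total": 12.5},
]
for failure in check_orders(sample):
    print(failure)
```

In a real pipeline the same two rules would normally live in the quality framework named in the architecture (here `great_expectations`), with the checkpoint deciding whether a failure blocks the run.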
Token budgets (enforced):
Safety checks:
Auditability:
implementation_guide

Determinism:
Official documentation (accessed 2025-10-25T21:30:36-04:00):
Best practices guides:
Templates and examples:
- /skills/data-pipeline-designer/resources/airflow-dag-template.py: Production-ready DAG with TaskGroups and SLAs
- dbt-project-structure.yml: Layered dbt project (staging → marts)
- great-expectations-suite.yml: Common data quality checks
- kafka-streaming-config.json: Schema registry + consumer group setup

Related skills:
- database-optimization-analyzer: For warehouse query tuning and indexing
- devops-pipeline-architect: For CI/CD of pipeline code
- cloud-native-deployment-orchestrator: For Kubernetes-based Airflow deployments