بنقرة واحدة
data-engineering
// 数据工程。Airflow、Dagster、Kafka Streams、Flink、dbt、数据管道、流处理、数据质量。当用户提到数据管道、ETL、流处理、数据质量时路由到此。
// 数据工程。Airflow、Dagster、Kafka Streams、Flink、dbt、数据管道、流处理、数据质量。当用户提到数据管道、ETL、流处理、数据质量时路由到此。
| name | data-engineering |
| description | 数据工程。Airflow、Dagster、Kafka Streams、Flink、dbt、数据管道、流处理、数据质量。当用户提到数据管道、ETL、流处理、数据质量时路由到此。 |
| license | MIT |
| user-invocable | false |
| disable-model-invocation | false |
数据工程域涵盖数据管道编排、流式处理、数据质量保障三大核心领域。
数据管道层 流处理层 质量保障层
├── Airflow (调度编排) ├── Kafka Streams ├── Great Expectations
├── Dagster (资产管理) ├── Flink ├── dbt
└── Prefect (现代工作流) └── Spark Streaming └── Soda Core
| 特性 | Airflow | Dagster | Prefect |
|---|---|---|---|
| 核心模型 | DAG + Task | Asset + Op | Flow + Task |
| 学习曲线 | 陡峭 | 中等 | 平缓 |
| 资产管理 | 无 | 原生支持 | 无 |
| 动态任务 | 支持 | 支持 | 支持 |
| 本地开发 | 复杂 | 简单 | 简单 |
| 社区生态 | 最大 | 成长中 | 成长中 |
with DAG(dag_id, schedule, default_args) as dag@task 装饰器,自动 XCom 传递@task + .expand() 实现 dynamic task mappingretries=3, retry_delay=timedelta(minutes=5), retry_exponential_backoff=Trueon_failure_callback 发送告警sla=timedelta(hours=2) + sla_miss_callback@asset(group_name, deps) 声明数据资产ConfigurableResource 管理外部连接define_asset_job(selection=AssetSelection.groups(...))ScheduleDefinition(job, cron_schedule)@sensor(job) 监听外部事件触发DailyPartitionsDefinition 按日分区@asset_check 验证数据新鲜度/质量@flow + @task(retries=3, cache_key_fn=task_input_hash)ConcurrentTaskRunner + task.map(items)Deployment.build_from_flow(schedule=CronSchedule(...))Secret / JSON 管理配置和密钥0 2 * * * 日批 / */15 * * * * 实时)WHERE updated_at > last_run| 特性 | Kafka Streams | Flink | Spark Streaming |
|---|---|---|---|
| 部署模式 | 嵌入式(JVM) | 独立集群 | 独立集群 |
| 状态管理 | RocksDB | 内存/RocksDB | 内存 |
| Exactly-Once | 支持 | 支持 | 支持 |
| 窗口类型 | 丰富 | 最丰富 | 基础 |
| 学习曲线 | 平缓 | 陡峭 | 中等 |
| Python API | kafka-python | PyFlink | PySpark |
StreamsBuilder → stream() → filter/map/flatMap → to()groupByKey().count() / .aggregate() / .reduce()Stores.persistentKeyValueStore + TransformerPROCESSING_GUARANTEE_CONFIG = EXACTLY_ONCE_V2NUM_STREAM_THREADS=4 / CACHE_MAX_BYTES_BUFFERING / RocksDB 配置env.addSource() → filter/map → addSink()TumblingProcessingTimeWindows.of(Time.minutes(5))SlidingProcessingTimeWindows.of(size, slide)ProcessingTimeSessionWindows.withGap(gap)GlobalWindows.create() + 自定义 Triggeraggregate(AggregateFunction, WindowFunction) 增量+全窗口env.enableCheckpointing(60000) + EXACTLY_ONCEflink run -s /path/to/savepointforBoundedOutOfOrderness)allowedLateness() + sideOutputLateData()完整性(非空) → 准确性(范围) → 一致性(关联) → 及时性(新鲜度) → 有效性(格式)
| 工具 | 优势 | 适用场景 |
|---|---|---|
| Great Expectations | 丰富 Expectations、Data Docs | Python 生态、复杂验证 |
| dbt | SQL 原生、血缘追踪 | 数据仓库、转换测试 |
| Soda Core | 简洁 YAML 配置 | 快速验证、CI/CD |
gx.get_context() → 添加数据源 → 构建批次expect_table_row_count_to_be_between(min, max)expect_column_values_to_not_be_null(column)expect_column_values_to_be_unique(column)expect_column_values_to_be_between(column, min, max)expect_column_values_to_be_in_set(column, value_set)expect_column_values_to_match_regex(column, regex)ColumnMapExpectationunique / not_null / accepted_values / relationships{% test name(model, column_name, params) %}tests/ 目录下自定义 SQL,返回行 = 失败expect_column_mean_to_be_between / expect_row_values_to_have_recent_datadbt test / dbt test --select model / dbt test --store-failures{{ ref('model') }} + {{ source('schema', 'table') }} → dbt docs generatechecks for table_name:
- row_count > 100
- missing_count(column) = 0
- duplicate_count(column) = 0
- invalid_count(column) = 0:
valid format: email
- freshness(timestamp_col) < 1d
| 实践 | 说明 |
|---|---|
| 幂等性设计 | UPSERT / 分区覆盖,重跑不产生副作用 |
| 增量处理 | 基于时间戳/CDC 增量提取,减少全量扫描 |
| 数据血缘 | dbt ref() / Dagster Asset deps 追踪上下游 |
| 分层验证 | 源→转换→目标每层都验证 |
| 监控告警 | 管道 SLA + 质量指标 + 延迟告警 |
| 状态管理 | 流处理状态 TTL + Checkpoint + Savepoint |
| 容错设计 | 重试策略 + 死信队列 + 回滚方案 |
数据管道、Airflow、Dagster、Prefect、ETL、流处理、Kafka Streams、Flink、数据质量、Great Expectations、dbt、数据验证、数据血缘
AI/LLM 能力索引。Agent 开发、LLM 安全、RAG 系统。当用户提到 AI、LLM、Agent、RAG、Prompt 时路由到此。
架构设计能力索引。API设计、安全架构、云原生、数据安全。当用户提到架构、设计、API、云原生时路由到此。
开发语言能力索引。Python、Go、Rust、TypeScript、Java、C++、Shell。当用户提到编程、开发、代码、语言时路由到此。
DevOps 能力索引。Git、测试、DevSecOps、数据库。当用户提到 DevOps、CI/CD、Git、测试时路由到此。
Claymorphism design system skill. Use when building soft, puffy, clay-like UI components with large radii, dual inner shadows, and offset outer shadows.
Glassmorphism design system skill. Use when building frosted-glass UI components with blur, transparency, and layered depth effects.