在 Manus 中运行任何 Skill
一键导入
一键在 Manus 中运行任何 Skill
开始使用 data-engineering
Data pipeline architecture, ETL/ELT patterns, data modeling, and production data platform design
星标：4
分支：1
更新时间：2025年12月30日 12:44
文件资源管理器
6 个文件 · SKILL.md
readonly · 菜单
Data pipeline architecture, ETL/ELT patterns, data modeling, and production data platform design
FastAPI, REST APIs, GraphQL, data service design, and API best practices
Apache Spark, Hadoop, distributed computing, and large-scale data processing for petabyte-scale workloads
Portfolio building, technical interviews, job search strategies, and continuous learning
GitHub Actions, GitLab CI, Jenkins, and automated deployment pipelines
AWS, GCP, Azure data platforms, infrastructure as code, and cloud-native data solutions
Docker, Kubernetes, container orchestration, and cloud-native deployment for data applications
| name | data-engineering |
| description | Data pipeline architecture, ETL/ELT patterns, data modeling, and production data platform design |
| sasmp_version | 1.3.0 |
| bonded_agent | 01-data-engineer |
| bond_type | PRIMARY_BOND |
| skill_version | 2.0.0 |
| last_updated | 2025-01 |
| complexity | foundational |
| estimated_mastery_hours | 100 |
| prerequisites | ["python-programming","sql-databases"] |
| unlocks | ["etl-tools","big-data","data-warehousing"] |
Core data engineering concepts, patterns, and practices for building production data platforms.
# Production Data Pipeline Pattern
import logging
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Generator
# Module-level logger for the pipeline. basicConfig configures root logging
# at INFO (it is a no-op if the root logger already has handlers).
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
@dataclass
class PipelineConfig:
    """Settings for one pipeline run.

    Attributes:
        source: Label for the data source (DataPipeline.extract logs it).
        destination: Label for the load destination.
        batch_size: Maximum number of records requested per extract call.
        retry_count: Total number of load attempts per batch.

    Raises:
        ValueError: If ``batch_size`` or ``retry_count`` is below 1.
    """

    source: str
    destination: str
    batch_size: int = 10000
    retry_count: int = 3

    def __post_init__(self) -> None:
        # A batch size below 1 cannot make progress through the source, and
        # zero load attempts would mean batches are never written at all.
        if self.batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {self.batch_size}")
        if self.retry_count < 1:
            raise ValueError(f"retry_count must be >= 1, got {self.retry_count}")
class DataPipeline:
    """Production-ready data pipeline with error handling.

    Subclasses plug in the I/O and per-record logic by overriding
    ``_fetch_batch``, ``_clean_record``, ``_enrich_record`` and
    ``_write_batch``. This class provides batching, per-record error
    isolation, load retries and run metrics.
    """

    def __init__(self, config: "PipelineConfig"):
        self.config = config
        # Running counters, updated in place by extract/transform/load.
        # run() also adds "duration_seconds".
        self.metrics = {"extracted": 0, "transformed": 0, "loaded": 0, "errors": 0}

    # Extension points: subclasses must override these.

    def _fetch_batch(self, offset: int, limit: int) -> list[dict]:
        """Return up to ``limit`` records starting at ``offset``; empty when exhausted."""
        raise NotImplementedError

    def _clean_record(self, record: dict) -> dict:
        """Return a cleaned version of ``record``; raise to reject it."""
        raise NotImplementedError

    def _enrich_record(self, record: dict) -> dict:
        """Return ``record`` with derived fields added; raise to reject it."""
        raise NotImplementedError

    def _write_batch(self, batch: list[dict]) -> None:
        """Persist ``batch`` to the destination; raise on failure."""
        raise NotImplementedError

    def extract(self) -> Generator[list[dict], None, None]:
        """Yield batches (lists of records) from the source until it is exhausted.

        Each batch holds at most ``config.batch_size`` records. The offset
        advances by the number of records actually returned, so a source that
        caps its page size below ``batch_size`` does not cause rows to be skipped.
        """
        logger.info("Extracting from %s", self.config.source)
        offset = 0
        while True:
            batch = self._fetch_batch(offset, self.config.batch_size)
            if not batch:
                break
            self.metrics["extracted"] += len(batch)
            yield batch
            offset += len(batch)

    def transform(self, batch: list[dict]) -> list[dict]:
        """Clean and enrich each record, skipping and counting records that fail.

        Returns:
            The successfully transformed records, in input order.
        """
        transformed = []
        for record in batch:
            try:
                enriched = self._enrich_record(self._clean_record(record))
            except Exception as e:  # isolate per-record failures so one bad row can't fail the batch
                logger.warning("Transform error: %s", e)
                self.metrics["errors"] += 1
                continue
            transformed.append(enriched)
        self.metrics["transformed"] += len(transformed)
        return transformed

    def load(self, batch: list[dict]) -> None:
        """Write ``batch`` to the destination, retrying on failure.

        Makes ``config.retry_count`` attempts in total, but always at least one,
        so a non-positive setting cannot silently drop the batch.

        NOTE(review): retries are immediate; real destinations usually want a
        backoff delay between attempts.

        Raises:
            Exception: Whatever the final failed ``_write_batch`` attempt raised.
        """
        attempts = max(1, self.config.retry_count)
        for attempt in range(1, attempts + 1):
            try:
                self._write_batch(batch)
            except Exception as e:
                if attempt == attempts:
                    raise
                logger.warning("Load attempt %d failed: %s", attempt, e)
            else:
                self.metrics["loaded"] += len(batch)
                return

    def run(self) -> dict:
        """Run extract -> transform -> load over every batch and return the metrics.

        ``metrics["duration_seconds"]`` is recorded even when a load ultimately
        fails and its exception propagates.
        """
        # Monotonic clock: elapsed time is unaffected by wall-clock adjustments.
        start = time.perf_counter()
        logger.info("Pipeline started")
        try:
            for batch in self.extract():
                transformed = self.transform(batch)
                if transformed:
                    self.load(transformed)
        finally:
            self.metrics["duration_seconds"] = time.perf_counter() - start
        logger.info("Pipeline completed: %s", self.metrics)
        return self.metrics
┌────────────────────────────────────────────────────────────────────┐
│                      Modern Data Architecture                      │
├────────────────────────────────────────────────────────────────────┤
│                                                                    │
│  Sources         Ingestion         Storage         Consumption     │
│  ───────         ─────────         ───────         ───────────     │
│  ┌───────┐       ┌──────────┐      ┌───────┐       ┌────────────┐  │
│  │ APIs  │──────▶│ Airbyte  │─────▶│  Raw  │──────▶│  BI Tools  │  │
│  │ DBs   │       │ Fivetran │      │ Layer │       │ Dashboards │  │
│  │ Files │       │ Kafka    │      │ (S3)  │       └────────────┘  │
│  │ SaaS  │       └──────────┘      └───┬───┘                       │
│  └───────┘                             │                           │
│                                        ▼                           │
│                                ┌───────────────┐                   │
│                                │   Transform   │                   │
│                                │  (dbt/Spark)  │                   │
│                                └───────┬───────┘                   │
│                                        │                           │
│                                        ▼                           │
│                                ┌───────────────┐   ┌───────────┐   │
│                                │   Warehouse   │──▶│   ML/AI   │   │
│                                │  (Snowflake)  │   │ Pipelines │   │
│                                └───────────────┘   └───────────┘   │
│                                                                    │
└────────────────────────────────────────────────────────────────────┘
# ETL Pattern (Transform before load)
def etl_pipeline():
    """Extract, transform inside the pipeline, then load only the cleaned result.

    Best for: sensitive data, complex transformations, limited storage.
    """
    raw = extract_from_source()
    # Transformation happens before anything reaches the destination.
    load_to_destination(transform_and_clean(raw))
# ELT Pattern (Load then transform)
def elt_pipeline():
    """Land raw data in staging first, then transform it inside the warehouse.

    Best for: cloud warehouses, large scale, exploratory analysis.
    """
    load_to_staging(extract_from_source())  # raw, untransformed copy lands first
    run_dbt_models()  # transformations run in-warehouse with SQL/dbt
from dataclasses import dataclass
from typing import Callable, Any
import pandas as pd
@dataclass
class DataQualityCheck:
    """A named predicate over a DataFrame, plus how to treat its failure.

    Attributes:
        name: Label reported in validation results.
        check_fn: Called with the DataFrame; a truthy return means the check passed.
        severity: ``"error"`` (a failure fails validation) or ``"warning"``
            (a failure is only recorded).

    Raises:
        ValueError: If ``severity`` is not ``"error"`` or ``"warning"``.
    """

    name: str
    check_fn: Callable[[pd.DataFrame], bool]
    severity: str  # "error" or "warning"

    def __post_init__(self) -> None:
        # The validator only treats severity == "error" as blocking, so a typo
        # such as "Error" would silently downgrade the check to a warning.
        if self.severity not in ("error", "warning"):
            raise ValueError(
                f"severity must be 'error' or 'warning', got {self.severity!r}"
            )
class DataQualityValidator:
    """Run a list of DataQualityCheck objects against a DataFrame.

    After validate(), ``results`` holds one dict per check with the keys
    ``"check"``, ``"passed"`` and ``"severity"``. When the check function
    itself raised, the dict also has an ``"error"`` key.
    """

    def __init__(self, checks: "list[DataQualityCheck]"):
        self.checks = checks
        self.results = []

    def validate(self, df: pd.DataFrame) -> bool:
        """Run every check on ``df``.

        ``results`` is rebuilt on each call, so it never mixes outcomes from
        earlier DataFrames. A check that raises (e.g. ``KeyError`` for a
        missing column) is recorded as failed instead of aborting the
        remaining checks.

        Returns:
            True if every check with severity ``"error"`` passed.
        """
        self.results = []
        all_passed = True
        for check in self.checks:
            error = None
            try:
                # bool(): pandas reductions such as .all() return numpy.bool_.
                passed = bool(check.check_fn(df))
            except Exception as exc:  # user-supplied predicate; any failure means "not passed"
                passed = False
                error = f"{type(exc).__name__}: {exc}"
            result = {"check": check.name, "passed": passed, "severity": check.severity}
            if error is not None:
                result["error"] = error
            self.results.append(result)
            if not passed and check.severity == "error":
                all_passed = False
        return all_passed
# Define checks as (name, predicate, severity) specs, turned into DataQualityCheck objects.
checks = [
    DataQualityCheck(name=check_name, check_fn=predicate, severity=level)
    for check_name, predicate, level in (
        # Every row must carry an id.
        ("no_nulls_in_id", lambda frame: frame["id"].notna().all(), "error"),
        # Amounts must be strictly positive.
        ("positive_amounts", lambda frame: (frame["amount"] > 0).all(), "error"),
        # Missing or unparseable dates are reported but do not fail validation.
        (
            "valid_dates",
            lambda frame: pd.to_datetime(frame["date"], errors="coerce").notna().all(),
            "warning",
        ),
    )
]

validator = DataQualityValidator(checks)
if not validator.validate(df):
    raise ValueError(f"Data quality checks failed: {validator.results}")
from datetime import date
def idempotent_load(df: pd.DataFrame, table: str, partition_date: date):
    """
    Idempotent load: can be re-run safely without duplicates.
    Uses delete-then-insert pattern.

    Rows of ``table`` whose ``partition_date`` equals ``partition_date`` are
    deleted, then ``df`` is appended with a ``partition_date`` column set to
    that value. The caller's ``df`` is not modified; the column is added to
    a copy.

    NOTE(review): ``db`` is a module-level connection defined elsewhere. Run
    the DELETE and the INSERT in one transaction; otherwise a failed insert
    leaves the partition empty. ``table`` is interpolated into the SQL text,
    so it must come from trusted configuration, never from user input.
    """
    # Clear any rows left by a previous run of this partition.
    db.execute(f"""
        DELETE FROM {table}
        WHERE partition_date = %(date)s
    """, {"date": partition_date})
    # Tag the rows on a copy so the caller's DataFrame is left untouched.
    tagged = df.assign(partition_date=partition_date)
    tagged.to_sql(table, db, if_exists="append", index=False)
# Alternative: MERGE/UPSERT pattern
def upsert_records(df: pd.DataFrame, table: str, key_columns: list[str]):
    """Upsert: update rows whose key columns match, insert the rest.

    ``df`` is written to ``{table}_staging``, replacing any previous staging
    table. It is then merged into ``table`` with a single MERGE statement
    whose SET and INSERT column lists are generated from ``df.columns``.

    Args:
        df: Rows to upsert. Its column names are used verbatim as the
            target table's column names.
        table: Target table name.
        key_columns: Columns that identify a row; must be non-empty and
            present in ``df``.

    Raises:
        ValueError: If ``key_columns`` is empty or names a column missing
            from ``df``.

    NOTE(review): ``db`` is a module-level connection defined elsewhere, and
    the target database must support MERGE. Table and column names are
    interpolated into the SQL text, so they must come from trusted
    configuration. The staging table is left in place after the merge.
    """
    if not key_columns:
        raise ValueError("key_columns must name at least one column")
    columns = list(df.columns)
    missing = [k for k in key_columns if k not in columns]
    if missing:
        raise ValueError(f"key_columns not found in DataFrame: {missing}")

    temp_table = f"{table}_staging"
    df.to_sql(temp_table, db, if_exists="replace", index=False)

    key_match = " AND ".join(f"t.{k} = s.{k}" for k in key_columns)
    update_columns = [c for c in columns if c not in key_columns]
    # An empty SET list is invalid SQL, so a key-only frame becomes insert-only.
    when_matched = ""
    if update_columns:
        assignments = ", ".join(f"{c} = s.{c}" for c in update_columns)
        when_matched = f"WHEN MATCHED THEN UPDATE SET {assignments}"
    column_list = ", ".join(f"{c}" for c in columns)
    value_list = ", ".join(f"s.{c}" for c in columns)
    db.execute(f"""
        MERGE INTO {table} t
        USING {temp_table} s ON {key_match}
        {when_matched}
        WHEN NOT MATCHED THEN INSERT ({column_list}) VALUES ({value_list})
    """)
| Tool | Purpose | Version (2025) |
|---|---|---|
| Python | Pipeline development | 3.12+ |
| SQL | Data transformation | - |
| Airflow | Orchestration | 2.8+ |
| dbt | SQL transformations | 1.7+ |
| Spark | Large-scale processing | 3.5+ |
| Airbyte | Data integration | 0.55+ |
| Great Expectations | Data quality | 0.18+ |
| Issue | Symptoms | Root Cause | Fix |
|---|---|---|---|
| Duplicate Data | Count mismatch | Non-idempotent load | Use MERGE, add dedup |
| Schema Drift | Pipeline failure | Source schema changed | Schema validation, alerts |
| Data Freshness | Stale data | Pipeline delays | Monitor SLAs, alerting |
| Memory Error | OOM in pipeline | Large batch size | Chunked processing |
# ✅ DO: Make pipelines idempotent
delete_and_insert(partition_date)

# ✅ DO: Add observability (lazy %-style args: the message is only built if emitted)
logger.info("Processed %s records", count, extra={"metric": "records_processed"})

# ✅ DO: Validate data at boundaries
validate_schema(df, expected_schema)
validate_constraints(df)

# ✅ DO: Use incremental processing: read only rows changed since the last run,
# passing the watermark as a bound query parameter instead of formatting it into SQL.
incremental_filter = "WHERE updated_at > %(last_run_timestamp)s"

# ❌ DON'T: Process all data every run
# ❌ DON'T: Skip data quality checks
# ❌ DON'T: Ignore schema changes
Skill Certification Checklist: