Manus에서 모든 스킬 실행
원클릭으로
원클릭으로
원클릭으로 Manus에서 모든 스킬 실행
시작하기 data-engineering
Data pipeline architecture, ETL/ELT patterns, data modeling, and production data platform design
스타 4
포크 1
업데이트: 2025년 12월 30일 12:44
파일 탐색기
6 개 파일SKILL.md
readonly메뉴
Data pipeline architecture, ETL/ELT patterns, data modeling, and production data platform design
FastAPI, REST APIs, GraphQL, data service design, and API best practices
Apache Spark, Hadoop, distributed computing, and large-scale data processing for petabyte-scale workloads
Portfolio building, technical interviews, job search strategies, and continuous learning
GitHub Actions, GitLab CI, Jenkins, and automated deployment pipelines
AWS, GCP, Azure data platforms, infrastructure as code, and cloud-native data solutions
Docker, Kubernetes, container orchestration, and cloud-native deployment for data applications
| name | data-engineering |
| description | Data pipeline architecture, ETL/ELT patterns, data modeling, and production data platform design |
| sasmp_version | 1.3.0 |
| bonded_agent | 01-data-engineer |
| bond_type | PRIMARY_BOND |
| skill_version | 2.0.0 |
| last_updated | 2025-01 |
| complexity | foundational |
| estimated_mastery_hours | 100 |
| prerequisites | ["python-programming","sql-databases"] |
| unlocks | ["etl-tools","big-data","data-warehousing"] |
Core data engineering concepts, patterns, and practices for building production data platforms.
# Production Data Pipeline Pattern
from dataclasses import dataclass
from datetime import datetime
from typing import Generator
import logging
# Configure the root logger so INFO-and-above records are emitted.
logging.basicConfig(level=logging.INFO)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
@dataclass
class PipelineConfig:
    """Settings for one pipeline run.

    Values are checked on construction so that an unusable setting fails
    immediately instead of surfacing in the middle of a run.

    Raises:
        ValueError: If ``batch_size`` or ``retry_count`` is less than 1.
    """

    # Name of the source; DataPipeline.extract includes it in its log line.
    source: str
    # Name of the destination (presumably where batches are written -- the
    # visible pipeline code never reads it; confirm against the write hook).
    destination: str
    # Size passed to each fetch call and used as the offset step.
    batch_size: int = 10000
    # Total number of write attempts per batch (the load loop iterates
    # ``range(retry_count)``), not "retries after the first attempt".
    retry_count: int = 3

    def __post_init__(self) -> None:
        """Reject values that would make the pipeline fetch or write nothing."""
        if self.batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {self.batch_size}")
        if self.retry_count < 1:
            raise ValueError(f"retry_count must be >= 1, got {self.retry_count}")
class DataPipeline:
    """Batch extract-transform-load driver with per-record error handling.

    This class only drives the control flow. It calls four hooks on ``self``
    that are not defined here and must be supplied by a subclass:

    * ``_fetch_batch(offset, batch_size)`` -- return the next batch; a falsy
      result ends extraction.
    * ``_clean_record(record)`` and ``_enrich_record(record)`` -- per-record
      transformation steps; an exception drops that record.
    * ``_write_batch(batch)`` -- persist one transformed batch.

    Counters live in ``self.metrics`` and accumulate across calls.
    """

    def __init__(self, config: "PipelineConfig"):
        """Store the configuration and zero the run counters."""
        self.config = config
        self.metrics = {"extracted": 0, "transformed": 0, "loaded": 0, "errors": 0}

    def extract(self) -> Generator[list[dict], None, None]:
        """Yield batches from the source until a falsy batch is returned.

        Yields:
            One batch per fetch call, exactly as ``_fetch_batch`` returned it.
        """
        logger.info(f"Extracting from {self.config.source}")
        offset = 0
        while True:
            batch = self._fetch_batch(offset, self.config.batch_size)
            if not batch:
                break
            self.metrics["extracted"] += len(batch)
            yield batch
            # Step by the requested size rather than len(batch).
            offset += self.config.batch_size

    def transform(self, batch: list[dict]) -> list[dict]:
        """Clean and enrich each record, skipping records that fail.

        Args:
            batch: Records produced by ``extract``.

        Returns:
            The successfully transformed records, in input order.
        """
        transformed = []
        for record in batch:
            try:
                cleaned = self._clean_record(record)
                enriched = self._enrich_record(cleaned)
                transformed.append(enriched)
            except Exception as e:
                # Deliberate best-effort: one bad record must not abort the
                # batch, so it is logged, counted and dropped.
                logger.warning(f"Transform error: {e}")
                self.metrics["errors"] += 1
        self.metrics["transformed"] += len(transformed)
        return transformed

    def load(self, batch: list[dict]) -> None:
        """Write a batch to the destination, retrying on failure.

        Args:
            batch: Transformed records to persist.

        Raises:
            Exception: Whatever ``_write_batch`` raised on the final attempt.
        """
        # Always try at least once: with a retry_count of 0 (or less) a plain
        # ``range(retry_count)`` loop would return without writing anything.
        attempts = max(1, self.config.retry_count)
        for attempt in range(attempts):
            try:
                self._write_batch(batch)
            except Exception as e:
                if attempt == attempts - 1:
                    raise
                logger.warning(f"Load attempt {attempt + 1} failed: {e}")
            else:
                self.metrics["loaded"] += len(batch)
                return

    def run(self) -> dict:
        """Execute extract -> transform -> load for every batch.

        Returns:
            ``self.metrics``, including ``duration_seconds`` for this run.

        Raises:
            Exception: Propagates any error from extraction or loading; the
                duration metric is still recorded before it propagates.
        """
        start_time = datetime.now()
        logger.info("Pipeline started")
        try:
            for batch in self.extract():
                transformed = self.transform(batch)
                # Skip the write when every record in the batch was dropped.
                if transformed:
                    self.load(transformed)
        finally:
            duration = (datetime.now() - start_time).total_seconds()
            self.metrics["duration_seconds"] = duration
        logger.info(f"Pipeline completed: {self.metrics}")
        return self.metrics
┌─────────────────────────────────────────────────────────────────┐
│                    Modern Data Architecture                     │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Sources         Ingestion        Storage         Consumption   │
│  ────────        ─────────        ───────         ───────────   │
│  ┌──────┐        ┌─────────┐      ┌───────┐       ┌──────────┐  │
│  │ APIs │───────▶│ Airbyte │─────▶│ Raw   │──────▶│ BI Tools │  │
│  │ DBs  │        │ Fivetran│      │ Layer │       │ Dashboards│ │
│  │ Files│        │ Kafka   │      │ (S3)  │       └──────────┘  │
│  │ SaaS │        └─────────┘      └───┬───┘                     │
│  └──────┘                             │                         │
│                                       ▼                         │
│                               ┌───────────────┐                 │
│                               │   Transform   │                 │
│                               │  (dbt/Spark)  │                 │
│                               └───────┬───────┘                 │
│                                       │                         │
│                                       ▼                         │
│                               ┌───────────────┐   ┌──────────┐  │
│                               │   Warehouse   │──▶│  ML/AI   │  │
│                               │  (Snowflake)  │   │ Pipelines│  │
│                               └───────────────┘   └──────────┘  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
# ETL Pattern (Transform before load)
# Best for: Sensitive data, complex transformations, limited storage
def etl_pipeline():
    """Extract, transform in flight, then load only the cleaned result."""
    # The transform is applied before anything reaches the destination.
    load_to_destination(transform_and_clean(extract_from_source()))
# ELT Pattern (Load then transform)
# Best for: Cloud warehouses, large scale, exploratory analysis
def elt_pipeline():
    """Land the raw extract in staging, then run the dbt models."""
    # Raw rows are loaded untouched; no transformation happens in Python.
    load_to_staging(extract_from_source())
    # Transformation is delegated to the warehouse via SQL/dbt.
    run_dbt_models()
from dataclasses import dataclass
from typing import Callable, Any
import pandas as pd
@dataclass
class DataQualityCheck:
    """A named predicate over a DataFrame plus the severity of a failure.

    Raises:
        ValueError: If ``severity`` is not ``"error"`` or ``"warning"``.
    """

    # Label recorded alongside the outcome of the check.
    name: str
    # Predicate called with the DataFrame; a truthy result means "passed".
    check_fn: Callable[[pd.DataFrame], bool]
    # "error" or "warning"
    severity: str

    def __post_init__(self) -> None:
        """Reject unknown severities so a typo cannot mask a failing check."""
        # The validator only blocks on the exact string "error"; a value such
        # as "Error" would otherwise turn a blocking check into a silent one.
        if self.severity not in ("error", "warning"):
            raise ValueError(
                f"severity must be 'error' or 'warning', got {self.severity!r}"
            )
class DataQualityValidator:
    """Run a list of quality checks against a DataFrame and record outcomes.

    ``results`` holds one dict per check (``check``, ``passed``, ``severity``)
    for the most recent ``validate`` call.
    """

    def __init__(self, checks: "list[DataQualityCheck]"):
        """Store the checks to run; no check is executed here."""
        self.checks = checks
        self.results: list[dict] = []

    def validate(self, df: pd.DataFrame) -> bool:
        """Run every check and report whether all blocking checks passed.

        Args:
            df: The data handed to each check's ``check_fn``.

        Returns:
            ``False`` if any check with severity ``"error"`` failed, otherwise
            ``True``. Failed ``"warning"`` checks are recorded but do not
            change the return value.
        """
        # Start from a clean slate so a reused validator never reports
        # outcomes that belong to a previous DataFrame.
        self.results = []
        all_passed = True
        for check in self.checks:
            # Normalise to a plain bool: predicates built on pandas return
            # numpy booleans, which clutter reprs and do not serialise to JSON.
            passed = bool(check.check_fn(df))
            self.results.append({
                "check": check.name,
                "passed": passed,
                "severity": check.severity,
            })
            if not passed and check.severity == "error":
                all_passed = False
        return all_passed
# Quality gates: the two "error" checks block the load, the date check only warns.
checks = [
    DataQualityCheck(
        "no_nulls_in_id",
        lambda frame: frame["id"].notna().all(),
        "error",
    ),
    DataQualityCheck(
        "positive_amounts",
        lambda frame: (frame["amount"] > 0).all(),
        "error",
    ),
    DataQualityCheck(
        "valid_dates",
        # Unparseable dates are coerced to NaT and then counted as missing.
        lambda frame: pd.to_datetime(frame["date"], errors="coerce").notna().all(),
        "warning",
    ),
]

# Run every gate and stop if any "error"-severity gate failed.
validator = DataQualityValidator(checks)
if validator.validate(df) is False:
    raise ValueError(f"Data quality checks failed: {validator.results}")
from datetime import date
def idempotent_load(df: pd.DataFrame, table: str, partition_date: date):
    """Replace one partition of ``table`` with the rows in ``df``.

    Uses a delete-then-insert pattern so the load can be re-run for the same
    ``partition_date`` without producing duplicates.

    Args:
        df: Rows to load. It is not modified; the partition column is added
            to a copy.
        table: Target table name. It is interpolated into the SQL text.
        partition_date: Partition to replace; also written to every row's
            ``partition_date`` column.
    """
    # NOTE(review): identifiers cannot be bound as query parameters, so
    # ``table`` must come from trusted code, never from user input.
    # Delete existing data for this partition
    db.execute(f"""
        DELETE FROM {table}
        WHERE partition_date = %(date)s
    """, {"date": partition_date})

    # NOTE(review): the delete and the insert are issued as two separate
    # calls; whether they share a transaction depends on ``db``, which is not
    # shown here -- confirm, otherwise a failed insert leaves the partition empty.
    # Insert new data. ``assign`` returns a new frame, so the caller's
    # DataFrame does not silently gain a ``partition_date`` column.
    partitioned = df.assign(partition_date=partition_date)
    partitioned.to_sql(table, db, if_exists="append", index=False)
# Alternative: MERGE/UPSERT pattern
def upsert_records(df: pd.DataFrame, table: str, key_columns: list[str]):
    """Upsert ``df`` into ``table``: update rows that match, insert the rest.

    The rows are first written to ``{table}_staging`` (replacing any previous
    staging table) and then merged into ``table`` on ``key_columns``.

    Args:
        df: Rows to upsert; its columns define the columns written.
        table: Target table name. It is interpolated into the SQL text, so it
            must come from trusted code.
        key_columns: Columns that identify a row; must be non-empty and
            present in ``df``.

    Raises:
        ValueError: If ``key_columns`` is empty or names a column that is not
            in ``df``.
    """
    # Validate before any write: an empty key list would produce an empty ON
    # clause, which is not valid SQL.
    if not key_columns:
        raise ValueError("key_columns must contain at least one column name")
    columns = [str(col) for col in df.columns]
    missing = [key for key in key_columns if key not in columns]
    if missing:
        raise ValueError(f"key_columns not present in DataFrame: {missing}")

    temp_table = f"{table}_staging"
    df.to_sql(temp_table, db, if_exists="replace", index=False)

    key_match = " AND ".join(f"t.{k} = s.{k}" for k in key_columns)
    update_columns = [col for col in columns if col not in key_columns]
    insert_columns = ", ".join(columns)
    insert_values = ", ".join(f"s.{col}" for col in columns)
    # With only key columns there is nothing to update, so the clause is
    # dropped rather than emitting an empty SET list.
    matched_clause = ""
    if update_columns:
        assignments = ", ".join(f"{col} = s.{col}" for col in update_columns)
        matched_clause = f"WHEN MATCHED THEN UPDATE SET {assignments}"

    # NOTE(review): MERGE syntax differs between engines -- confirm this form
    # against the target warehouse.
    db.execute(f"""
        MERGE INTO {table} t
        USING {temp_table} s ON {key_match}
        {matched_clause}
        WHEN NOT MATCHED THEN INSERT ({insert_columns}) VALUES ({insert_values})
    """)
| Tool | Purpose | Version (2025) |
|---|---|---|
| Python | Pipeline development | 3.12+ |
| SQL | Data transformation | - |
| Airflow | Orchestration | 2.8+ |
| dbt | SQL transformations | 1.7+ |
| Spark | Large-scale processing | 3.5+ |
| Airbyte | Data integration | 0.55+ |
| Great Expectations | Data quality | 0.18+ |
| Issue | Symptoms | Root Cause | Fix |
|---|---|---|---|
| Duplicate Data | Count mismatch | Non-idempotent load | Use MERGE, add dedup |
| Schema Drift | Pipeline failure | Source schema changed | Schema validation, alerts |
| Data Freshness | Stale data | Pipeline delays | Monitor SLAs, alerting |
| Memory Error | OOM in pipeline | Large batch size | Chunked processing |
# Illustrative snippet: the functions and variables below are placeholders
# that are not defined in this document.

# ✅ DO: Make pipelines idempotent
delete_and_insert(partition_date)

# ✅ DO: Add observability
logger.info(f"Processed {count} records", extra={"metric": "records_processed"})

# ✅ DO: Validate data at boundaries
validate_schema(df, expected_schema)
validate_constraints(df)

# ✅ DO: Use incremental processing
# (SQL filter applied in the extraction query, not Python:)
#   ... WHERE updated_at > last_run_timestamp
# ❌ DON'T: Process all data every run
# ❌ DON'T: Skip data quality checks
# ❌ DON'T: Ignore schema changes
Skill Certification Checklist: