| name | data-engineering |
| description | Data pipeline architecture, ETL/ELT patterns, data modeling, and production data platform design |
| sasmp_version | 1.3.0 |
| bonded_agent | 01-data-engineer |
| bond_type | PRIMARY_BOND |
| skill_version | 2.0.0 |
| last_updated | 2025-01 |
| complexity | foundational |
| estimated_mastery_hours | 100 |
| prerequisites | ["python-programming","sql-databases"] |
| unlocks | ["etl-tools","big-data","data-warehousing"] |
Data Engineering Fundamentals
Core data engineering concepts, patterns, and practices for building production data platforms.
Quick Start
from dataclasses import dataclass
from datetime import datetime
from typing import Generator
import logging
# Configure the root logger at INFO level so the pipeline's info messages are emitted.
logging.basicConfig(level=logging.INFO)
# Module-level logger, named after this module, used by the pipeline code below.
logger = logging.getLogger(__name__)
@dataclass
class PipelineConfig:
    """Settings consumed by ``DataPipeline``.

    Attributes:
        source: Name of the source; logged when extraction starts.
        destination: Name of the destination.
        batch_size: Number of records requested per fetch; must be >= 1.
        retry_count: Number of load attempts per batch; must be >= 0.

    Raises:
        ValueError: If ``batch_size`` or ``retry_count`` is out of range.
    """

    source: str
    destination: str
    batch_size: int = 10000
    retry_count: int = 3

    def __post_init__(self) -> None:
        # The extract loop advances its offset by batch_size, so a
        # non-positive value would re-request the same window forever.
        if self.batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {self.batch_size}")
        if self.retry_count < 0:
            raise ValueError(f"retry_count must be >= 0, got {self.retry_count}")
class DataPipeline:
    """Batch ETL pipeline with per-record error isolation and load retries.

    Subclasses provide the I/O by overriding ``_fetch_batch`` and
    ``_write_batch``. ``_clean_record`` and ``_enrich_record`` default to
    pass-through and may be overridden to add transformations.

    Stage counters accumulate in ``self.metrics`` for the lifetime of the
    instance; they are not reset between calls to ``run()``.
    """

    def __init__(self, config: "PipelineConfig"):
        self.config = config
        self.metrics = {"extracted": 0, "transformed": 0, "loaded": 0, "errors": 0}

    def extract(self) -> Generator[list[dict], None, None]:
        """Yield batches of records until the source returns an empty batch.

        Each yielded item is one whole batch (a list of records), not a
        single record. ``metrics["extracted"]`` grows by the batch length.
        """
        logger.info("Extracting from %s", self.config.source)
        offset = 0
        while True:
            batch = self._fetch_batch(offset, self.config.batch_size)
            if not batch:
                break
            self.metrics["extracted"] += len(batch)
            yield batch
            offset += self.config.batch_size

    def transform(self, batch: list[dict]) -> list[dict]:
        """Clean and enrich each record, dropping the ones that fail.

        A record whose cleaning or enrichment raises is logged, counted in
        ``metrics["errors"]`` and left out of the result, so one bad record
        does not abort the batch.

        Returns:
            The successfully transformed records, in input order.
        """
        transformed = []
        for record in batch:
            try:
                enriched = self._enrich_record(self._clean_record(record))
            except Exception as exc:  # deliberate best-effort per record
                logger.warning("Transform error: %s", exc)
                self.metrics["errors"] += 1
            else:
                transformed.append(enriched)
        self.metrics["transformed"] += len(transformed)
        return transformed

    def load(self, batch: list[dict]) -> None:
        """Write a batch to the destination, retrying on failure.

        Makes up to ``config.retry_count`` attempts, and always at least
        one, so a retry count of zero cannot silently drop the batch.

        Raises:
            Exception: Whatever ``_write_batch`` raised on the final attempt.
        """
        attempts = max(1, self.config.retry_count)
        for attempt in range(1, attempts + 1):
            try:
                self._write_batch(batch)
            except Exception as exc:
                if attempt == attempts:
                    raise
                logger.warning("Load attempt %d failed: %s", attempt, exc)
            else:
                self.metrics["loaded"] += len(batch)
                return

    def run(self) -> dict:
        """Run extract, transform and load over every batch.

        Batches that are empty after transformation are not loaded.

        Returns:
            ``self.metrics``, with ``"duration_seconds"`` set for this run.
        """
        start_time = datetime.now()
        logger.info("Pipeline started")
        for batch in self.extract():
            transformed = self.transform(batch)
            if transformed:
                self.load(transformed)
        self.metrics["duration_seconds"] = (datetime.now() - start_time).total_seconds()
        logger.info("Pipeline completed: %s", self.metrics)
        return self.metrics

    # --- Extension hooks -------------------------------------------------

    def _fetch_batch(self, offset: int, limit: int) -> list[dict]:
        """Return up to ``limit`` records starting at ``offset``; empty when done."""
        raise NotImplementedError("Subclasses must implement _fetch_batch()")

    def _write_batch(self, batch: list[dict]) -> None:
        """Persist one transformed batch to the destination."""
        raise NotImplementedError("Subclasses must implement _write_batch()")

    def _clean_record(self, record: dict) -> dict:
        """Return the cleaned record; the default is a pass-through."""
        return record

    def _enrich_record(self, record: dict) -> dict:
        """Return the enriched record; the default is a pass-through."""
        return record
Core Concepts
1. Data Architecture Patterns
┌─────────────────────────────────────────────────────────────────┐
│ Modern Data Architecture │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Sources Ingestion Storage Consumption │
│ ──────── ───────── ─────── ─────────── │
│ ┌──────┐ ┌─────────┐ ┌───────┐ ┌──────────┐ │
│ │ APIs │───────▶│ Airbyte │─────▶│ Raw │──────▶│ BI Tools │ │
│ │ DBs │ │ Fivetran│ │ Layer │ │ Dashboards│ │
│ │ Files│ │ Kafka │ │ (S3) │ └──────────┘ │
│ │ SaaS │ └─────────┘ └───┬───┘ │
│ └──────┘ │ │
│ ▼ │
│ ┌───────────────┐ │
│ │ Transform │ │
│ │ (dbt/Spark) │ │
│ └───────┬───────┘ │
│ │ │
│ ▼ │
│ ┌───────────────┐ ┌──────────┐ │
│ │ Warehouse │──▶│ ML/AI │ │
│ │ (Snowflake) │ │ Pipelines│ │
│ └───────────────┘ └──────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
2. ETL vs ELT
def etl_pipeline():
    """ETL: transform the extracted data first, then load the cleaned result."""
    load_to_destination(transform_and_clean(extract_from_source()))
def elt_pipeline():
    """ELT: load the raw extract into staging, then run the dbt models."""
    load_to_staging(extract_from_source())
    run_dbt_models()
3. Data Quality Framework
from dataclasses import dataclass
from typing import Callable, Any
import pandas as pd
@dataclass
class DataQualityCheck:
    """One named data-quality rule evaluated against a DataFrame."""

    # Label recorded for this rule in validation results.
    name: str
    # Predicate that receives the DataFrame and reports whether the rule holds.
    check_fn: Callable[[pd.DataFrame], bool]
    # Severity label; the examples in this document use "error" and "warning".
    severity: str
class DataQualityValidator:
    """Run a list of data-quality checks against a DataFrame.

    Attributes:
        checks: The checks to run, in order.
        results: One dict per check from the most recent ``validate()``
            call, with keys ``"check"``, ``"passed"`` and ``"severity"``.
    """

    def __init__(self, checks: "list[DataQualityCheck]"):
        self.checks = checks
        self.results = []

    def validate(self, df: pd.DataFrame) -> bool:
        """Evaluate every check against ``df`` and record the outcomes.

        All checks are run even after a failure, so ``results`` always
        describes the full rule set.

        Returns:
            ``False`` if any check with severity ``"error"`` failed,
            otherwise ``True``. Failures of other severities are recorded
            but do not affect the return value.
        """
        # Start fresh so re-validating does not mix in outcomes from earlier runs.
        self.results = []
        all_passed = True
        for check in self.checks:
            # pandas reductions such as .all() return numpy.bool_; store a
            # plain bool so results compare and serialize predictably.
            passed = bool(check.check_fn(df))
            self.results.append({
                "check": check.name,
                "passed": passed,
                "severity": check.severity,
            })
            if not passed and check.severity == "error":
                all_passed = False
        return all_passed
# Example rule set: each entry is (name, predicate, severity).
checks = [
    DataQualityCheck(name=rule_name, check_fn=rule_fn, severity=rule_severity)
    for rule_name, rule_fn, rule_severity in (
        ("no_nulls_in_id", lambda df: df["id"].notna().all(), "error"),
        ("positive_amounts", lambda df: (df["amount"] > 0).all(), "error"),
        (
            "valid_dates",
            lambda df: pd.to_datetime(df["date"], errors="coerce").notna().all(),
            "warning",
        ),
    )
]

validator = DataQualityValidator(checks)

# `df` is the DataFrame under test; it must already be defined at this point.
if not validator.validate(df):
    raise ValueError(f"Data quality checks failed: {validator.results}")
4. Idempotent Operations
from datetime import date
def idempotent_load(df: pd.DataFrame, table: str, partition_date: date):
    """Replace one partition of ``table`` with the rows in ``df``.

    Uses a delete-then-insert pattern: rows already stored for
    ``partition_date`` are deleted, then ``df`` is appended with a
    ``partition_date`` column set to that date. Re-running for the same
    partition therefore does not create duplicates.

    Args:
        df: Rows to load. The caller's frame is left unmodified.
        table: Target table name.
        partition_date: Partition being replaced.
    """
    # NOTE(review): `table` is interpolated into the SQL text because
    # identifiers cannot be bound as parameters; pass only trusted names.
    delete_sql = (
        "\n"
        f"DELETE FROM {table}\n"
        "WHERE partition_date = %(date)s\n"
    )
    db.execute(delete_sql, {"date": partition_date})

    # NOTE(review): the delete and the append are separate calls. Confirm
    # that `db` runs them in one transaction; otherwise a failed append
    # leaves the partition empty.
    # assign() returns a new frame, so the caller's DataFrame is not mutated.
    stamped = df.assign(partition_date=partition_date)
    stamped.to_sql(table, db, if_exists="append", index=False)
def upsert_records(df: pd.DataFrame, table: str, key_columns: list[str]):
    """Upsert ``df`` into ``table``: update rows that match on the key, insert the rest.

    The frame is written to a ``{table}_staging`` table (replacing any
    previous contents) and then merged into ``table`` with one MERGE
    statement built from the frame's columns.

    Args:
        df: Rows to upsert; its columns must exist in ``table``.
        table: Target table name.
        key_columns: Columns that identify a row; used in the MERGE join.

    Raises:
        ValueError: If ``key_columns`` is empty or names a column that is
            not in ``df``.
    """
    # Validate before touching the database so a bad call has no side effects.
    if not key_columns:
        raise ValueError("key_columns must contain at least one column")
    columns = [str(col) for col in df.columns]
    missing = [key for key in key_columns if key not in columns]
    if missing:
        raise ValueError(f"key_columns not present in DataFrame: {missing}")

    temp_table = f"{table}_staging"
    df.to_sql(temp_table, db, if_exists="replace", index=False)

    key_match = " AND ".join(f"t.{k} = s.{k}" for k in key_columns)
    non_key_columns = [col for col in columns if col not in key_columns]
    insert_columns = ", ".join(columns)
    insert_values = ", ".join(f"s.{col}" for col in columns)

    # NOTE(review): standard MERGE syntax with unquoted identifiers;
    # confirm it matches the SQL dialect behind `db`.
    clauses = [
        f"MERGE INTO {table} t",
        f"USING {temp_table} s ON {key_match}",
    ]
    # With only key columns there is nothing to update, and an empty SET
    # list would be invalid SQL, so the clause is omitted.
    if non_key_columns:
        assignments = ", ".join(f"{col} = s.{col}" for col in non_key_columns)
        clauses.append(f"WHEN MATCHED THEN UPDATE SET {assignments}")
    clauses.append(
        f"WHEN NOT MATCHED THEN INSERT ({insert_columns}) VALUES ({insert_values})"
    )
    db.execute("\n".join(clauses))
Tools & Technologies
| Tool | Purpose | Version (2025) |
|---|---|---|
| Python | Pipeline development | 3.12+ |
| SQL | Data transformation | - |
| Airflow | Orchestration | 2.8+ |
| dbt | SQL transformations | 1.7+ |
| Spark | Large-scale processing | 3.5+ |
| Airbyte | Data integration | 0.55+ |
| Great Expectations | Data quality | 0.18+ |
Troubleshooting Guide
| Issue | Symptoms | Root Cause | Fix |
|---|---|---|---|
| Duplicate Data | Count mismatch | Non-idempotent load | Use MERGE, add dedup |
| Schema Drift | Pipeline failure | Source schema changed | Schema validation, alerts |
| Data Freshness | Stale data | Pipeline delays | Monitor SLAs, alerting |
| Memory Error | OOM in pipeline | Large batch size | Chunked processing |
Best Practices
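# Make loads idempotent so reruns do not create duplicates
delete_and_insert(partition_date)
# Log with a metric tag so record counts can be monitored
logger.info(f"Processed {count} records", extra={"metric": "records_processed"})
# Validate schema and constraints
validate_schema(df, expected_schema)
validate_constraints(df)
# Process incrementally by filtering on the last run's timestamp (SQL fragment):
#   WHERE updated_at > last_run_timestamp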
Resources
Skill Certification Checklist: