data-pipeline
Design, build, or debug data processing pipelines. Use when asked to process a dataset, transform data, build an ETL pipeline, schedule batch jobs, or fix data quality issues.
| name | data-pipeline |
| description | Design, build, or debug data processing pipelines. Use when asked to process a dataset, transform data, build an ETL pipeline, schedule batch jobs, or fix data quality issues. |
| license | MIT |
| compatibility | Requires Python 3.10+ |
Data pipelines fail silently and corrupt downstream systems. Every pipeline must be observable, idempotent, and validated at the boundary.
Define the contract. Before writing any transformation code, specify:
Validate at the boundary. The first thing any pipeline stage does is validate its input:
```python
from pydantic import BaseModel, ValidationError


class InputRecord(BaseModel):
    user_id: int
    event_type: str
    timestamp: str  # ISO 8601
    value: float | None = None


def process(raw_records: list[dict]) -> list[dict]:
    # log_invalid_records and transform are assumed to be defined elsewhere in the pipeline.
    valid, invalid = [], []
    for r in raw_records:
        try:
            # model_dump() returns the validated record as a plain dict
            valid.append(InputRecord(**r).model_dump())
        except ValidationError as e:
            invalid.append({"record": r, "error": str(e)})
    if invalid:
        log_invalid_records(invalid)  # Never silently drop
    return transform(valid)
```
Make it idempotent. Running the pipeline twice on the same input must produce the same output. Use upserts, not inserts. Use deterministic IDs based on input content, not auto-increment.
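A minimal sketch of what an idempotent load might look like, assuming a SQLite destination. The `events` table, the `record_id` helper, and the `load` function are hypothetical names chosen for illustration. The ID is a hash of the record's content, so rerunning the same batch updates existing rows rather than adding duplicates.

```python
import hashlib
import json
import sqlite3


def record_id(record: dict) -> str:
    """Derive a stable ID from the record's content (hypothetical helper)."""
    # sort_keys makes the serialization independent of dict key order
    canonical = json.dumps(record, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def load(conn: sqlite3.Connection, records: list[dict]) -> None:
    """Upsert records keyed by their content hash (illustrative schema)."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS events (id TEXT PRIMARY KEY, payload TEXT NOT NULL)"
    )
    conn.executemany(
        "INSERT INTO events (id, payload) VALUES (?, ?) "
        "ON CONFLICT(id) DO UPDATE SET payload = excluded.payload",
        [(record_id(r), json.dumps(r, sort_keys=True)) for r in records],
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
batch = [
    {"user_id": 1, "event_type": "click", "timestamp": "2024-01-01T00:00:00Z", "value": 1.0}
]
load(conn, batch)
load(conn, batch)  # the second run hits the same primary key instead of adding a row
row_count = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
```

Most relational databases offer an equivalent upsert statement, but the exact syntax varies by destination.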
Log progress at meaningful checkpoints. After every major stage (extract, validate, transform, load), log the record count and any failures.
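One way to keep checkpoint logging consistent is a small helper built on the standard `logging` module. The `log_checkpoint` function and the `pipeline` logger name below are assumptions for illustration, not part of any existing pipeline.

```python
import logging

logger = logging.getLogger("pipeline")  # hypothetical logger name


def log_checkpoint(stage: str, records_in: int, records_out: int, failed: int = 0) -> dict:
    """Log record counts for one stage and return them for later reporting."""
    summary = {"stage": stage, "in": records_in, "out": records_out, "failed": failed}
    # Escalate to WARNING when any record failed so failures stand out in the log
    level = logging.WARNING if failed else logging.INFO
    logger.log(
        level,
        "stage=%s in=%d out=%d failed=%d",
        stage, records_in, records_out, failed,
    )
    return summary


logging.basicConfig(level=logging.INFO)
checkpoint = log_checkpoint("validate", records_in=100, records_out=97, failed=3)
```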
Test with a sample. Before running on the full dataset, run on 100 records. Confirm the output schema, record count, and that no records were silently dropped.
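The sample check could be sketched as follows. `check_sample` and `split_by_user_id` are hypothetical, and the stage is assumed to return a `(valid, invalid)` pair so that every sampled record can be accounted for.

```python
from itertools import islice
from typing import Callable, Iterable


def check_sample(
    records: Iterable[dict],
    run_stage: Callable[[list[dict]], tuple[list[dict], list[dict]]],
    expected_keys: set[str],
    sample_size: int = 100,
) -> dict[str, int]:
    """Run one stage on a small sample and verify counts and output schema."""
    sample = list(islice(records, sample_size))
    valid, invalid = run_stage(sample)

    # Every sampled record must end up as either valid or invalid
    if len(valid) + len(invalid) != len(sample):
        raise RuntimeError(
            f"{len(sample) - len(valid) - len(invalid)} records were silently dropped"
        )

    # Every output row must have exactly the expected fields
    for row in valid:
        if set(row) != expected_keys:
            raise RuntimeError(f"unexpected output schema: {sorted(row)}")

    return {"sampled": len(sample), "valid": len(valid), "invalid": len(invalid)}


def split_by_user_id(batch: list[dict]) -> tuple[list[dict], list[dict]]:
    """Toy stage for illustration: a record is valid if user_id is an int."""
    valid = [r for r in batch if isinstance(r.get("user_id"), int)]
    invalid = [r for r in batch if not isinstance(r.get("user_id"), int)]
    return valid, invalid


rows = [{"user_id": i, "event_type": "click"} for i in range(250)]
rows[3] = {"user_id": "oops", "event_type": "click"}
sample_summary = check_sample(rows, split_by_user_id, {"user_id", "event_type"})
```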
Run on the full dataset. Monitor progress. On completion, report: records in, records out, records failed, and time elapsed.
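A completion report with these four figures might be collected like this. `RunReport`, `run_with_report`, and the per-record `process_one` callable are illustrative assumptions, and failures are assumed to surface as `ValueError`.

```python
import time
from dataclasses import dataclass
from typing import Any, Callable, Iterable


@dataclass
class RunReport:
    records_in: int
    records_out: int
    records_failed: int
    elapsed_seconds: float


def run_with_report(
    records: Iterable[Any], process_one: Callable[[Any], Any]
) -> tuple[list[Any], RunReport]:
    """Process every record, counting successes and failures, and time the run."""
    start = time.perf_counter()
    outputs, total, failed = [], 0, 0
    for record in records:
        total += 1
        try:
            outputs.append(process_one(record))
        except ValueError:
            failed += 1  # counted and reported, never silently dropped
    elapsed = time.perf_counter() - start
    return outputs, RunReport(total, len(outputs), failed, elapsed)


# int() stands in for a real transformation; "x" fails to parse
outputs, report = run_with_report(["1", "2", "x"], int)
print(
    f"in={report.records_in} out={report.records_out} "
    f"failed={report.records_failed} elapsed={report.elapsed_seconds:.3f}s"
)
```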
| Excuse | Rebuttal |
|---|---|
| "I'll add validation later" | Invalid data corrupts your database. Validate at the boundary now. |
| "Logging slows the pipeline down" | A pipeline that fails without logs requires a full rerun to debug. Log it. |
| "It worked on the sample" | A small sample rarely contains every edge case in the full dataset. Always run a full-dataset dry run before writing to the destination. |