| name | data-pipeline-operations |
| description | Activates when working with Python data pipelines, GCS operations, or medallion architecture (Bronze/Silver/Gold).
Use this skill for: running pipelines, debugging data transformations, GCS uploads/downloads,
data quality validation, CVR/CHR/BFE identifier handling, GeoPandas/PostGIS operations, and DuckDB queries for large files.
Keywords: pipeline, bronze, silver, gold, GCS, parquet, CVR, CHR, BFE, transform, ingest, ETL, DuckDB, large files
|
Data Pipeline Operations Skill
This skill provides guidance for working with Landbruget.dk's data pipelines following the medallion architecture.
Activation Context
This skill activates when:
- Running or debugging data pipelines
- Working with GCS (Google Cloud Storage)
- Handling data transformations (Bronze → Silver → Gold)
- Validating Danish identifiers (CVR, CHR, BFE)
- Working with geospatial data (GeoPandas, PostGIS)
Environment Setup
ALWAYS start with:
cd backend
source venv/bin/activate
Verify environment:
python -c "import geopandas, supabase; print('Environment OK')"
Data Processing Philosophy
PREFER DuckDB over Pandas:
- DuckDB queries files directly and streams them from disk, so a file does not have to fit in memory
- Typically much faster than Pandas for large datasets
- Use SQL instead of DataFrame operations
- Only use Pandas for final small result sets or when GeoPandas is required
CRS Strategy
Process in EPSG:25832, transform to EPSG:4326 only at final Supabase upload.
| EPSG | Name | Use |
|---|---|---|
| 25832 | UTM 32N | Processing (Bronze/Silver/Gold) |
| 4326 | WGS84 | Final storage (Supabase only) |
This eliminates unnecessary transforms:
- ❌ Old: Source(25832) → Silver(4326) → Gold(25832 for calc) → Supabase(4326) = 2-3 transforms
- ✅ New: Source(25832) → Process(25832) → Supabase(4326) = 1 transform
Medallion Architecture
Bronze Layer (Raw Data)
- Purpose: Preserve data exactly as received
- Location: r2://landbruget-data/bronze/<source>/<date>/
- CRS: Keep native (usually EPSG:25832 from Danish WFS sources)
- Rules:
  - Never modify raw data or geometry
  - Add metadata: _fetch_timestamp, _source, _source_crs
  - Use Parquet format
  - Immutable - never overwrite
_source_crs = detect_crs_from_response(wfs_capabilities)
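The Bronze rules above can be sketched in plain Python. This is a minimal illustration, not the project's implementation: `bronze_prefix` and `bronze_metadata` are hypothetical helpers, and writing `<date>` as an ISO date is an assumption about the path layout.

```python
from datetime import date, datetime, timezone

BUCKET = "landbruget-data"  # bucket name taken from this document


def bronze_prefix(source: str, fetch_date: date) -> str:
    """Build the Bronze location for one source and fetch date (hypothetical helper)."""
    # Assumption: <date> is written as ISO 8601 (YYYY-MM-DD).
    return f"r2://{BUCKET}/bronze/{source}/{fetch_date.isoformat()}/"


def bronze_metadata(source: str, source_crs: str, fetched_at: datetime) -> dict:
    """Return the metadata columns stored next to the untouched raw data."""
    return {
        "_fetch_timestamp": fetched_at.astimezone(timezone.utc).isoformat(),
        "_source": source,
        "_source_crs": source_crs,
    }


prefix = bronze_prefix("chr", date(2024, 1, 15))
meta = bronze_metadata(
    "chr", "EPSG:25832", datetime(2024, 1, 15, 6, 30, tzinfo=timezone.utc)
)
```

Because the fetch date is part of the prefix, a later fetch writes to a new location and never overwrites an earlier one.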
Silver Layer (Cleaned Data)
- Purpose: Clean, validate, standardize
- CRS: Keep EPSG:25832 (no transformation yet!)
- Transformations:
  - Type coercion (dates, numbers)
  - CVR formatting: 8 digits, zero-padded
  - CHR formatting: 6 digits
  - Transform non-25832 sources (DAGI, H3) to EPSG:25832 here
  - Deduplication
  - Null handling
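# Reproject only when the source is not already in EPSG:25832
geom_expr = "geometry"
if source_crs != "EPSG:25832":
    geom_expr = f"ST_Transform(geometry, '{source_crs}', 'EPSG:25832')"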
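The identifier formatting, null handling, and deduplication steps listed for Silver could look roughly like the following. This is a sketch under stated assumptions: `clean_cvr`, `clean_chr`, and `to_silver` are hypothetical names, rows are plain dictionaries rather than a DuckDB table, and dropping rows without a usable CVR is an illustrative policy, not a documented rule.

```python
from typing import Optional


def clean_cvr(value: object) -> Optional[str]:
    """Return an 8-digit, zero-padded CVR string, or None if unusable."""
    if value is None:
        return None
    text = str(value).strip()
    if not (text.isascii() and text.isdigit()) or len(text) > 8:
        return None
    return text.zfill(8)


def clean_chr(value: object) -> Optional[str]:
    """Return a 6-digit CHR string, or None if unusable."""
    if value is None:
        return None
    text = str(value).strip()
    return text if text.isascii() and text.isdigit() and len(text) == 6 else None


def to_silver(rows: list[dict]) -> list[dict]:
    """Normalize identifiers, drop rows with no valid CVR, and deduplicate."""
    seen = set()
    cleaned = []
    for row in rows:
        cvr = clean_cvr(row.get("cvr"))
        chr_num = clean_chr(row.get("chr"))
        if cvr is None:
            continue  # assumption: rows without a usable CVR are dropped
        key = (cvr, chr_num)
        if key in seen:
            continue  # duplicate of a row that was already kept
        seen.add(key)
        cleaned.append({**row, "cvr": cvr, "chr": chr_num})
    return cleaned


raw = [
    {"cvr": 1234567, "chr": "012345"},     # integer CVR that lost its leading zero
    {"cvr": "01234567", "chr": "012345"},  # same record, already padded
    {"cvr": None, "chr": "999999"},        # no CVR at all
    {"cvr": "12345678", "chr": "12"},      # CHR too short, stored as None
]
silver = to_silver(raw)
```

Padding before deduplicating matters here: the first two rows only compare equal once the CVR has been normalized.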
Gold Layer (Analysis-Ready)
- Purpose: Enriched, joined datasets
- CRS: Keep EPSG:25832 for processing (area/buffer/distance work natively!)
- Operations:
  - Join multiple sources on CVR/CHR/BFE
  - Calculate derived metrics (meters work directly!)
  - Aggregate by company/farm
- Transform to EPSG:4326 only at final Supabase upload
-- Area in hectares (EPSG:25832 units are meters)
ST_Area(geometry) / 10000
-- 1 km buffer
ST_Buffer(geometry, 1000)
-- Only at final Supabase upload
ST_Transform(geometry, 'EPSG:25832', 'EPSG:4326')
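Because EPSG:25832 coordinates are in meters, dividing an area by 10000 yields hectares directly. The arithmetic can be checked by hand with the shoelace formula. The pure-Python sketch below is only an illustration (the `polygon_area_m2` helper and the coordinates are made up) and is not how DuckDB computes areas.

```python
def polygon_area_m2(ring: list[tuple[float, float]]) -> float:
    """Shoelace area of a simple polygon whose coordinates are in meters."""
    total = 0.0
    # Pair each vertex with the next one, wrapping around to close the ring.
    for (x1, y1), (x2, y2) in zip(ring, ring[1:] + ring[:1]):
        total += x1 * y2 - x2 * y1
    return abs(total) / 2.0


# A 200 m x 150 m rectangle in UTM 32N-style coordinates (easting, northing).
field = [
    (500000.0, 6200000.0),
    (500200.0, 6200000.0),
    (500200.0, 6200150.0),
    (500000.0, 6200150.0),
]

area_m2 = polygon_area_m2(field)
area_ha = area_m2 / 10000  # same division as ST_Area(geometry) / 10000
```

The rectangle covers 200 × 150 = 30000 m², which is 3 ha.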
Data Quality Validation
CVR Number (Company ID)
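import re
def validate_cvr(cvr: str) -> bool:
    """CVR must be 8 digits (shorter numeric values are zero-padded first)."""
    return bool(re.match(r'^\d{8}$', str(cvr).zfill(8)))
# Normalize the column to 8-digit, zero-padded strings
df['cvr'] = df['cvr'].astype(str).str.zfill(8)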
CHR Number (Herd ID)
import re
def validate_chr(chr_num: str) -> bool:
    """CHR must be 6 digits."""
    return bool(re.match(r'^\d{6}$', str(chr_num)))
Geospatial CRS
import geopandas as gpd
gdf_for_supabase = gdf.to_crs('EPSG:4326')
Buffer/Distance in DuckDB
With EPSG:25832, buffer/distance work natively in meters!
ST_Buffer(geometry, 1000)
ST_Area(geometry) / 10000
If working with EPSG:4326 data (avoid when possible):
from common.crs_utils import sql_buffer_meters, sql_intersects_with_buffer_meters
buffer_sql = sql_buffer_meters("geometry", 100)
intersect_sql = sql_intersects_with_buffer_meters("a.geom", "b.geom", 1000)
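The helpers are needed because the unit in EPSG:4326 is degrees, so a plain `ST_Buffer(geometry, 1000)` would mean 1000 degrees rather than 1000 meters. A rough sketch of the conversion follows, assuming a spherical Earth and a latitude of 56°N (roughly Denmark). It is illustrative arithmetic only and says nothing about how `sql_buffer_meters` is implemented; `meters_to_degrees` is a hypothetical helper.

```python
import math

EARTH_RADIUS_M = 6_371_000.0  # mean Earth radius, spherical approximation


def meters_to_degrees(meters: float, latitude_deg: float) -> tuple[float, float]:
    """Approximate (d_lon, d_lat) in degrees that span `meters` at a latitude."""
    meters_per_deg_lat = math.pi * EARTH_RADIUS_M / 180.0
    # Meridians converge toward the poles, so a degree of longitude shrinks.
    meters_per_deg_lon = meters_per_deg_lat * math.cos(math.radians(latitude_deg))
    return meters / meters_per_deg_lon, meters / meters_per_deg_lat


d_lon, d_lat = meters_to_degrees(1000, 56.0)
```

At this latitude 1000 m spans about 0.016° of longitude but only about 0.009° of latitude, so no single degree value can stand in for a metric buffer.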
Cloud Storage Operations (R2)
Bucket: landbruget-data (set via R2_BUCKET or STORAGE_BUCKET env var)
Browse R2 with rclone
rclone lsd r2:landbruget-data/silver/
rclone lsd r2:landbruget-data/silver/subsidies/
rclone ls r2:landbruget-data/silver/subsidies/
Read/Write with StorageAccess
from common.storage.core import StorageAccess
storage = StorageAccess()
storage.create_table_from_storage_parquet("my_table", "landbruget-data/silver/subsidies/*/data.parquet")
storage.save_table_to_storage_parquet("my_table", "landbruget-data/gold/output/data.parquet")
Query R2 Directly with DuckDB
import duckdb
from common.storage.filesystem import setup_duckdb_cloud_auth
conn = duckdb.connect()
setup_duckdb_cloud_auth(conn)
result = conn.execute("""
SELECT cvr_number, SUM(area_ha) as total_area
FROM read_parquet('r2://landbruget-data/silver/fields/*/data.parquet')
GROUP BY cvr_number
""").fetchdf()
Running Pipelines
Standard Pipeline Execution
cd backend
source venv/bin/activate
cd pipelines/<pipeline_name>
python main.py
Common Pipelines
| Pipeline | Purpose | Frequency |
|---|---|---|
| unified_pipeline | 18+ Danish govt sources | Weekly |
| chr_pipeline | Livestock tracking | Weekly |
| svineflytning_pipeline | Pig movements | Weekly |
| drive_data_pipeline | Regulatory compliance | On-demand |
DuckDB for Large Files
DuckDB is excellent for querying large files without loading into memory:
import duckdb
# Filter and aggregate a CSV file directly
result = duckdb.query("""
SELECT cvr_number, SUM(area_ha) as total_area
FROM 'large_file.csv'
WHERE date >= '2024-01-01'
GROUP BY cvr_number
""").df()
# Filter a Parquet file directly
result = duckdb.query("""
SELECT *
FROM 'data.parquet'
WHERE cvr_number = '12345678'
""").df()
# Join a Parquet file with a CSV file
result = duckdb.query("""
SELECT a.*, b.name
FROM 'fields.parquet' a
JOIN 'companies.csv' b ON a.cvr_number = b.cvr_number
WHERE a.area_ha > 100
""").df()
# Aggregate per company and keep only large totals
result = duckdb.query("""
SELECT
cvr_number,
COUNT(*) as field_count,
SUM(area_ha) as total_area,
AVG(area_ha) as avg_area
FROM 'fields.parquet'
GROUP BY cvr_number
HAVING total_area > 1000
""").df()
DuckDB Advantages
- Low memory use: Queries files directly instead of loading them fully into memory
- SQL interface: Use familiar SQL syntax
- Fast: Highly optimized columnar engine
- Multiple formats: CSV, Parquet, JSON
- Joins: Combine multiple files efficiently
DuckDB Spatial — Functions That Do NOT Exist
DuckDB's spatial extension is not PostGIS. These PostGIS functions do not exist in DuckDB:
| PostGIS Function | DuckDB Alternative |
|---|---|
| ST_SRID(geometry) | Use bounds-based CRS detection: detect_crs_from_bounds() from common/crs_utils.py |
| ST_SetSRID(geometry, srid) | Not needed: DuckDB geometries don't carry SRID metadata |
| ST_GeogFromText() | Use ST_GeomFromText() |
| ST_DistanceSphere() | Transform to UTM first, then use ST_Distance() in meters |
| ST_DWithin() (geography) | Transform to UTM, then ST_Distance(a, b) < threshold_meters |
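As an illustration of the last two rows, a metric "within distance" test can be composed from `ST_Transform` and `ST_Distance`. The helper `sql_within_meters` below is hypothetical (it is not part of `common/crs_utils.py`), and it assumes the three-argument `ST_Transform(geometry, source_crs, target_crs)` form used elsewhere in this document.

```python
PROCESSING_CRS = "EPSG:25832"  # UTM 32N, units are meters


def sql_within_meters(
    geom_a: str, geom_b: str, meters: float, source_crs: str = PROCESSING_CRS
) -> str:
    """Build a DuckDB predicate that stands in for PostGIS ST_DWithin on geography."""
    if source_crs != PROCESSING_CRS:
        # Reproject both sides to UTM so that ST_Distance returns meters.
        geom_a = f"ST_Transform({geom_a}, '{source_crs}', '{PROCESSING_CRS}')"
        geom_b = f"ST_Transform({geom_b}, '{source_crs}', '{PROCESSING_CRS}')"
    return f"ST_Distance({geom_a}, {geom_b}) < {meters}"


utm_predicate = sql_within_meters("a.geom", "b.geom", 500)
wgs_predicate = sql_within_meters("a.geom", "b.geom", 500, source_crs="EPSG:4326")
```

The returned string is meant to be dropped into a `WHERE` or `JOIN ... ON` clause. When the data is already in EPSG:25832, no transform is emitted at all.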
CRS detection pattern (use instead of ST_SRID):
from common.crs_utils import detect_crs_from_bounds, sql_transform_to_processing_crs, DANISH_UTM
bounds = conn.execute(f"""
SELECT MIN(ST_XMin(geometry)), MAX(ST_XMax(geometry)),
MIN(ST_YMin(geometry)), MAX(ST_YMax(geometry))
FROM {table} WHERE geometry IS NOT NULL
""").fetchone()
detected_crs, _ = detect_crs_from_bounds(*bounds)
if detected_crs == DANISH_UTM:
    geom_expr = "geometry"
else:
    geom_expr = sql_transform_to_processing_crs("geometry", detected_crs)
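Bounds-based detection works because coordinate magnitudes differ sharply between CRSs: degrees stay within ±180 and ±90, while UTM 32N eastings and northings for Denmark are six- and seven-digit meter values. The sketch below is a hypothetical stand-in, not the real `detect_crs_from_bounds`. The function name, the numeric ranges, and the single return value are all assumptions chosen for illustration.

```python
from typing import Optional


def guess_crs_from_bounds(
    x_min: float, x_max: float, y_min: float, y_max: float
) -> Optional[str]:
    """Guess the CRS from a dataset's bounding box (illustrative ranges only)."""
    # Degrees: every coordinate fits inside the lon/lat value range.
    if -180 <= x_min <= x_max <= 180 and -90 <= y_min <= y_max <= 90:
        return "EPSG:4326"
    # Assumed rough envelope for Danish data in UTM 32N, in meters.
    if (
        100_000 <= x_min <= x_max <= 1_000_000
        and 5_900_000 <= y_min <= y_max <= 6_500_000
    ):
        return "EPSG:25832"
    return None  # unknown: the caller should stop rather than guess


wgs84 = guess_crs_from_bounds(8.0, 15.2, 54.5, 57.8)
utm = guess_crs_from_bounds(441_000, 893_000, 6_049_000, 6_403_000)
unknown = guess_crs_from_bounds(0, 5_000, 0, 5_000)
```

The argument order mirrors the query above: minimum and maximum x first, then minimum and maximum y.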
Other DuckDB 1.5+ spatial gotchas:
- Wrap geometry ops with TRY() to handle invalid geometries gracefully
- Use delim parameter, not DELIMITER (breaking change in 1.5)
- ST_Area_Spheroid uses LON/LAT (x, y) order with geometry_always_xy=true default
Troubleshooting
"Module not found"
cd backend
source venv/bin/activate
pip install -e .
GCS Authentication
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account.json"
Memory Issues
ALWAYS use DuckDB for large files - avoid Pandas:
import duckdb
result = duckdb.query("""
SELECT cvr_number, area_ha
FROM 'large.csv'
WHERE condition
""").df()
When to Use Pandas vs DuckDB
Use DuckDB (preferred):
- Reading CSV/Parquet files
- Filtering, aggregating, joining data
- Any operation on data > 1GB
- Transformations that can be expressed in SQL
Use Pandas only when:
- Working with GeoPandas (spatial operations)
- Final result set is small (<100MB)
- Need very specific Python operations unavailable in SQL
Use GeoPandas only for:
- Geometry operations (ST_Transform, ST_Within, etc.)
- Spatial joins
- CRS transformations
Quality Checklist
Before marking pipeline work complete: