| name | databricks-data-engineering |
| description | Production data engineering pipelines following medallion architecture (Bronze/Silver/Gold layers) with data ingestion, transformation, quality checks, Delta Lake optimization, and orchestration. Use when building ETL pipelines, medallion architecture, data lakes, or data transformation workflows. |
| allowed-tools | ["Bash","Read","Write","Edit","Grep","Glob"] |
| model | claude-sonnet-4-5-20250929 |
| user-invocable | true |
Databricks Data Engineering Pipelines
Build production-grade data pipelines following medallion architecture (Bronze/Silver/Gold) with data quality checks, Delta Lake optimization, and multi-layer transformations.
When to Use This Skill
- Building ETL/ELT data pipelines
- Implementing medallion architecture
- Data lake transformations
- Data quality and validation workflows
- Incremental data processing
- Batch data pipelines
- Real-time streaming (structured streaming)
Medallion Architecture
The medallion architecture organizes data into three layers of increasing quality:
Bronze Layer (Raw/Landing)
↓
Silver Layer (Cleaned/Validated)
↓
Gold Layer (Business/Aggregated)
Bronze Layer (Raw Ingestion)
- Purpose: Ingest raw data with minimal transformation
- Pattern: Append-only, preserve source format
- Transformations: Ingestion metadata (load timestamp, load date, source file path); type casting is deferred to silver
- Storage: Delta Lake tables
- Schema: Flexible, can evolve
Silver Layer (Cleaned Data)
- Purpose: Cleaned, validated, and conformed data
- Pattern: Deduplication, quality checks, schema enforcement
- Transformations: Data quality rules, null handling, type validation
- Storage: Delta Lake tables (optimized)
- Schema: Strict, well-defined
Gold Layer (Business-Ready)
- Purpose: Aggregated, business-ready datasets
- Pattern: Joins, aggregations, business logic
- Transformations: Metrics, KPIs, analytics-ready views
- Storage: Delta Lake tables (highly optimized)
- Schema: Denormalized for analytics
Complete Data Pipeline Workflow
Phase 1: Schema Setup
Use the databricks-unity-catalog skill to create the medallion schemas:
catalog = "de_prod"
create_schema(
    catalog_name=catalog,
    schema_name="bronze",
    comment="Raw ingested data. Minimal transformation. Append-only. 90-day retention."
)
create_schema(
    catalog_name=catalog,
    schema_name="silver",
    comment="Cleaned and validated data. Deduplicated, quality-checked. 1-year retention."
)
create_schema(
    catalog_name=catalog,
    schema_name="gold",
    comment="Business-ready aggregates. Optimized for analytics. 3-year retention."
)
Phase 2: Bronze Layer Development
Use the databricks-testing skill to test the ingestion logic:
databricks_command(
    cluster_id="0123-456789-abc123",
    language="python",
    code="""
from pyspark.sql import functions as F
# Ingest raw data. The JSON reader infers the schema by default (inferSchema is a CSV option);
# supply .schema(...) in production for stable column types.
raw_df = (
    spark.read
    .format("json")  # or csv, parquet, etc.
    .load("/mnt/source/events/*.json")
)
# Add ingestion metadata. _metadata.file_path replaces input_file_name(), which is
# deprecated on recent runtimes and unsupported in Unity Catalog shared access mode.
bronze_df = (
    raw_df
    .withColumn("ingestion_timestamp", F.current_timestamp())
    .withColumn("ingestion_date", F.current_date())
    .withColumn("source_file", F.col("_metadata.file_path"))
)
print(f"Ingested {bronze_df.count()} records")
# Append to bronze (append-only)
(
    bronze_df.write
    .format("delta")
    .mode("append")
    .saveAsTable("de_prod.bronze.raw_events")
)
print("Bronze ingestion complete")
"""
)
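Bronze is append-only, and the ingestion code reads every file matching the source path on each run. Re-running it therefore appends the same records again, which is one reason silver deduplicates. The plain-Python sketch below models that behavior with an in-memory list standing in for the Delta table. The `ingest_to_bronze` helper, the records, and the file name are hypothetical.

```python
from datetime import datetime, timezone


def ingest_to_bronze(bronze, raw_records, source_file):
    """Append raw records with ingestion metadata; existing rows are never modified."""
    now = datetime.now(timezone.utc)
    for record in raw_records:
        row = dict(record)  # source fields are kept exactly as received
        row["ingestion_timestamp"] = now
        row["ingestion_date"] = now.date()
        row["source_file"] = source_file
        bronze.append(row)
    return bronze


bronze_table = []
events = [{"event_id": "e1", "amount": "10.5"}, {"event_id": "e2", "amount": "3"}]
ingest_to_bronze(bronze_table, events, "events/part-0001.json")
ingest_to_bronze(bronze_table, events, "events/part-0001.json")  # same file read again
print(len(bronze_table))  # 4
```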
Phase 3: Silver Layer Development
Use the databricks-testing skill to test the cleaning logic:
Complete Silver Layer Notebook:
from datetime import date
# Read job parameters, falling back to dev defaults when a widget is not defined
try:
    catalog = dbutils.widgets.get("catalog")
except Exception:
    catalog = "de_dev"
try:
    bronze_schema = dbutils.widgets.get("bronze_schema")
except Exception:
    bronze_schema = "bronze"
try:
    silver_schema = dbutils.widgets.get("silver_schema")
except Exception:
    silver_schema = "silver"
try:
    batch_date = dbutils.widgets.get("batch_date")
except Exception:
    batch_date = str(date.today())
print("Processing silver layer:")
print(f" Catalog: {catalog}")
print(f" Bronze schema: {bronze_schema}")
print(f" Silver schema: {silver_schema}")
print(f" Batch date: {batch_date}")
from pyspark.sql import functions as F
from pyspark.sql.window import Window
bronze_df = (
    spark.read.table(f"{catalog}.{bronze_schema}.raw_events")
    .filter(F.col("ingestion_date") == batch_date)
)
# Count once and reuse it; every count() triggers a full scan of the batch
initial_count = bronze_df.count()
print(f"Bronze records for {batch_date}: {initial_count}")
print("Data Quality Report - Before Cleaning:")
print(f" Total records: {initial_count}")
print(f" Null event_id: {bronze_df.filter(F.col('event_id').isNull()).count()}")
print(f" Null timestamp: {bronze_df.filter(F.col('timestamp').isNull()).count()}")
print(f" Duplicated event_ids: {bronze_df.groupBy('event_id').count().filter(F.col('count') > 1).count()}")
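silver_df = (
    bronze_df
    # Cast first so deduplication and range checks use typed values; with ANSI mode off,
    # unparseable values become null and are removed by the filters below
    .withColumn("amount", F.col("amount").cast("double"))
    .withColumn("quantity", F.col("quantity").cast("int"))
    .withColumn("timestamp", F.col("timestamp").cast("timestamp"))
    # Required fields
    .filter(F.col("event_id").isNotNull())
    .filter(F.col("timestamp").isNotNull())
    .filter(F.col("user_id").isNotNull())
    # Keep the latest row per event_id (ordered by a real timestamp, not a string)
    .withColumn("row_num", F.row_number().over(
        Window.partitionBy("event_id").orderBy(F.col("timestamp").desc())
    ))
    .filter(F.col("row_num") == 1)
    .drop("row_num")
    # Business rules; null amount or quantity also fails these comparisons
    .filter(F.col("amount") >= 0)
    .filter(F.col("quantity") > 0)
    # Silver metadata; bronze-only columns are dropped
    .withColumn("silver_processed_at", F.current_timestamp())
    .withColumn("silver_processing_date", F.current_date())
    .drop("source_file", "ingestion_timestamp", "ingestion_date")
)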
final_count = silver_df.count()
removed_count = initial_count - final_count
# Guard against an empty batch to avoid ZeroDivisionError
removed_pct = removed_count / initial_count * 100 if initial_count else 0.0
print("\nData Quality Report - After Cleaning:")
print(f" Clean records: {final_count}")
print(f" Removed records: {removed_count} ({removed_pct:.2f}%)")
assert silver_df.filter(F.col("event_id").isNull()).count() == 0, "Null event_ids remain"
assert silver_df.filter(F.col("timestamp").isNull()).count() == 0, "Null timestamps remain"
assert silver_df.groupBy("event_id").count().filter(F.col("count") > 1).count() == 0, "Duplicates remain"
print("✓ All quality checks passed")
from delta.tables import DeltaTable
silver_table = f"{catalog}.{silver_schema}.clean_events"
if spark.catalog.tableExists(silver_table):
    # Upsert on event_id so re-running the same batch does not create duplicates
    delta_table = DeltaTable.forName(spark, silver_table)
    (
        delta_table.alias("target")
        .merge(silver_df.alias("source"), "target.event_id = source.event_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
    print(f"Merged {final_count} records into {silver_table}")
else:
    silver_df.write.format("delta").mode("overwrite").saveAsTable(silver_table)
    print(f"Created {silver_table} with {final_count} records")
# OPTIMIZE with ZORDER compacts small files and co-locates rows in one pass,
# so a separate plain OPTIMIZE beforehand is redundant work
spark.sql(f"OPTIMIZE {silver_table} ZORDER BY (event_id, `timestamp`)")
print(f"✓ Optimized and Z-ordered {silver_table}")
spark.sql(f"ANALYZE TABLE {silver_table} COMPUTE STATISTICS")
print(f"✓ Updated statistics for {silver_table}")
retention_pct = final_count / initial_count * 100 if initial_count else 0.0
print(f"\n{'='*50}")
print("Silver layer processing complete!")
print(f" Input (Bronze): {initial_count} records")
print(f" Output (Silver): {final_count} records")
print(f" Quality: {retention_pct:.2f}% retention")
print(f"{'='*50}")
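To check the cleaning rules without a cluster, they can be mirrored in plain Python. This is a simplified model, not the notebook itself. Rows are dicts, and `clean_events` and `retention_pct` are hypothetical names. Failed casts return `None`, the way Spark's non-ANSI casts return null. It casts before deduplicating so that "latest" compares real timestamps, and it applies the range rules after deduplication.

```python
from datetime import datetime


def _cast(value, caster):
    """Apply caster, returning None on failure (like a failed non-ANSI cast becoming null)."""
    try:
        return caster(value)
    except (TypeError, ValueError):
        return None


def clean_events(rows):
    """Cast, drop rows missing required fields, keep the latest row per event_id, apply range rules."""
    typed = [
        {
            **r,
            "amount": _cast(r.get("amount"), float),
            "quantity": _cast(r.get("quantity"), int),
            "timestamp": _cast(r.get("timestamp"), datetime.fromisoformat),
        }
        for r in rows
    ]
    required = [
        r for r in typed
        if r.get("event_id") is not None
        and r["timestamp"] is not None
        and r.get("user_id") is not None
    ]
    latest = {}
    for r in required:
        kept = latest.get(r["event_id"])
        if kept is None or r["timestamp"] > kept["timestamp"]:
            latest[r["event_id"]] = r
    return [
        r for r in latest.values()
        if r["amount"] is not None and r["amount"] >= 0
        and r["quantity"] is not None and r["quantity"] > 0
    ]


def retention_pct(initial_count, final_count):
    """Share of input rows kept, guarding against an empty batch."""
    return final_count / initial_count * 100 if initial_count else 0.0
```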
Phase 4: Gold Layer Development
Use the databricks-testing skill to test the aggregation logic:
Complete Gold Layer Notebook:
# Read job parameters, falling back to dev defaults when a widget is not defined
try:
    catalog = dbutils.widgets.get("catalog")
except Exception:
    catalog = "de_dev"
try:
    silver_schema = dbutils.widgets.get("silver_schema")
except Exception:
    silver_schema = "silver"
try:
    gold_schema = dbutils.widgets.get("gold_schema")
except Exception:
    gold_schema = "gold"
print("Processing gold layer:")
print(f" Catalog: {catalog}")
print(f" Silver schema: {silver_schema}")
print(f" Gold schema: {gold_schema}")
from pyspark.sql import functions as F
from pyspark.sql.window import Window
silver_df = spark.table(f"{catalog}.{silver_schema}.clean_events")
print(f"Silver records: {silver_df.count()}")
daily_user_metrics = (
    silver_df
    .withColumn("event_date", F.to_date("timestamp"))
    .groupBy("event_date", "user_id")
    .agg(
        F.count("*").alias("event_count"),
        F.sum("amount").alias("total_amount"),
        F.avg("amount").alias("avg_amount"),
        F.sum("quantity").alias("total_quantity"),
        F.countDistinct("event_id").alias("unique_events"),
        F.min("timestamp").alias("first_event_time"),
        F.max("timestamp").alias("last_event_time")
    )
    .withColumn("gold_created_at", F.current_timestamp())
)
print(f"Daily user metrics: {daily_user_metrics.count()} records")
weekly_metrics = (
    silver_df
    .withColumn("week_start", F.date_trunc("week", "timestamp"))
    .groupBy("week_start")
    .agg(
        F.count("*").alias("total_events"),
        F.countDistinct("user_id").alias("active_users"),
        F.sum("amount").alias("weekly_revenue"),
        F.avg("amount").alias("avg_transaction_value")
    )
    .withColumn("gold_created_at", F.current_timestamp())
)
print(f"Weekly metrics: {weekly_metrics.count()} records")
user_lifetime_metrics = (
    silver_df
    .groupBy("user_id")
    .agg(
        F.count("*").alias("lifetime_events"),
        F.sum("amount").alias("lifetime_value"),
        F.avg("amount").alias("avg_order_value"),
        F.min("timestamp").alias("first_purchase_date"),
        F.max("timestamp").alias("last_purchase_date"),
        F.countDistinct(F.to_date("timestamp")).alias("purchase_days")
    )
    .withColumn(
        "days_since_last_purchase",
        F.datediff(F.current_date(), F.col("last_purchase_date"))
    )
    .withColumn(
        "customer_lifetime_days",
        F.datediff(F.col("last_purchase_date"), F.col("first_purchase_date"))
    )
    .withColumn("gold_created_at", F.current_timestamp())
)
print(f"User lifetime metrics: {user_lifetime_metrics.count()} users")
# Gold tables are fully recomputed from silver on each run, so they are overwritten
for table_name, metrics_df in [
    ("daily_user_metrics", daily_user_metrics),
    ("weekly_metrics", weekly_metrics),
    ("user_lifetime_metrics", user_lifetime_metrics),
]:
    (
        metrics_df.write
        .format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .saveAsTable(f"{catalog}.{gold_schema}.{table_name}")
    )
print("✓ Gold tables created")
for table in ["daily_user_metrics", "weekly_metrics", "user_lifetime_metrics"]:
    full_table = f"{catalog}.{gold_schema}.{table}"
    spark.sql(f"OPTIMIZE {full_table}")
    spark.sql(f"ANALYZE TABLE {full_table} COMPUTE STATISTICS")
    print(f"✓ Optimized {full_table}")
print(f"\n{'='*50}")
print("Gold layer processing complete!")
print(f" Daily user metrics: {daily_user_metrics.count()} records")
print(f" Weekly metrics: {weekly_metrics.count()} records")
print(f" User lifetime metrics: {user_lifetime_metrics.count()} users")
print(f"{'='*50}")
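The daily aggregation is a group-by on `(event_date, user_id)`. A plain-Python sketch with hypothetical sample events shows what each output row holds. `aggregate_daily_user_metrics` is an illustrative name. It covers the count, sum, average, and min/max measures, and it assumes non-null amounts, which the silver rules already enforce.

```python
from collections import defaultdict
from datetime import datetime


def aggregate_daily_user_metrics(events):
    """Group events by (event_date, user_id) and compute per-group measures."""
    groups = defaultdict(list)
    for e in events:
        groups[(e["timestamp"].date(), e["user_id"])].append(e)
    metrics = {}
    for key, group in groups.items():
        amounts = [e["amount"] for e in group]
        metrics[key] = {
            "event_count": len(group),
            "total_amount": sum(amounts),
            "avg_amount": sum(amounts) / len(amounts),
            "total_quantity": sum(e["quantity"] for e in group),
            "first_event_time": min(e["timestamp"] for e in group),
            "last_event_time": max(e["timestamp"] for e in group),
        }
    return metrics


sample = [  # hypothetical cleaned events
    {"user_id": "u1", "timestamp": datetime(2024, 1, 1, 9), "amount": 10.0, "quantity": 1},
    {"user_id": "u1", "timestamp": datetime(2024, 1, 1, 17), "amount": 20.0, "quantity": 2},
    {"user_id": "u1", "timestamp": datetime(2024, 1, 2, 8), "amount": 5.0, "quantity": 1},
]
metrics = aggregate_daily_user_metrics(sample)
```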
Phase 5: Deployment
Use the databricks-bundle-deploy skill to package the medallion pipeline:
databricks.yml:
bundle:
  name: medallion_pipeline
variables:
  catalog:
    description: "Unity Catalog name"
    default: "de_dev"
  bronze_schema:
    description: "Bronze layer schema"
    default: "bronze"
  silver_schema:
    description: "Silver layer schema"
    default: "silver"
  gold_schema:
    description: "Gold layer schema"
    default: "gold"
targets:
  dev:
    mode: development
    variables:
      catalog: "de_dev"
  prod:
    mode: production
    variables:
      catalog: "de_prod"
resources:
  jobs:
    medallion_job:
      name: medallion_pipeline_${bundle.target}
      tasks:
        # Paths resolve relative to this file; databricks.yml sits at the bundle root
        - task_key: bronze_ingestion
          notebook_task:
            notebook_path: ./src/medallion/notebooks/bronze_ingest.py
            base_parameters:
              catalog: ${var.catalog}
              bronze_schema: ${var.bronze_schema}
        - task_key: silver_transformation
          depends_on:
            - task_key: bronze_ingestion
          notebook_task:
            notebook_path: ./src/medallion/notebooks/silver_transform.py
            base_parameters:
              catalog: ${var.catalog}
              bronze_schema: ${var.bronze_schema}
              silver_schema: ${var.silver_schema}
        - task_key: gold_aggregation
          depends_on:
            - task_key: silver_transformation
          notebook_task:
            notebook_path: ./src/medallion/notebooks/gold_aggregate.py
            base_parameters:
              catalog: ${var.catalog}
              silver_schema: ${var.silver_schema}
              gold_schema: ${var.gold_schema}
      schedule:
        # Quartz format (seconds first): fires at minute 0 of every hour
        quartz_cron_expression: "0 0 * * * ?"
        timezone_id: "UTC"
      email_notifications:
        on_failure:
          - ${workspace.current_user.userName}
Validate and deploy:
databricks bundle validate -t dev
databricks bundle deploy -t dev
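Quartz cron expressions, which Databricks job schedules use, begin with a seconds field. That makes them easy to misread as standard five-field cron. `0 0 * * * ?` fires at minute 0 of every hour, whereas `0 0 0 * * ?` would fire once a day at midnight. A small hypothetical helper, `describe_quartz`, labels each field so the schedule is explicit. It only splits and names fields and does not validate their values.

```python
QUARTZ_FIELDS = ["second", "minute", "hour", "day_of_month", "month", "day_of_week", "year"]


def describe_quartz(expression):
    """Label each field of a Quartz cron expression (the trailing year field is optional)."""
    parts = expression.split()
    if len(parts) not in (6, 7):
        raise ValueError(f"expected 6 or 7 fields, got {len(parts)}")
    return dict(zip(QUARTZ_FIELDS, parts))


print(describe_quartz("0 0 * * * ?"))
# {'second': '0', 'minute': '0', 'hour': '*', 'day_of_month': '*', 'month': '*', 'day_of_week': '?'}
```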
Data Engineering Best Practices
1. Idempotent Operations
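Use the merge (upsert) pattern so that re-running a batch updates existing rows instead of appending duplicates:
from delta.tables import DeltaTable
# target_table and source_df are placeholders. source_df should hold at most one row per id:
# MERGE fails when several source rows match the same target row.
delta_table = DeltaTable.forName(spark, target_table)
(
    delta_table.alias("target")
    .merge(source_df.alias("source"), "target.id = source.id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)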
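Re-running the same batch through an upsert leaves the table unchanged, and that is what makes the pattern idempotent. The plain-Python model below demonstrates the property. `merge_upsert` is a hypothetical helper operating on lists of dicts, not the Delta API.

```python
def merge_upsert(target, source, key="id"):
    """Update target rows whose key appears in source and insert the rest."""
    keys = [row[key] for row in source]
    if len(keys) != len(set(keys)):
        # Deduplicate the source first: this sketch rejects any repeated key,
        # and Delta MERGE errors when several source rows match one target row
        raise ValueError("source contains duplicate keys")
    merged = {row[key]: row for row in target}
    for row in source:
        merged[row[key]] = dict(row)  # update if matched, insert if not
    return list(merged.values())


table = [{"id": 1, "status": "new"}]
batch = [{"id": 1, "status": "shipped"}, {"id": 2, "status": "new"}]
first_run = merge_upsert(table, batch)
second_run = merge_upsert(first_run, batch)
print(first_run == second_run)  # True
```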
2. Data Quality Checks
Add assertions at each layer:
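from pyspark.sql import functions as F
# Keys must be present
assert df.filter(F.col("id").isNull()).count() == 0, "Null IDs found"
# Keys must be unique (dup_count is the number of ids that appear more than once)
dup_count = df.groupBy("id").count().filter(F.col("count") > 1).count()
assert dup_count == 0, f"Found {dup_count} duplicated ids"
# Business rule: no negative amounts
assert df.filter(F.col("amount") < 0).count() == 0, "Negative amounts found"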
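Plain `assert` statements stop at the first failing rule. The plain-Python sketch below evaluates every rule and returns all failures at once, which can make a failed run easier to diagnose. The rule set mirrors the checks above; `run_quality_checks` is a hypothetical name.

```python
def run_quality_checks(rows, key="id"):
    """Evaluate every rule and return all failure messages; an empty list means pass."""
    failures = []
    null_keys = sum(1 for r in rows if r.get(key) is None)
    if null_keys:
        failures.append(f"{null_keys} rows with null {key}")
    counts = {}
    for r in rows:
        if r.get(key) is not None:
            counts[r[key]] = counts.get(r[key], 0) + 1
    duplicated = sum(1 for c in counts.values() if c > 1)
    if duplicated:
        failures.append(f"{duplicated} {key} values appear more than once")
    negative = sum(1 for r in rows if r.get("amount") is not None and r["amount"] < 0)
    if negative:
        failures.append(f"{negative} rows with negative amount")
    return failures


failures = run_quality_checks([{"id": 1, "amount": 5}, {"id": 1, "amount": -1}, {"id": None, "amount": 2}])
print(failures)
```

In a notebook, a single `assert not failures, failures` then reports every broken rule together.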
3. Delta Lake Optimization
Optimize tables after writes:
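# OPTIMIZE with ZORDER compacts small files and co-locates rows in one pass,
# so a separate plain OPTIMIZE beforehand is redundant
spark.sql(f"OPTIMIZE {table_name} ZORDER BY (date_column, id_column)")
# Refresh statistics used by the query optimizer
spark.sql(f"ANALYZE TABLE {table_name} COMPUTE STATISTICS")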
4. Incremental Processing
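Process only new data. Raw source files have no ingestion_date column (bronze adds it), so filtering raw files on it fails. To load only files not seen before, use Auto Loader (cloudFiles) or COPY INTO. Filter downstream layers on the bronze metadata:
from datetime import date
from pyspark.sql import functions as F
current_date = str(date.today())
# Read only today's slice of bronze
bronze_incremental = (
    spark.read.table("bronze.events")
    .filter(F.col("ingestion_date") == current_date)
)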
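Incremental ingestion depends on remembering which inputs were already processed; Auto Loader and COPY INTO do this bookkeeping for you. The plain-Python sketch below illustrates only the idea, not either tool's implementation. It uses a set of processed paths with hypothetical file names, and a real pipeline would have to persist that set between runs. Files are marked as processed only after their data is written, so a failed run retries them.

```python
def select_new_files(listed_files, processed):
    """Return files that no earlier run has processed, in a stable order."""
    return sorted(set(listed_files) - processed)


def mark_processed(files, processed):
    """Record files only after their data has been written successfully."""
    processed.update(files)


processed_files = set()  # hypothetical; must be persisted between runs in practice
run1 = select_new_files(["a.json", "b.json"], processed_files)
mark_processed(run1, processed_files)
run2 = select_new_files(["a.json", "b.json", "c.json"], processed_files)
print(run1, run2)  # ['a.json', 'b.json'] ['c.json']
```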
5. Schema Evolution
Handle schema changes gracefully:
# mergeSchema lets the append add columns that are new in silver_df
(
    silver_df.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .saveAsTable(silver_table)
)
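With mergeSchema, columns that exist only in the incoming data are added to the table, and rows without a value for a column read back as null. The plain-Python sketch below models only that column-adding behavior and ignores type changes. `append_with_merge_schema` and the sample columns are hypothetical.

```python
def append_with_merge_schema(columns, rows, new_rows):
    """Append new_rows, adding unseen columns; every row gets None for columns it lacks."""
    merged_columns = list(columns)
    for row in new_rows:
        for col in row:
            if col not in merged_columns:
                merged_columns.append(col)  # new column added to the table schema
    aligned = [{c: row.get(c) for c in merged_columns} for row in rows + new_rows]
    return merged_columns, aligned


cols, data = append_with_merge_schema(
    ["id", "amount"],
    [{"id": 1, "amount": 2.0}],
    [{"id": 2, "amount": 3.0, "currency": "EUR"}],
)
print(cols)  # ['id', 'amount', 'currency']
```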
Common Patterns
Pattern: SCD Type 2 (Slowly Changing Dimension)
from pyspark.sql import functions as F
# Initial load: every row starts as the current, open-ended version
scd2_df = (
    df
    .withColumn("effective_start_date", F.current_date())
    .withColumn("effective_end_date", F.lit("9999-12-31").cast("date"))
    .withColumn("is_current", F.lit(True))
)
Pattern: Late-Arriving Data
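from delta.tables import DeltaTable
delta_table = DeltaTable.forName(spark, target_table)  # target_table is a placeholder
(
    delta_table.alias("target")
    .merge(late_data.alias("source"), "target.id = source.id AND target.date = source.date")
    # Overwrite a matched row only when the late record is newer
    .whenMatchedUpdateAll(condition="source.updated_at > target.updated_at")
    .whenNotMatchedInsertAll()
    .execute()
)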
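The conditional update is what keeps a stale late arrival from overwriting fresher data. The plain-Python model below shows that rule. `merge_late_arrivals` is a hypothetical helper, and `updated_at` uses small integers as a stand-in for timestamps.

```python
def merge_late_arrivals(target, late_rows):
    """Insert unseen (id, date) keys; replace an existing row only when the late row is newer."""
    by_key = {(r["id"], r["date"]): r for r in target}
    for row in late_rows:
        key = (row["id"], row["date"])
        existing = by_key.get(key)
        if existing is None or row["updated_at"] > existing["updated_at"]:
            by_key[key] = dict(row)
    return list(by_key.values())


# updated_at values are plain integers standing in for timestamps
table = [{"id": 1, "date": "2024-01-01", "updated_at": 5, "value": "current"}]
late = [
    {"id": 1, "date": "2024-01-01", "updated_at": 3, "value": "stale"},
    {"id": 2, "date": "2024-01-01", "updated_at": 1, "value": "new"},
]
result = merge_late_arrivals(table, late)
```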
Pattern: Data Lineage Tracking
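from pyspark.sql import functions as F
# run_id is expected as a job parameter, e.g. base_parameters: run_id: "{{job.run_id}}"
df_with_lineage = (
    df
    .withColumn("source_system", F.lit("sales_db"))
    .withColumn("pipeline_run_id", F.lit(dbutils.widgets.get("run_id")))
    .withColumn("processed_timestamp", F.current_timestamp())
)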
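`dbutils.widgets.get` raises an error when the widget is not defined, for example when the notebook runs interactively without a `run_id`. The silver and gold notebooks handle this with repeated try/except blocks. A small hypothetical helper, `get_param`, keeps the fallback in one place. `FakeWidgets` is a stand-in so the sketch runs outside Databricks; in a notebook the call would be `get_param(dbutils.widgets, "run_id", "manual")`.

```python
def get_param(widgets, name, default):
    """Return the widget value, or default if the widget is not defined."""
    try:
        return widgets.get(name)
    except Exception:
        return default


class FakeWidgets:
    """Stand-in for dbutils.widgets so the sketch runs outside Databricks."""

    def __init__(self, values):
        self._values = values

    def get(self, name):
        if name not in self._values:
            raise ValueError(f"widget {name!r} is not defined")
        return self._values[name]


print(get_param(FakeWidgets({"run_id": "42"}), "run_id", "manual"))  # 42
print(get_param(FakeWidgets({}), "run_id", "manual"))  # manual
```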
Integration with Other Skills
Uses
- databricks-unity-catalog - Creates medallion schemas (bronze, silver, gold)
- databricks-testing - Tests each layer's transformation logic
- databricks-bundle-deploy - Packages and deploys the pipeline
Workflow Summary
- UC skill → Create bronze, silver, gold schemas
- Testing skill → Test bronze ingestion
- Testing skill → Test silver cleaning
- Testing skill → Test gold aggregations
- Bundle skill → Package medallion pipeline
- Bundle skill → Deploy to environments
Troubleshooting
Issue: Small Files Problem
Symptom: Slow queries, many small files
Solution:
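# Set the target file size as a Delta table property first, then compact existing files
spark.sql(f"ALTER TABLE {table_name} SET TBLPROPERTIES ('delta.targetFileSize' = '128mb')")
spark.sql(f"OPTIMIZE {table_name}")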
Issue: Schema Mismatch
Symptom: "Schema mismatch" error on append
Solution:
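# Append and add any new columns to the table schema (mode must be append for an existing table)
df.write.mode("append").option("mergeSchema", "true").saveAsTable(table)
# Or replace the table and its schema entirely (destructive)
df.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable(table)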
Issue: Duplicate Records
Symptom: Same record appears multiple times
Solution:
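from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Keep only the most recent row per id
latest_first = Window.partitionBy("id").orderBy(F.col("timestamp").desc())
dedup_df = (
    df.withColumn("row_num", F.row_number().over(latest_first))
    .filter(F.col("row_num") == 1)
    .drop("row_num")
)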
Summary
This skill builds production data engineering pipelines:
- Bronze: Raw data ingestion with minimal transformation
- Silver: Cleaned, validated, deduplicated data
- Gold: Business-ready aggregates and metrics
- Quality: Data validation at each layer
- Optimization: Delta Lake compaction and Z-ordering
- Idempotency: Merge patterns for reliable pipelines
Use this skill to build scalable, production-grade data pipelines following medallion architecture best practices.