| name | spark-optimization |
| description | Optimize Apache Spark jobs with partitioning, caching, shuffle optimization, and memory tuning. Use when improving Spark performance, debugging slow jobs, or scaling data processing pipelines. |
| risk | unknown |
| source | community |
| date_added | 2026-02-27 |
Apache Spark Optimization
Production patterns for optimizing Apache Spark jobs, including partitioning strategies, memory management, shuffle optimization, and performance tuning.
Do not use this skill when
- The task is unrelated to Apache Spark optimization
- You need a different domain or tool outside this scope
Instructions
- Clarify goals, constraints, and required inputs.
- Apply relevant best practices and validate outcomes.
- Provide actionable steps and verification.
- If detailed examples are required, open resources/implementation-playbook.md.
Use this skill when
- Optimizing slow Spark jobs
- Tuning memory and executor configuration
- Implementing efficient partitioning strategies
- Debugging Spark performance issues
- Scaling Spark pipelines for large datasets
- Reducing shuffle and data skew
Core Concepts
1. Spark Execution Model
Driver Program
↓
Job (triggered by action)
↓
Stages (separated by shuffles)
↓
Tasks (one per partition)
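As a rough illustration of the model above, the following plain-Python sketch (no Spark required) splits a chain of operations into stages at each shuffle boundary and counts one task per partition. The `split_into_stages` and `total_tasks` helpers and the narrow/wide labels are hypothetical names used for illustration, not Spark APIs, and Spark's real DAG scheduler is considerably more involved.

```python
from typing import List, Tuple

# Hypothetical labels: "narrow" operations stay within a partition,
# "wide" operations need a shuffle and therefore start a new stage.
Op = Tuple[str, str]  # (operation name, "narrow" or "wide")


def split_into_stages(ops: List[Op]) -> List[List[str]]:
    """Group operations into stages; each wide operation opens a new stage."""
    stages: List[List[str]] = [[]]
    for name, kind in ops:
        if kind == "wide" and stages[-1]:
            stages.append([])  # shuffle boundary closes the current stage
        stages[-1].append(name)
    return stages


def total_tasks(partitions_per_stage: List[int]) -> int:
    """Each stage runs one task per partition."""
    return sum(partitions_per_stage)


job = [
    ("read", "narrow"),
    ("filter", "narrow"),
    ("select", "narrow"),
    ("groupBy.agg", "wide"),
    ("orderBy", "wide"),
]
stages = split_into_stages(job)
print(stages)
# Assumed partition counts: 40 input splits, then 200 shuffle partitions per stage
print(total_tasks([40, 200, 200]))
```

Under these assumptions the job has three stages, so every wide transformation removed from the pipeline also removes a shuffle and a stage.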
2. Key Performance Factors
| Factor | Impact | Solution |
|---|---|---|
| Shuffle | Network I/O, disk I/O | Minimize wide transformations |
| Data Skew | Uneven task duration | Salting, broadcast joins |
| Serialization | CPU overhead | Use Kryo, columnar formats |
| Memory | GC pressure, spills | Tune executor memory |
| Partitions | Parallelism | Right-size partitions |
Quick Start
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Create a session with Adaptive Query Execution (AQE) and Kryo serialization enabled
spark = (SparkSession.builder
    .appName("OptimizedJob")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.sql.adaptive.skewJoin.enabled", "true")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.shuffle.partitions", "200")
    .getOrCreate())

# Read Parquet without schema merging to avoid reconciling schemas across files
df = (spark.read
    .format("parquet")
    .option("mergeSchema", "false")
    .load("s3://bucket/data/"))

# Filter and project early so less data reaches the shuffle
result = (df
    .filter(F.col("date") >= "2024-01-01")
    .select("id", "amount", "category")
    .groupBy("category")
    .agg(F.sum("amount").alias("total")))

# Write the aggregated result
result.write.mode("overwrite").parquet("s3://bucket/output/")
Patterns
Pattern 1: Optimal Partitioning
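def calculate_partitions(data_size_gb: float, partition_size_mb: int = 128) -> int:
    """
    Optimal partition size: 128MB - 256MB
    Too few: Under-utilization, memory pressure
    Too many: Task scheduling overhead
    """
    return max(int(data_size_gb * 1024 / partition_size_mb), 1)

# Repartition by key: full shuffle; use to raise parallelism or co-locate keys
df_repartitioned = df.repartition(200, "partition_key")

# Coalesce: lowers the partition count without a full shuffle
df_coalesced = df.coalesce(100)

# Partition pruning: a filter on a partition column lets Spark skip non-matching directories
df = (spark.read.parquet("s3://bucket/data/")
    .filter(F.col("date") == "2024-01-01"))

# Write partitioned output for later pruning (assumes year, month, and day columns exist)
(df.write
    .partitionBy("year", "month", "day")
    .mode("overwrite")
    .parquet("s3://bucket/partitioned_output/"))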
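To make the sizing arithmetic concrete, here is a small plain-Python sketch that estimates a partition count from data size and then rounds it up to a multiple of the available cores. The `partitions_for` helper and the round-to-cores step are illustrative assumptions (a common heuristic, not something Spark does for you), and the 48-core cluster is hypothetical.

```python
import math


def partitions_for(data_size_gb: float, target_mb: int = 128, total_cores: int = 1) -> int:
    """Estimate a partition count, rounded up to a multiple of total_cores."""
    raw = max(math.ceil(data_size_gb * 1024 / target_mb), 1)
    # Rounding up to a multiple of the core count keeps the last wave of tasks full.
    return math.ceil(raw / total_cores) * total_cores


print(partitions_for(50))                   # 50 GB at 128 MB per partition
print(partitions_for(50, target_mb=256))    # same data at 256 MB per partition
print(partitions_for(50, total_cores=48))   # rounded up for a hypothetical 48-core cluster
```

With these inputs the estimates are 400, 200, and 432 partitions respectively. Treat the result as a starting point and confirm task durations and spill in the Spark UI.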
Pattern 2: Join Optimization
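from pyspark.sql import functions as F

small_df = spark.read.parquet("s3://bucket/small_table/")
large_df = spark.read.parquet("s3://bucket/large_table/")

# Broadcast join: ship the small table to every executor so the large table is not shuffled
result = large_df.join(
    F.broadcast(small_df),
    on="key",
    how="left"
)

# Sort-merge join: Spark's default strategy for two large tables
# (large_df1 and large_df2 are placeholders for two large DataFrames)
result = large_df1.join(large_df2, on="key", how="inner")

# Bucket join: bucket and sort on the join key at write time so later joins can skip the shuffle
(df.write
    .bucketBy(200, "customer_id")
    .sortBy("customer_id")
    .mode("overwrite")
    .saveAsTable("bucketed_orders"))

# Assumes bucketed_customers was written with the same bucket count and key
orders = spark.table("bucketed_orders")
customers = spark.table("bucketed_customers")
result = orders.join(customers, on="customer_id")

# Skew join: let AQE split oversized partitions at runtime
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")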
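def salt_join(df_skewed, df_other, key_col, num_salts=10):
    """Add salt to distribute skewed keys"""
    # Skewed side: tag each row with a random salt in [0, num_salts)
    df_salted = df_skewed.withColumn(
        "salt",
        (F.rand() * num_salts).cast("int")
    ).withColumn(
        "salted_key",
        F.concat(F.col(key_col).cast("string"), F.lit("_"), F.col("salt").cast("string"))
    )
    # Other side: replicate each row once per salt value so every salted key has a match
    df_exploded = df_other.crossJoin(
        spark.range(num_salts).withColumnRenamed("id", "salt")
    ).withColumn(
        "salted_key",
        F.concat(F.col(key_col).cast("string"), F.lit("_"), F.col("salt").cast("string"))
    ).drop(key_col, "salt")  # avoid duplicate columns after the join
    # Join on the salted key, then drop the helper columns
    return (df_salted
        .join(df_exploded, on="salted_key", how="inner")
        .drop("salt", "salted_key"))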
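The effect of salting can be seen without a cluster. The sketch below simulates hash partitioning in plain Python: it assigns rows to partitions by a hash of the (optionally salted) key and compares the heaviest partition with and without salt. The `partition_loads` helper, the CRC32 stand-in for Spark's hash partitioner, and the sample data are all assumptions made for illustration.

```python
import random
import zlib
from collections import Counter


def partition_loads(keys, num_partitions, num_salts=1, seed=0):
    """Count rows per partition when each key gets a random salt suffix."""
    rng = random.Random(seed)
    loads = Counter()
    for key in keys:
        salt = rng.randrange(num_salts)
        # CRC32 is a deterministic stand-in for Spark's hash partitioner
        partition = zlib.crc32(f"{key}_{salt}".encode()) % num_partitions
        loads[partition] += 1
    return loads


# Hypothetical skewed dataset: one hot key holds 90% of the rows
keys = ["hot"] * 9000 + [f"k{i}" for i in range(1000)]

unsalted = partition_loads(keys, num_partitions=16, num_salts=1)
salted = partition_loads(keys, num_partitions=16, num_salts=10)

print("heaviest partition without salt:", max(unsalted.values()))
print("heaviest partition with 10 salts:", max(salted.values()))
```

Without salt, every row for the hot key lands in a single partition. With ten salts the same rows spread over several partitions, at the cost of replicating the other side of the join ten times.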
Pattern 3: Caching and Persistence
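from pyspark import StorageLevel

df = spark.read.parquet("s3://bucket/data/")
df_filtered = df.filter(F.col("status") == "active")

# Option 1: cache() uses the default storage level
# df_filtered.cache()
# Option 2: persist() with an explicit level (the _SER variants are not available in PySpark 3.x)
df_filtered.persist(StorageLevel.MEMORY_AND_DISK)

# Caching is lazy: run an action to materialize it
df_filtered.count()
# Both aggregations reuse the cached data
agg1 = df_filtered.groupBy("category").count()
agg2 = df_filtered.groupBy("region").sum("amount")
# Release the memory when finished
df_filtered.unpersist()

# Checkpoint: truncate a long lineage by writing the data to reliable storage
spark.sparkContext.setCheckpointDir("s3://bucket/checkpoints/")
# other_df is a placeholder for another DataFrame that shares "key"
df_complex = (df
    .join(other_df, "key")
    .groupBy("category")
    .agg(F.sum("amount")))
# checkpoint() returns a new DataFrame, so keep the result
df_complex = df_complex.checkpoint()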
Pattern 4: Memory Tuning
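# Executor sizing must be set before the session (and its executors) start
spark = (SparkSession.builder
    .config("spark.executor.memory", "8g")           # executor JVM heap
    .config("spark.executor.memoryOverhead", "2g")   # non-heap memory added to the container request
    .config("spark.memory.fraction", "0.6")          # share of (heap - 300MB) for execution + storage
    .config("spark.memory.storageFraction", "0.5")   # share of that region protected for cached data
    .config("spark.sql.shuffle.partitions", "200")
    .config("spark.sql.autoBroadcastJoinThreshold", "50MB")
    .config("spark.sql.files.maxPartitionBytes", "128MB")
    .getOrCreate())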
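def print_memory_usage(spark):
    """Print storage memory (max and remaining) reported by each block manager"""
    # Uses private Py4J handles (sc._jsc) that may change between Spark versions;
    # treat this as a sketch and verify it on your cluster
    sc = spark.sparkContext
    # Scala Map of "host:port" -> (max memory, remaining memory), both in bytes
    status = sc._jsc.sc().getExecutorMemoryStatus()
    entries = status.iterator()
    while entries.hasNext():
        entry = entries.next()
        executor, mem_status = entry._1(), entry._2()
        total = mem_status._1() / (1024**3)
        free = mem_status._2() / (1024**3)
        print(f"{executor}: {total:.2f}GB total, {free:.2f}GB free")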
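It can help to work through what the memory settings above translate to. The plain-Python sketch below applies Spark's unified memory model, in which 300 MB of heap is reserved and `spark.memory.fraction` of the remainder is shared by execution and storage. The `memory_breakdown` helper is a hypothetical name, and the figures are estimates that ignore JVM rounding.

```python
RESERVED_MB = 300  # heap reserved by Spark's unified memory manager


def memory_breakdown(executor_memory_mb, overhead_mb,
                     memory_fraction=0.6, storage_fraction=0.5):
    """Estimate how an executor's memory is divided under the unified model."""
    usable = executor_memory_mb - RESERVED_MB
    unified = usable * memory_fraction       # execution + storage region
    storage = unified * storage_fraction     # cached data protected from eviction
    return {
        "container_total_mb": executor_memory_mb + overhead_mb,
        "unified_mb": unified,
        "storage_mb": storage,
        "execution_mb": unified - storage,
        "user_mb": usable - unified,         # user data structures and metadata
    }


# 8g heap plus 2g overhead, matching the configuration above
for name, value in memory_breakdown(8 * 1024, 2 * 1024).items():
    print(f"{name}: {value:.1f}")
```

For the 8g/2g configuration this gives roughly 4.6 GB of unified memory inside a 10 GB container request. The storage and execution split is a soft boundary: execution can borrow unused storage memory and vice versa.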
Pattern 5: Shuffle Optimization
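# Open-source Spark needs an integer here ("auto" is accepted only on some managed platforms);
# with AQE enabled, Spark coalesces small shuffle partitions at runtime
spark.conf.set("spark.sql.shuffle.partitions", "200")
# Core shuffle settings (these values are also Spark's defaults). Pass them at launch via
# SparkSession.builder.config(...) or spark-submit --conf, not on a running session
shuffle_launch_configs = {
    "spark.shuffle.compress": "true",
    "spark.shuffle.spill.compress": "true",
    "spark.io.compression.codec": "lz4",
}

# Two-stage aggregation: pre-aggregate on a finer key to spread hot keys, then combine
df_optimized = (df
    .groupBy("key", "partition_col")
    .agg(F.sum("value").alias("partial_sum"))
    .groupBy("key")
    .agg(F.sum("partial_sum").alias("total")))

# Exact distinct count shuffles every distinct value
distinct_count = df.select("category").distinct().count()
# Approximate distinct count (HyperLogLog++) is cheaper when an estimate is enough
approx_count = df.select(F.approx_count_distinct("category")).collect()[0][0]
# Reduce the number of output partitions without a full shuffle
df_reduced = df.coalesce(10)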
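The two-stage aggregation above works because a sum can be computed in pieces and then combined. The plain-Python sketch below checks that property on a tiny in-memory dataset; the rows and the helper names `two_stage_sum` and `direct_sum` are hypothetical and exist only for illustration.

```python
from collections import defaultdict

# (key, partition_col, value); hypothetical sample rows
rows = [
    ("a", 0, 1.0), ("a", 1, 2.0), ("a", 1, 3.0),
    ("b", 0, 4.0), ("b", 2, 5.0),
]


def two_stage_sum(rows):
    """Sum per (key, partition_col) first, then combine the partial sums per key."""
    partial = defaultdict(float)
    for key, part, value in rows:
        partial[(key, part)] += value
    total = defaultdict(float)
    for (key, _part), subtotal in partial.items():
        total[key] += subtotal
    return dict(total)


def direct_sum(rows):
    """Single-stage sum per key, for comparison."""
    total = defaultdict(float)
    for key, _part, value in rows:
        total[key] += value
    return dict(total)


print(two_stage_sum(rows))
print(direct_sum(rows))
```

Both functions return the same totals. The same trick applies to other decomposable aggregates such as count, min, and max, but an average or an exact distinct count cannot be combined from partial results this directly.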
Pattern 6: Data Format Optimization
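# Parquet with Snappy compression and 128 MB row groups
(df.write
    .option("compression", "snappy")
    .option("parquet.block.size", 128 * 1024 * 1024)
    .parquet("s3://bucket/output/"))

# Column pruning: read only the columns you need
df = (spark.read.parquet("s3://bucket/data/")
    .select("id", "amount", "date"))

# Partition pruning by path, plus a filter that Parquet can push down
df = (spark.read.parquet("s3://bucket/partitioned/year=2024/")
    .filter(F.col("status") == "active"))

# Delta Lake write; support for the optimizeWrite and autoCompact writer options
# varies by Delta Lake version and platform, so verify them in your environment
(df.write
    .format("delta")
    .option("optimizeWrite", "true")
    .option("autoCompact", "true")
    .mode("overwrite")
    .save("s3://bucket/delta_table/"))

# Compact small files and co-locate related rows (requires OPTIMIZE ... ZORDER BY support)
spark.sql("""
OPTIMIZE delta.`s3://bucket/delta_table/`
ZORDER BY (customer_id, date)
""")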
Pattern 7: Monitoring and Debugging
# Whole-stage code generation (on by default) and Arrow-based pandas conversion
spark.conf.set("spark.sql.codegen.wholeStage", "true")
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Inspect the query plan: logical and physical plans, then plan statistics
df.explain(mode="extended")
df.explain(mode="cost")

def analyze_stage_metrics(spark):
    """Print task counts for the stages that are currently active"""
    status_tracker = spark.sparkContext.statusTracker()
    for stage_id in status_tracker.getActiveStageIds():
        stage_info = status_tracker.getStageInfo(stage_id)
        if stage_info is None:  # the stage finished between the two calls
            continue
        print(f"Stage {stage_id}:")
        print(f" Tasks: {stage_info.numTasks}")
        print(f" Completed: {stage_info.numCompletedTasks}")
        print(f" Failed: {stage_info.numFailedTasks}")

def check_partition_skew(df):
    """Check for partition skew"""
    # Count rows per physical partition, largest first
    partition_counts = (df
        .withColumn("partition_id", F.spark_partition_id())
        .groupBy("partition_id")
        .count()
        .orderBy(F.desc("count")))
    partition_counts.show(20)
    stats = partition_counts.select(
        F.min("count").alias("min"),
        F.max("count").alias("max"),
        F.avg("count").alias("avg"),
        F.stddev("count").alias("stddev")
    ).collect()[0]
    if not stats["avg"]:  # empty DataFrame: nothing to compare
        print("No rows found")
        return
    skew_ratio = stats["max"] / stats["avg"]
    print(f"Skew ratio: {skew_ratio:.2f}x (>2x indicates skew)")
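The skew ratio printed above is simply the largest partition divided by the average partition. The plain-Python sketch below reproduces that arithmetic on hypothetical row counts, using the same 2x threshold; `skew_ratio` and `is_skewed` are illustrative helper names.

```python
from statistics import mean


def skew_ratio(partition_counts):
    """Largest partition size divided by the mean partition size."""
    if not partition_counts:
        raise ValueError("partition_counts must not be empty")
    avg = mean(partition_counts)
    return max(partition_counts) / avg if avg else 0.0


def is_skewed(partition_counts, threshold=2.0):
    """Flag skew when the largest partition exceeds threshold times the mean."""
    return skew_ratio(partition_counts) > threshold


balanced = [100, 100, 100, 100]        # hypothetical row counts per partition
skewed = [1000, 100, 100, 100, 100]

print(f"{skew_ratio(balanced):.2f}x skewed={is_skewed(balanced)}")
print(f"{skew_ratio(skewed):.2f}x skewed={is_skewed(skewed)}")
```

In the second case one partition holds ten times the rows of the others, giving a ratio of about 3.57x, so the stage takes roughly as long as that single task.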
Configuration Cheat Sheet
spark_configs = {
    # Adaptive Query Execution
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    "spark.sql.adaptive.skewJoin.enabled": "true",
    # Memory
    "spark.executor.memory": "8g",
    "spark.executor.memoryOverhead": "2g",
    "spark.memory.fraction": "0.6",
    "spark.memory.storageFraction": "0.5",
    # Parallelism
    "spark.sql.shuffle.partitions": "200",
    "spark.default.parallelism": "200",
    # Serialization
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.sql.execution.arrow.pyspark.enabled": "true",
    # Compression
    "spark.io.compression.codec": "lz4",
    "spark.shuffle.compress": "true",
    # Joins and file handling
    "spark.sql.autoBroadcastJoinThreshold": "50MB",
    "spark.sql.files.maxPartitionBytes": "128MB",
    "spark.sql.files.openCostInBytes": "4MB",
}
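Several of these settings (executor memory, serializer, shuffle compression) only take effect when supplied at launch. One way to reuse a settings dictionary is to turn it into `spark-submit --conf` flags, as in the plain-Python sketch below. The `to_submit_args` helper and the `job.py` script name are hypothetical, and only two settings are shown to keep the output short.

```python
import shlex


def to_submit_args(configs):
    """Turn a {key: value} mapping into a flat list of --conf arguments."""
    args = []
    for key in sorted(configs):  # sorted for a stable, diff-friendly command line
        args += ["--conf", f"{key}={configs[key]}"]
    return args


example_configs = {
    "spark.sql.adaptive.enabled": "true",
    "spark.executor.memory": "8g",
}

args = to_submit_args(example_configs)
# job.py is a placeholder for your application script
print(" ".join(shlex.quote(a) for a in ["spark-submit", *args, "job.py"]))
```

Keeping the settings in one dictionary and generating the command line from it avoids drift between what the code assumes and what the cluster was launched with.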
Best Practices
Do's
- Enable AQE - Adaptive Query Execution coalesces small shuffle partitions and splits skewed join partitions at runtime
- Use Parquet/Delta - Columnar formats with compression
- Broadcast small tables - Avoid shuffle for small joins
- Monitor Spark UI - Check for skew, spills, GC
- Right-size partitions - 128MB - 256MB per partition
Don'ts
- Don't collect large data - Keep data distributed
- Don't use UDFs unnecessarily - Use built-in functions
- Don't over-cache - Memory is limited
- Don't ignore data skew - A few oversized tasks can dominate total job time
- Don't use .count() for existence - Use .take(1) or .isEmpty()
Limitations
- Use this skill only when the task clearly matches the scope described above.
- Do not treat the output as a substitute for environment-specific validation, testing, or expert review.
- Stop and ask for clarification if required inputs, permissions, safety boundaries, or success criteria are missing.