
Spark Optimization
Tune PySpark jobs with partitioning, join strategies, and I/O patterns so solo-built data pipelines run faster and cheaper on Spark clusters.
Overview
spark-optimization is an agent skill for the Build phase that teaches PySpark partitioning, join, and storage patterns to speed up and right-size distributed data jobs.
Install
npx skills add https://github.com/wshobson/agents --skill spark-optimization
What is this skill?
- Provides a helper that calculates partition counts targeting 128MB–256MB partitions, with repartition vs. coalesce guidance
- Documents broadcast joins when one side is under roughly 10MB (a configurable threshold) and sort-merge patterns for large-table joins
- Shows partition-pruned reads and partitionBy writes for downstream query efficiency
- PySpark code patterns for S3/Parquet workflows typical in solo data products
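The partition guidance above comes down to a little arithmetic. The sketch below is a hypothetical, pure-Python helper (`plan_partitions` is not part of the skill or of PySpark) that rounds the data size up to partitions in the 128MB–256MB window and picks `coalesce` when shrinking and `repartition` when growing, since `DataFrame.coalesce` can only lower the partition count. The size estimate is an input you supply.

```python
import math

# Hypothetical helper: sizes are in bytes. The 128MB-256MB window follows the
# skill's recommendation; the function name and return shape are assumptions.
MB = 1024 * 1024


def plan_partitions(data_size_bytes: int, current_partitions: int,
                    target_mb: int = 128) -> tuple[int, str]:
    """Return (partition_count, method) for a dataset of the given size.

    method is "coalesce" when only fewer partitions are needed (coalesce merges
    existing partitions without a full shuffle), "repartition" when more are
    needed (which requires a shuffle), or "keep" when no change is needed.
    """
    if not 128 <= target_mb <= 256:
        raise ValueError("target_mb should stay within 128-256")
    # Round up so no partition exceeds the target size
    target = max(math.ceil(data_size_bytes / (target_mb * MB)), 1)
    if target < current_partitions:
        return target, "coalesce"
    if target > current_partitions:
        return target, "repartition"
    return target, "keep"
```

With the result in hand you would call `df.coalesce(n)` or `df.repartition(n)`. Keep in mind that coalescing heavily can also reduce the parallelism of the stages before it, so a shuffle via `repartition` is sometimes worth paying for even when shrinking.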
Adoption & trust: 6.9k installs on skills.sh; 36.5k GitHub stars; 3/3 security scanners passed (skills.sh audits).
What problem does it solve?
Your Spark job shuffles too much, runs out of memory, or scans entire datasets because partitioning and join strategies were left at their defaults.
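Whole-dataset scans happen when a filter cannot be matched against the storage layout. As a hedged illustration, assuming data written with `partitionBy("year", "month", "day")` into Hive-style `key=value` directories, the pure-Python sketch below mimics what partition pruning achieves: a filter on partition columns selects directories before any file is opened. It is not Spark's implementation; the helper names and paths are hypothetical.

```python
# Hypothetical illustration of partition pruning over a Hive-style layout.
# Paths and values are made up for the example.


def parse_partition_values(path: str) -> dict[str, str]:
    """Extract key=value directory segments from a partition path."""
    values = {}
    for segment in path.strip("/").split("/"):
        if "=" in segment:
            key, value = segment.split("=", 1)
            values[key] = value
    return values


def prune(paths: list[str], **equals: str) -> list[str]:
    """Keep only directories whose partition values match every filter."""
    return [
        p for p in paths
        if all(parse_partition_values(p).get(k) == v for k, v in equals.items())
    ]


directories = [
    "s3://bucket/partitioned_output/year=2024/month=1/day=1",
    "s3://bucket/partitioned_output/year=2024/month=1/day=2",
    "s3://bucket/partitioned_output/year=2024/month=2/day=1",
]
```

Filtering on `month="1"` keeps two of the three directories, so only their files would be read; a filter on a non-partition column could not skip any directory this way.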
Who is it for?
Indie builders shipping analytics backends, ETL to S3, or ML feature pipelines on Spark who need actionable PySpark snippets.
Skip if: you only run small local pandas workloads, or your product has no batch data layer.
When should I use this skill?
A Spark or PySpark job is slow, memory-bound, or shuffle-heavy and you need partitioning and join patterns.
What do I get? / Deliverables
You leave with copy-paste partitioning and join patterns sized to your data volume and a write layout that prunes scans on future queries.
- Partition count and repartition/coalesce plan for the dataset size
- Join strategy choices (broadcast vs sort-merge) with code snippets
- Partitioned Parquet write layout for future predicate pushdown
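The broadcast vs. sort-merge choice can be written down as a small rule. The sketch below is a hypothetical helper, not a Spark API: it broadcasts the smaller side when it is under a threshold (10MB here, matching Spark's default `spark.sql.autoBroadcastJoinThreshold`) and otherwise falls back to a sort-merge join. The byte sizes are estimates you provide.

```python
# Hypothetical decision helper mirroring the skill's rule of thumb:
# broadcast the smaller side when it is under the threshold, otherwise
# use a sort-merge join. Sizes are caller-supplied estimates in bytes.
DEFAULT_BROADCAST_THRESHOLD = 10 * 1024 * 1024  # 10MB


def choose_join_strategy(left_bytes: int, right_bytes: int,
                         threshold: int = DEFAULT_BROADCAST_THRESHOLD) -> str:
    """Return "broadcast_left", "broadcast_right", or "sort_merge"."""
    smaller_is_right = right_bytes <= left_bytes
    smaller = right_bytes if smaller_is_right else left_bytes
    if smaller < threshold:
        return "broadcast_right" if smaller_is_right else "broadcast_left"
    return "sort_merge"
```

A `broadcast_right` result would translate to `large_df.join(F.broadcast(small_df), on="key")` in PySpark. Join type also matters: for a left outer join Spark can only broadcast the right side, so treat this helper as a starting point for inner joins.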
Journey fit
Spark optimization is applied while designing and implementing batch or ETL backends before production scale-up. Partition sizing, broadcast joins, and predicate pushdown are backend data-pipeline concerns, not launch or growth workflows.
How it compares
Use it instead of guessing a value for spark.sql.shuffle.partitions or scaling up the cluster without a join or partitioning strategy.
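Rather than guessing, `spark.sql.shuffle.partitions` (default 200) can be derived from an estimate of how much data a stage shuffles, for example from the shuffle write metrics in the Spark UI. The sketch below is a hypothetical helper (`shuffle_conf` is an assumption, not part of the skill) that sizes shuffle partitions at roughly 128MB each and also turns on adaptive query execution, which can coalesce small shuffle partitions at runtime.

```python
import math

# Hypothetical helper: derive Spark SQL shuffle settings from an estimated
# shuffle size in bytes. The 128MB target follows the skill's partition
# sizing guidance; the function itself is illustrative.
MB = 1024 * 1024


def shuffle_conf(shuffle_bytes: int, target_mb: int = 128,
                 min_partitions: int = 1) -> dict[str, str]:
    """Return Spark SQL settings as strings, ready for spark.conf.set."""
    partitions = max(math.ceil(shuffle_bytes / (target_mb * MB)), min_partitions)
    return {
        "spark.sql.shuffle.partitions": str(partitions),
        # With AQE enabled, Spark may merge small shuffle partitions at
        # runtime, so the count above acts as a starting point.
        "spark.sql.adaptive.enabled": "true",
        "spark.sql.adaptive.coalescePartitions.enabled": "true",
    }
```

Applying the result is a loop over `spark.conf.set(key, value)` for each entry; a 100GB shuffle, for instance, yields 800 partitions at the 128MB target.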
Common Questions / FAQ
Who is spark-optimization for?
Solo and indie developers building or maintaining PySpark pipelines on cloud object storage who can read DataFrame APIs but want proven optimization patterns.
When should I use spark-optimization?
During Build when implementing ETL or analytics backends, before Ship load tests, or in Operate when a production job needs partition or join fixes without a full rewrite.
Is spark-optimization safe to install?
It contains documentation-style patterns only; still, review the Security Audits panel on this Prism page before installing any skill from the repo.
SKILL.md - Spark Optimization
# spark-optimization: detailed patterns and worked examples

## Patterns

### Pattern 1: Optimal Partitioning

```python
from pyspark.sql import functions as F

# Assumes an existing SparkSession `spark` and DataFrame `df`.


# Calculate optimal partition count
def calculate_partitions(data_size_gb: float, partition_size_mb: int = 128) -> int:
    """
    Optimal partition size: 128MB - 256MB
    Too few: Under-utilization, memory pressure
    Too many: Task scheduling overhead
    """
    return max(int(data_size_gb * 1024 / partition_size_mb), 1)


# Repartition for even distribution (full shuffle, hashed on partition_key)
df_repartitioned = df.repartition(200, "partition_key")

# Coalesce to reduce partitions (merges partitions and avoids a full
# shuffle; it can only lower the partition count)
df_coalesced = df.coalesce(100)

# Filter on read: if "date" is a partition column Spark prunes directories,
# otherwise the predicate is pushed down to the Parquet reader
df = (spark.read.parquet("s3://bucket/data/")
      .filter(F.col("date") == "2024-01-01"))

# Write with partitioning for future queries (df must have year/month/day)
(df.write
    .partitionBy("year", "month", "day")
    .mode("overwrite")
    .parquet("s3://bucket/partitioned_output/"))
```

### Pattern 2: Join Optimization

```python
from pyspark.sql import functions as F

# Assumes `spark`, `df`, `large_df1`, and `large_df2` already exist.

# 1. Broadcast Join - Small table joins
# Best when: one side < 10MB (spark.sql.autoBroadcastJoinThreshold, configurable)
small_df = spark.read.parquet("s3://bucket/small_table/")  # < 10MB
large_df = spark.read.parquet("s3://bucket/large_table/")  # TBs

# Explicit broadcast hint
result = large_df.join(
    F.broadcast(small_df),
    on="key",
    how="left"
)

# 2. Sort-Merge Join - Default for large tables
# Requires a shuffle, but handles any size
result = large_df1.join(large_df2, on="key", how="inner")

# 3. Bucket Join - Pre-sorted, no shuffle at join time
# Write bucketed tables (here df holds the orders data)
(df.write
    .bucketBy(200, "customer_id")
    .sortBy("customer_id")
    .mode("overwrite")
    .saveAsTable("bucketed_orders"))

# Join bucketed tables (no shuffle when both sides are bucketed on
# customer_id with the same bucket count)
orders = spark.table("bucketed_orders")
customers = spark.table("bucketed_customers")  # Same bucket count
result = orders.join(customers, on="customer_id")

# 4. Skew Join Handling
# Enable AQE skew join optimization (requires adaptive execution)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")


# Manual salting for severe skew
def salt_join(df_skewed, df_other, key_col, num_salts=10):
    """Add salt to distribute skewed keys"""
    # Add a random salt in [0, num_salts) to the skewed side
    df_salted = df_skewed.withColumn(
        "salt",
        (F.rand() * num_salts).cast("int")
    ).withColumn(
        "salted_key",
        F.concat(F.col(key_col).cast("string"), F.lit("_"), F.col("salt").cast("string"))
    )

    # Replicate the other side once per salt value
    df_exploded = df_other.crossJoin(
        spark.range(num_salts).withColumnRenamed("id", "salt")
    ).withColumn(
        "salted_key",
        F.concat(F.col(key_col).cast("string"), F.lit("_"), F.col("salt").cast("string"))
    )

    # Join on the salted key; drop the right side's copies of key_col and
    # salt first so the result has no ambiguous columns
    return (df_salted
            .join(df_exploded.drop(key_col, "salt"), on="salted_key", how="inner")
            .drop("salted_key", "salt"))
```

### Pattern 3: Caching and Persistence

```python
from pyspark import StorageLevel
from pyspark.sql import functions as F

# Cache when reusing a DataFrame multiple times
df = spark.read.parquet("s3://bucket/data/")
df_filtered = df.filter(F.col("status") == "active")

# Cache with the default storage level (memory, spilling to disk)
df_filtered.cache()

# Or, instead of cache(), choose a storage level explicitly
# (PySpark exposes MEMORY_AND_DISK but not the *_SER variants):
# df_filtered.persist(StorageLevel.MEMORY_AND_DISK)

# Force materialization
df_filtered.count()

# Reuse in multiple actions; both read from the cache
agg1 = df_filtered.groupBy("category").count()
agg2 = df_filtered.groupBy("region").sum("amount")
agg1.show()
agg2.show()

# Unpersist when done
df_filtered.unpersist()

# Storage levels explained:
# MEMORY_ONLY - Fast, but may not fit
# MEMORY_AND_DISK - Spills to disk if needed (recommended)
# MEMORY_ONLY_SER - Serialized, less memory, more CPU (Scala/Java API only)
# DISK_ONLY - When memory is tight
# OFF_HEAP - Tungsten off-heap memory

# Checkpoint for complex lineage
spark.sparkContext.setCheckpointDir("s3://bucket/checkpoints/")
df_complex = (df
    .join(other_df, "key")
    .groupBy("category")
    .agg(F.sum("amount")