Top 50 PySpark Interview Questions 2026
PySpark remains the most in-demand skill for data engineering roles in 2026. Whether you are interviewing at Amazon, Meta, Databricks or a growing startup, you will face PySpark questions. This guide covers 50 questions with detailed explanations and working code — not just one-liner bullet points.
Q1. What is PySpark and how does it differ from Apache Spark?
Apache Spark is a distributed computing engine written in Scala that runs on the JVM. PySpark is the official Python API for Spark — it lets you write Spark jobs in Python while the actual execution still happens on the JVM under the hood. When you call a PySpark transformation, your Python code is translated into JVM operations via the Py4J bridge library.
The practical implication: Python UDFs are slow because every row must cross the Python-JVM boundary. But built-in PySpark functions (pyspark.sql.functions) run entirely on the JVM and are just as fast as Scala. This is a very common interview follow-up question — knowing why Python UDFs are slow demonstrates real understanding.
Q2. What is an RDD and when would you still use it?
An RDD (Resilient Distributed Dataset) is the foundational data structure in Spark — an immutable, distributed collection of objects partitioned across the cluster. It gives you full control: you define exactly how data is partitioned and transformed at the record level. The downside is that the Catalyst optimizer cannot optimise RDD code, so it is almost always slower than the equivalent DataFrame code.
In 2026, you should almost never write new code using the RDD API. The only valid reasons to reach for RDDs are: (1) fine-grained control over partitioning that DataFrames cannot express, (2) working with unstructured data like raw text or binary files, or (3) interfacing with a legacy codebase. Interviewers want to hear that you know RDDs exist and understand why DataFrames replaced them.
Q3. What is lazy evaluation and why does Spark use it?
When you write transformations in PySpark (filter, select, groupBy, join), Spark does not execute them immediately. Instead it builds a logical plan — a DAG (Directed Acyclic Graph) of operations. Execution only happens when you call an action such as count(), collect(), show(), or write(). This is called lazy evaluation.
The reason Spark uses lazy evaluation is optimisation. Because Spark sees your entire computation plan before running any of it, the Catalyst optimizer can: reorder operations (push filters before joins), eliminate redundant steps, merge multiple transformations into a single stage, and choose the most efficient physical execution plan. If Spark executed eagerly like pandas, each line would run immediately with no chance to optimise across operations.
# Nothing executes here — Spark just builds the plan
df = spark.read.parquet("s3://bucket/events/")
df = df.filter(df.country == "US") # lazy
df = df.groupBy("user_id").count() # lazy
# THIS triggers execution — Spark runs the entire plan at once
df.show() # ← action
# Check the plan without executing
df.explain(extended=True)
Q4. What is the difference between a transformation and an action?
Transformations are lazy operations that return a new DataFrame — they only add to the logical plan without running anything. Examples: select(), filter(), groupBy(), join(), withColumn(), orderBy(), distinct(), union().
Actions trigger the actual execution of the entire accumulated plan. Examples: show(), count(), collect(), first(), take(n), write(), toPandas(). Every time you call an action, Spark compiles the logical plan into physical stages and submits jobs to the cluster. A common performance mistake is calling count() inside a loop — each call triggers a full job.
Q5. What is the driver and what are executors?
The driver is the process running your main PySpark program. It holds the SparkContext/SparkSession, builds the execution plan, and coordinates the job. Where it runs depends on deploy mode: on your gateway machine (or laptop) in client and local modes, or on a node inside the cluster in cluster mode. The driver is a single point of failure — if it crashes, the job fails.
Executors are JVM processes running on worker nodes that actually perform the computation. Each executor has a fixed amount of memory and a fixed number of cores (task slots), and tasks are distributed across those cores. A common interview follow-up: "What happens if the driver OOMs vs an executor OOMs?" — a driver OOM kills the entire job; an executor OOM kills only that executor, and its tasks are retried on other executors (up to spark.task.maxFailures attempts, default 4) before the job fails.
Q6. What is a partition in PySpark?
A partition is a logical chunk of your DataFrame that lives on a single executor. Spark processes one partition per task, in parallel. The number of partitions determines parallelism — too few partitions means unused cores; too many means scheduling overhead and small tasks.
There are two important partition counts to know: the read-time partition count (controlled by spark.sql.files.maxPartitionBytes, default 128MB per partition) and the shuffle partition count (controlled by spark.sql.shuffle.partitions, default 200). The shuffle partition count is the one that needs tuning most often — 200 is too low for large datasets and too high for small ones.
# Check partition count
print(df.rdd.getNumPartitions())
# Increase partitions (triggers shuffle)
df = df.repartition(400)
# Decrease partitions without a full shuffle
df = df.coalesce(50)
# Repartition on a specific column (for downstream joins/writes)
df = df.repartition(200, "customer_id")
# Tune shuffle partitions for current cluster
spark.conf.set("spark.sql.shuffle.partitions", "400")
Q7. What is the difference between narrow and wide transformations?
A narrow transformation operates on data within a single partition — no data needs to move between executors. Examples: map(), filter(), select(), withColumn(). These are fast because each partition is processed independently.
A wide transformation requires data from multiple partitions to compute the result — this triggers a shuffle, where data is written to disk, transferred over the network, and read by new tasks. Examples: groupBy(), join(), distinct(), repartition(), orderBy(). Shuffles are the primary performance bottleneck in Spark jobs. Minimising unnecessary wide transformations is the core of PySpark performance optimisation.
Q8. What is a shuffle and how do you minimise it?
A shuffle is the process of redistributing data across partitions — typically because a wide transformation requires all records with the same key to land on the same partition. During a shuffle, Spark writes intermediate data to local disk (the shuffle write), transfers it across the network, and reads it again (the shuffle read). This is expensive in time, disk I/O, and network bandwidth.
Ways to minimise shuffle: (1) Broadcast join small tables instead of sort-merge joining, (2) Filter data as early as possible to reduce shuffle volume, (3) Avoid redundant repartition() calls, (4) Use reduceByKey instead of groupByKey on RDDs (partial aggregation before shuffle), (5) Enable AQE to dynamically coalesce post-shuffle partitions, (6) Pre-partition data on the join key at write time so future joins on that key are shuffle-free.
Q9. What is the Catalyst optimizer?
Catalyst is Spark SQL's query optimiser — the engine that makes DataFrames faster than hand-written RDD code. It transforms your logical query plan through four phases: Analysis (resolve column names and types), Logical Optimisation (rule-based: predicate pushdown, constant folding, null propagation), Physical Planning (generate multiple physical plans and pick the cheapest using statistics), and Code Generation (compile the chosen plan to JVM bytecode using Janino).
The practical takeaway: always prefer built-in SQL functions over Python UDFs because Catalyst can optimise SQL functions end-to-end. The moment you introduce a Python UDF, Catalyst treats it as a black box and cannot optimise through it.
Q10. What is SparkSession vs SparkContext?
SparkContext was the original entry point in Spark 1.x — one context per JVM, used to create RDDs. SparkSession was introduced in Spark 2.0 as a unified entry point that subsumes SparkContext, SQLContext, and HiveContext. You access SparkContext through spark.sparkContext. In modern Spark (2.x+), always use SparkSession. In PySpark on Databricks or EMR, the session is pre-created as the spark variable.
from pyspark.sql import SparkSession
spark = SparkSession.builder .appName("MyPipeline") .config("spark.sql.shuffle.partitions", "200") .config("spark.sql.adaptive.enabled", "true") .getOrCreate()
# Access the underlying SparkContext if needed
sc = spark.sparkContext
Q11. What join types does PySpark support?
PySpark supports: inner (only matching rows), left / left_outer (all left rows, matched right), right / right_outer (all right rows, matched left), full / full_outer (all rows from both sides), cross (cartesian product — every combination), left_semi (left rows that have a match in right — like EXISTS), left_anti (left rows that have NO match in right — like NOT EXISTS).
The semi and anti joins are underused but extremely valuable. A left_semi join is more efficient than an inner join when you only need columns from the left table — it avoids duplicating left rows when there are multiple matches on the right.
# Standard inner join
result = orders.join(customers, on="customer_id", how="inner")
# Left semi — keep orders that have a matching customer (no customer columns)
valid_orders = orders.join(customers, on="customer_id", how="left_semi")
# Left anti — find orphaned orders with no customer record
orphaned = orders.join(customers, on="customer_id", how="left_anti")
# Cross join (use with care — produces M×N rows)
combos = products.crossJoin(regions)
Q12. What is a broadcast join and when should you use it?
A broadcast join sends a complete copy of the smaller DataFrame to every executor, so each executor can perform the join locally without any shuffle. This is the single most impactful optimisation for joins involving one small and one large table.
Spark auto-broadcasts tables below spark.sql.autoBroadcastJoinThreshold (default 10MB). For slightly larger tables you know are small enough, force it manually with the broadcast() hint. The rule of thumb: if one side fits comfortably in executor memory (typically anything under a few hundred MB), broadcast it.
from pyspark.sql.functions import broadcast
# Auto-broadcast — Spark decides based on table statistics
result = large_events.join(small_countries, on="country_code")
# Manual broadcast hint — use when you know the table is small
result = large_events.join(broadcast(small_countries), on="country_code")
# Verify the join strategy in the physical plan
result.explain()
# Look for: BroadcastHashJoin (good) vs SortMergeJoin (shuffle)
# Disable auto-broadcast if you want full control
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
Q13. What is data skew and how do you fix it?
Data skew occurs when the data is unevenly distributed across partitions — some partitions have millions of rows while others have a few hundred. The symptom in the Spark UI: 99 out of 100 tasks finish in 30 seconds, but 1 task takes 45 minutes. The entire stage is blocked waiting for that one slow task.
The most common cause is joining or grouping on a key with highly non-uniform distribution — for example, a small number of "power users" who have far more events than average. The fix depends on the cause: for join skew, use salting (add a random prefix to the skewed key and explode the small table to match). For AQE-enabled jobs, enable spark.sql.adaptive.skewJoin.enabled which handles this automatically.
from pyspark.sql import functions as F
# Detect skew — check row distribution across partitions
df.groupBy(F.spark_partition_id().alias("partition_id")) .count() .orderBy(F.desc("count")) .show(20)
# Fix skew with salting
SALT_FACTOR = 10
# Add random salt 0–9 to the large (skewed) table
large_df = large_df.withColumn(
"salted_key",
F.concat(F.col("skewed_key"), F.lit("_"),
(F.rand() * SALT_FACTOR).cast("int").cast("string"))
)
# Explode the small table to have all 10 salt values per key
from pyspark.sql.functions import explode, array, lit
small_df = small_df.withColumn(
"salt_values", array([lit(i) for i in range(SALT_FACTOR)])
).withColumn("salt", explode("salt_values")) .withColumn("salted_key",
F.concat(F.col("key"), F.lit("_"), F.col("salt").cast("string")))
result = large_df.join(small_df, on="salted_key").drop("salt", "salted_key", "salt_values")
Q14. What is the difference between cache() and persist()?
Both store a DataFrame in memory so subsequent actions do not recompute it from scratch. cache() is simply a shorthand for persist(StorageLevel.MEMORY_AND_DISK) — it stores data in memory and spills to disk if memory is insufficient. persist() lets you explicitly choose the storage level.
Key storage levels: MEMORY_ONLY (fastest, but data is lost if executor OOMs), MEMORY_AND_DISK (safe default — spills to disk), DISK_ONLY (useful for very large DataFrames you only access occasionally), OFF_HEAP (stores data in off-heap memory, bypasses GC pressure). Always call unpersist() when you no longer need cached data to free executor memory.
from pyspark import StorageLevel
# cache() — equivalent to MEMORY_AND_DISK
df.cache()
# persist() — explicit storage level
df.persist(StorageLevel.MEMORY_AND_DISK)
df.persist(StorageLevel.MEMORY_ONLY)
df.persist(StorageLevel.DISK_ONLY)
# Always trigger an action to actually populate the cache
df.count()
# Release when done
df.unpersist()
Q15. How do window functions work in PySpark?
A window function computes a value for each row based on a "window" of related rows, without collapsing rows the way groupBy().agg() does. You define the window using WindowSpec — specifying how to group rows (partitionBy) and how to order them (orderBy). The function is then applied over each window.
The most commonly tested patterns: ranking (row_number, rank, dense_rank to get top-N per group), running totals (sum() over an unbounded preceding frame), and period-over-period comparison (lag() to compare current row with the previous row). Master these three patterns and you can solve 90% of window function interview questions.
from pyspark.sql.window import Window
from pyspark.sql import functions as F
# Pattern 1: Rank within group — top 3 earners per department
w_rank = Window.partitionBy("dept").orderBy(F.desc("salary"))
df.withColumn("rank", F.dense_rank().over(w_rank)) .filter(F.col("rank") <= 3)
# Pattern 2: Running total
w_running = Window.partitionBy("user_id") .orderBy("order_date") .rowsBetween(Window.unboundedPreceding, Window.currentRow)
df.withColumn("cumulative_spend", F.sum("amount").over(w_running))
# Pattern 3: Period-over-period change (week-over-week revenue)
w_lag = Window.partitionBy("product_id").orderBy("week")
df.withColumn("prev_week_rev", F.lag("revenue", 1).over(w_lag)) .withColumn("wow_growth_pct",
    F.round((F.col("revenue") - F.col("prev_week_rev")) / F.col("prev_week_rev") * 100, 2)
)
Q16. What is the difference between row_number(), rank(), and dense_rank()?
All three assign numbers to rows ordered within a partition, but they handle ties differently. row_number() always assigns unique numbers regardless of ties — two rows with the same salary might get ranks 3 and 4 arbitrarily. rank() gives tied rows the same number but skips the next rank — ties at position 2 both get rank 2, next row gets rank 4 (gap). dense_rank() gives tied rows the same number with no gaps — ties at position 2 both get 2, next row gets 3.
# Given salaries: 90k, 85k, 85k, 70k
# row_number: 1, 2, 3, 4 (always unique)
# rank: 1, 2, 2, 4 (gap after tie)
# dense_rank: 1, 2, 2, 3 (no gap)
w = Window.partitionBy("dept").orderBy(F.desc("salary"))
df.select(
"name", "dept", "salary",
F.row_number().over(w).alias("row_num"),
F.rank().over(w).alias("rank"),
F.dense_rank().over(w).alias("dense_rank")
).show()
# Interview tip: use dense_rank() when you want "top N unique salary levels"
# Use row_number() when you need exactly one row per group (deduplication)
Q17. Explain lag() and lead() with a real use case
lag(column, n) returns the value of a column from the row n positions before the current row within the window. lead(column, n) returns the value from n rows ahead. Both return null for rows where the offset goes out of bounds — you can provide a default value as the third argument.
Real use case: calculate day-over-day revenue change for each product. You order by date within each product partition, then use lag() to pull the previous day's revenue into the current row. This pattern comes up in virtually every analytics pipeline for trend detection, churn analysis, and A/B test monitoring.
w = Window.partitionBy("product_id").orderBy("date")
df.withColumn("prev_day_revenue", F.lag("revenue", 1, 0).over(w)) .withColumn("next_day_revenue", F.lead("revenue", 1, 0).over(w)) .withColumn("daily_change", F.col("revenue") - F.col("prev_day_revenue")) .withColumn("is_growing", F.col("daily_change") > 0)
# Use case: find sessions — new session if gap > 30 min
w_session = Window.partitionBy("user_id").orderBy("event_time")
df.withColumn("prev_time", F.lag("event_time").over(w_session)) .withColumn("gap_minutes",
(F.col("event_time").cast("long") - F.col("prev_time").cast("long")) / 60
) .withColumn("is_new_session",
(F.col("gap_minutes") > 30) | F.col("gap_minutes").isNull()
)
Q18. What is Adaptive Query Execution (AQE)?
AQE is a Spark 3.0+ feature that re-optimises the query plan at runtime using actual statistics collected after each shuffle stage completes. Before Spark 3.0, the entire plan was fixed at compile time based on estimated statistics that were often wrong. AQE changes the plan dynamically as real data is processed.
AQE provides three major benefits automatically: (1) Dynamic partition coalescing — after a shuffle, small post-shuffle partitions are merged, preventing thousands of tiny tasks. (2) Dynamic join strategy switching — if a table that looked large turns out to be small after filtering, AQE switches from sort-merge join to broadcast join on the fly. (3) Skew join optimisation — AQE detects skewed partitions and splits them into sub-tasks to prevent stragglers.
# Enable AQE (default ON in Spark 3.2+)
spark.conf.set("spark.sql.adaptive.enabled", "true")
# Tune the coalescing threshold — partitions smaller than this get merged
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128MB")
# Skew join — partition is skewed if larger than this multiple of median
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
# Verify AQE is being used — look for "AdaptiveSparkPlan" in explain output
df.groupBy("country").count().explain()
Q19. What is the difference between coalesce() and repartition()?
Both change the number of partitions, but they work very differently. repartition(n) performs a full shuffle — it can both increase and decrease partition count, and the data is evenly distributed across all n partitions. coalesce(n) only reduces partitions by merging existing ones without a shuffle — it is fast but can produce uneven partition sizes because it just combines adjacent partitions.
Use repartition() before a large join or write when you need even distribution. Use coalesce() at the end of a pipeline to reduce the number of output files without paying the shuffle cost. Never use coalesce() to increase partitions — it silently does nothing (you need repartition for that).
# Before a large join — ensure even distribution
df = df.repartition(400, "customer_id") # shuffle on key for join optimisation
# At write time — reduce output files cheaply
df.coalesce(10).write.parquet("output/")
# repartition on a column — useful for downstream operations on that key
df = df.repartition(200, "date") # all rows for same date → same partitions
# Verify
print(f"Partitions: {df.rdd.getNumPartitions()}")
Q20. How do you handle null values in PySpark?
Nulls are pervasive in real data and a common source of bugs. PySpark has rich null-handling support. Key functions: isNull() / isNotNull() for filtering, na.fill() and na.drop() on the DataFrame, coalesce() to return the first non-null from a list of columns, and nullif() to convert a specific value to null.
Critical interview point: nulls in join keys. In PySpark, null != null in join conditions — two rows with null in the join key will NOT match in an inner join. This is standard SQL behaviour but trips up many candidates. If you want null keys to match, you need a special join condition using <=> (null-safe equality) or filter nulls before joining.
# Filter rows where column is null / not null
df.filter(F.col("email").isNull())
df.filter(F.col("email").isNotNull())
# Fill nulls
df.na.fill({"revenue": 0, "country": "UNKNOWN"})
# Drop rows with any null in specific columns
df.na.drop(subset=["user_id", "event_date"])
# coalesce — first non-null value
df.withColumn("name", F.coalesce(F.col("display_name"), F.col("username"), F.lit("Anonymous")))
# Null-safe equality in joins (null matches null)
df1.join(df2, df1["key"].eqNullSafe(df2["key"]))
Q21. What is Parquet and why is it the preferred format for PySpark?
Parquet is a columnar binary file format designed for analytical workloads. In a row-based format like CSV, all columns for a row are stored together. In Parquet, all values for a single column are stored together. This means a query that only needs 3 out of 50 columns reads roughly 6% of the data — a massive I/O saving.
Parquet also compresses each column independently using the most efficient algorithm for that column's data type — dictionary encoding for low-cardinality string columns (like country codes), delta encoding for sorted integers (like timestamps), and run-length encoding for boolean columns. In practice, Parquet files are 5–10x smaller than CSV and 3–5x faster to query. Always write your data lake in Parquet (or Delta, which is Parquet underneath).
Q22. What is predicate pushdown in Spark?
Predicate pushdown means Spark moves filter conditions as close to the data source as possible, so the minimum amount of data is read into memory. When reading Parquet, Spark uses the row group statistics stored in each file's footer (min/max values per column per row group) to skip entire 128MB row groups that cannot satisfy the filter. This happens before any data enters Spark memory.
You can verify predicate pushdown is working by calling df.explain() and looking for "PushedFilters" in the Scan node. If your filter column is not in the pushed filters list, Spark is reading all data and filtering after — which is much slower. For full pushdown benefit, filter on columns that are sorted or have good min/max statistics in the Parquet file.
df = spark.read.parquet("s3://bucket/events/") .filter(F.col("country") == "US") .filter(F.col("event_date") == "2026-04-01")
# Verify pushdown — look for "PushedFilters" in the output
df.explain()
# Output should show: PushedFilters: [IsNotNull(country), EqualTo(country,US), ...]
# What kills pushdown: applying a function to the column before filtering
# Bad — function prevents pushdown
df.filter(F.upper(F.col("country")) == "US")
# Good — filter on the raw column
df.filter(F.col("country") == "US")
Q23. How do you write partitioned data and why does it matter?
Partitioning physically organises data into subdirectories based on a column's value. When you write with partitionBy("date"), Spark creates a folder structure like date=2026-04-01/, date=2026-04-02/ etc. When a downstream query filters on that column, Spark reads only the matching subdirectory — this is called partition pruning, and it is the most impactful read optimisation for large datasets.
Choose partition columns carefully. Good partition keys have low-to-medium cardinality (date, country, region), are frequently used as filters in downstream queries, and distribute data relatively evenly. Bad choices: high-cardinality columns like user_id (creates millions of tiny folders) or columns that are never filtered on (all data gets read anyway). A common best practice is to partition by date at the top level and optionally a second column like region.
# Write partitioned by date — creates date=YYYY-MM-DD/ subdirectories
df.write .mode("overwrite") .partitionBy("event_date") .parquet("s3://bucket/events/")
# Multi-level partitioning
df.write .mode("overwrite") .partitionBy("event_date", "country") .parquet("s3://bucket/events/")
# Creates: event_date=2026-04-01/country=US/part-00001.parquet
# Reading — Spark automatically prunes partitions
df = spark.read.parquet("s3://bucket/events/") .filter(F.col("event_date") == "2026-04-01")
# Only reads the event_date=2026-04-01/ directory
# Overwrite a single partition without touching others
df_new.write .mode("overwrite") .option("partitionOverwriteMode", "dynamic") .partitionBy("event_date") .parquet("s3://bucket/events/")
Q24. What is the small files problem and how do you fix it?
The small files problem occurs when your data lake accumulates thousands or millions of tiny files (kilobytes rather than the target 128MB–1GB). Every file has overhead: the Spark driver must list and open each file, read its footer metadata, and plan a task for it. With 100,000 small files, the listing and planning overhead alone can take minutes before any data is processed.
Common causes: writing from a streaming job that checkpoints every 30 seconds, over-partitioning (too many partition columns creating too many combinations), or repartitioning to too many partitions before writing. Fixes: use coalesce() before writing to control output file count, run Delta OPTIMIZE to compact existing small files, increase streaming batch intervals, and avoid partitioning on high-cardinality columns.
# Fix at write time — coalesce to control output file count
# 1 partition = 1 output file (aim for ~128MB–512MB per file)
df.coalesce(10).write.mode("overwrite").parquet("s3://bucket/output/")
# Fix existing Delta table — compact small files
from delta.tables import DeltaTable
dt = DeltaTable.forPath(spark, "/delta/events")
dt.optimize().executeCompaction()
# Check how many files exist per partition
spark.sql("""
SELECT event_date, COUNT(*) as file_count
FROM delta.`/delta/events`
GROUP BY event_date
ORDER BY file_count DESC
""").show()
Q25. What is schema inference and why should you avoid it in production?
When you read a CSV or JSON file without specifying a schema, Spark can perform schema inference — it scans the data to guess column names and types (for CSV you opt in with inferSchema=True; without it, every column comes back as a string). This is convenient for exploration but has two serious problems in production: it is slow (the inference pass is an extra scan of the input) and it is unreliable (a column can be inferred as integer when rare rows contain floats, or nullability and date formats can be guessed wrong).
Always define an explicit StructType schema in production pipelines. This makes the schema part of your code (reviewable, versioned), eliminates the inference scan, and gives you precise control over nullability. If an incoming file does not match your expected schema, Spark will fail fast — which is what you want rather than silently loading wrong data.
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType, LongType
# Always define schema explicitly in production
schema = StructType([
StructField("user_id", StringType(), nullable=False),
StructField("event_time", TimestampType(), nullable=False),
StructField("revenue", DoubleType(), nullable=True),
StructField("country", StringType(), nullable=True),
StructField("session_id", LongType(), nullable=True),
])
# Read CSV with explicit schema — no inference scan
df = spark.read .schema(schema) .option("header", "true") .option("timestampFormat", "yyyy-MM-dd HH:mm:ss") .csv("s3://bucket/events/")
# Read JSON with explicit schema
df = spark.read.schema(schema).json("s3://bucket/events/")
# Write and read Parquet — schema is embedded in the file, no inference needed
df.write.parquet("s3://bucket/processed/")
df = spark.read.parquet("s3://bucket/processed/") # schema read from footer
Q26. How do you read and write Delta Lake tables in PySpark?
Delta Lake uses the same Parquet files under the hood but adds a transaction log directory (_delta_log/) that records every operation. Reading and writing Delta is nearly identical to Parquet — you just specify format("delta") instead of "parquet". The key difference is that Delta guarantees ACID properties: if a write fails halfway through, no partial data is visible to readers.
# Write as Delta
df.write.format("delta").mode("overwrite").save("/delta/orders")
# Write as partitioned Delta
df.write.format("delta") .mode("overwrite") .partitionBy("order_date") .save("/delta/orders")
# Read Delta
df = spark.read.format("delta").load("/delta/orders")
# Read using SQL (if registered as a table)
spark.sql("CREATE TABLE orders USING DELTA LOCATION '/delta/orders'")
df = spark.sql("SELECT * FROM orders WHERE order_date = '2026-04-01'")
# Check table history
from delta.tables import DeltaTable
dt = DeltaTable.forPath(spark, "/delta/orders")
dt.history().show()
Q27. How do you optimise a slow PySpark job — step by step?
Start with the Spark UI — it is the most important debugging tool. Open the Jobs tab, find the slow stage, and look at the Tasks tab for that stage. If most tasks are fast but one is slow, you have data skew. If all tasks are slow, you have insufficient parallelism, too much data being read, or expensive operations.
The systematic checklist: (1) Check how much data is being read — are filters pushed down? Are you reading all partitions when you only need one? (2) Check for shuffles — look at shuffle read/write bytes in the stage summary. Can any joins be broadcast? (3) Check for spill to disk — if shuffle spill MB is high, executors have insufficient memory. (4) Check for skew — is one task taking 10x longer? Salt the key. (5) Check spark.sql.shuffle.partitions — is it tuned for your cluster size?
# Step 1: Check the physical plan to understand what Spark is doing
df.explain(extended=True)
# Step 2: Check partition distribution — uneven = skew
df.groupBy(F.spark_partition_id()).count().orderBy(F.desc("count")).show(20)
# Step 3: Profile data volume at each stage
print(f"Input rows: {df.count()}")
df_filtered = df.filter(F.col("country") == "US")
print(f"After filter: {df_filtered.count()}")
# Step 4: Tune shuffle partitions for cluster size
# Rule of thumb: 2-3x total executor cores
total_cores = 20 * 8 # 20 executors × 8 cores each = 160 cores
spark.conf.set("spark.sql.shuffle.partitions", str(total_cores * 3)) # 480
# Step 5: Enable AQE for automatic runtime optimisation
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
Q28. What is the difference between select() and withColumn()?
select() returns a new DataFrame with only the columns you specify — it can rename, transform, and reorder columns all at once. withColumn() adds or replaces a single column while keeping all existing columns. Use select() when you want to control the full output schema; use withColumn() when you want to add or update one column without typing out all the others.
A performance note: calling withColumn() in a loop (e.g., to add 50 columns) builds a deep nested plan that can cause analysis overhead. If you need to add many columns at once, use select() with a list of column expressions instead — it produces a single flat plan.
# select — control exact output columns
df.select("user_id", "revenue",
F.col("event_time").alias("ts"),
(F.col("revenue") * 1.1).alias("revenue_with_tax"))
# withColumn — add/replace one column, keep everything else
df.withColumn("revenue_usd", F.col("revenue") / F.col("exchange_rate")) .withColumn("event_date", F.to_date("event_time"))
# Anti-pattern: withColumn in a loop — builds deeply nested plan
# Bad:
for col_name in fifty_columns:
    df = df.withColumn(col_name, F.lit(None))
# Good: use select with a list
exprs = [F.col(c) for c in df.columns] + [F.lit(None).alias(c) for c in fifty_columns]
df = df.select(*exprs)
Q29. How do you deduplicate rows in PySpark?
PySpark provides two deduplication functions: distinct() removes rows that are completely identical across all columns, and dropDuplicates(subset) removes rows that share values on a specified subset of columns. Neither guarantees which duplicate survives — the kept row is arbitrary, so if you need to keep the "latest" record by a timestamp, use a window function with row_number() instead.
# Remove fully duplicate rows (all columns identical)
df.distinct()
# Remove duplicates based on key columns only
df.dropDuplicates(["user_id", "event_date"])
# Keep the most recent record per user — deterministic deduplication
from pyspark.sql.window import Window
w = Window.partitionBy("user_id").orderBy(F.desc("updated_at"))
(df.withColumn("rn", F.row_number().over(w))
   .filter(F.col("rn") == 1)
   .drop("rn"))
# Count duplicates to understand data quality
total = df.count()
unique = df.dropDuplicates(["order_id"]).count()
print(f"Duplicate orders: {total - unique}")
Q30. How do you read multiple files and handle schema mismatches across files?
Spark can read multiple files in a single read call by passing a directory path, a glob pattern, or a list of paths. For Parquet/Delta, the schema is embedded in each file; if schemas differ across files (columns added over time), the Parquet mergeSchema option unions all schemas and fills missing columns with null. For CSV/JSON there is no embedded schema — supply an explicit schema on read so that files with missing or extra columns are handled consistently.
# Read an entire directory
df = spark.read.parquet("s3://bucket/events/")
# Read with glob pattern — only April files
df = spark.read.parquet("s3://bucket/events/date=2026-04-*/")
# Read a list of specific paths
paths = ["s3://bucket/jan.parquet", "s3://bucket/feb.parquet"]
df = spark.read.parquet(*paths)
# Handle schema evolution — columns added over time
df = (spark.read
      .option("mergeSchema", "true")
      .parquet("s3://bucket/events/"))
# Add source file path column — useful for debugging
df = (spark.read.parquet("s3://bucket/events/")
      .withColumn("source_file", F.input_file_name()))
Q31. Why are Python UDFs slow and what should you use instead?
Python UDFs are slow because of the Python-JVM boundary. When Spark executes a Python UDF, it must: serialise each row from the JVM to Python (using pickle), pass it across a socket to the Python worker process, execute your Python function, serialise the result back, and pass it back to the JVM. This happens row by row and completely bypasses the Catalyst optimizer — Catalyst treats UDFs as black boxes it cannot inspect or optimise.
The fix priority: (1) First check if there is a built-in pyspark.sql.functions equivalent — there almost always is. Built-in functions run entirely on the JVM and Catalyst can optimise through them. (2) If no built-in exists, use a pandas_udf (vectorised UDF) which batches rows using Apache Arrow instead of serialising one at a time. (3) Only use a row-level Python UDF as a last resort for logic that truly cannot be expressed in SQL functions or pandas.
from pyspark.sql.functions import udf, pandas_udf
from pyspark.sql.types import StringType, DoubleType
import pandas as pd
# Option 1: Built-in function — always prefer this
df.withColumn("country_upper", F.upper(F.col("country"))) # JVM, Catalyst-optimised
# Option 2: pandas_udf — batched via Apache Arrow, 10-100x faster than row UDF
@pandas_udf(StringType())
def clean_email(emails: pd.Series) -> pd.Series:
    return emails.str.lower().str.strip()
df.withColumn("email_clean", clean_email(F.col("email")))
# Option 3: Row-level UDF — only as last resort
@udf(returnType=DoubleType())
def complex_formula(a, b, c):
    # some business logic impossible in SQL
    return (a ** 2 + b) / (c + 1e-9)
df.withColumn("result", complex_formula("col_a", "col_b", "col_c"))
Q32. What is a pandas_udf and how does it work internally?
A pandas_udf (also called a vectorised UDF) is a PySpark UDF that operates on pandas Series or DataFrames rather than individual row values. Under the hood, Spark uses Apache Arrow — a columnar in-memory data format — to transfer entire batches of data between the JVM and Python in a single serialisation operation. Your function then processes the batch using native pandas operations (which are themselves vectorised C/NumPy operations).
There are three pandas_udf types: Series to Series (transform one column), Series to Scalar (aggregate a column to a single value, used with groupBy), and Iterator of Series to Iterator of Series (for UDFs that need to load a model once and apply it across many batches — common in ML inference pipelines).
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pandas as pd
from pyspark.sql.types import DoubleType, StringType
# Type 1: Series to Series — element-wise transformation
@pandas_udf(DoubleType())
def normalise(values: pd.Series) -> pd.Series:
    # Note: min/max here are computed per Arrow batch, not over the whole
    # column — pass in precomputed global bounds for a true global normalisation
    return (values - values.min()) / (values.max() - values.min())
df.withColumn("normalised_revenue", normalise(F.col("revenue")))
# Type 2: Iterator of Series — load model ONCE, apply to all batches
from typing import Iterator
@pandas_udf(DoubleType())
def predict_churn(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    import joblib
    model = joblib.load("/path/to/model.pkl") # loaded once per partition
    for series in iterator:
        yield pd.Series(model.predict(series.values.reshape(-1, 1)))
df.withColumn("churn_score", predict_churn(F.col("days_since_login")))
Q33. How do you perform an aggregation and keep non-aggregated columns?
A common interview trap: you group by department and aggregate salary, but you also want to keep each employee's name alongside the department total. You cannot put "name" in groupBy (it would create too many groups) and you cannot put it in agg() (it is not an aggregation). The solution is a window function — compute the aggregation over the window and add it as a new column without collapsing rows.
# Wrong approach: groupBy collapses rows, you lose employee name
df.groupBy("dept").agg(F.sum("salary").alias("dept_total"))
# Result: just dept + dept_total — employee names gone
# Correct: window function adds aggregation without collapsing
w = Window.partitionBy("dept")
# Note: use F.round() — Column objects have no .round() method
(df.withColumn("dept_total", F.sum("salary").over(w))
   .withColumn("pct_of_dept", F.round(F.col("salary") / F.col("dept_total") * 100, 2)))
# Result: all employee rows kept, with dept_total and pct_of_dept added
# Also useful: add global total without losing rows
df.withColumn("global_total", F.sum("revenue").over(Window.partitionBy()))
Q34. What is the difference between orderBy() and sortWithinPartitions()?
orderBy() produces a globally sorted DataFrame — all rows across all partitions are in sorted order. Under the hood, Spark samples the data, range-partitions it so that partition N holds smaller keys than partition N+1, then sorts each partition. This full shuffle makes it one of the most expensive operations in Spark. Note that the output is not a single partition — the common "single partition" pitfall comes from collecting sorted results to the driver, not from orderBy itself.
sortWithinPartitions() sorts rows within each partition independently, with no shuffle. The overall DataFrame is not globally sorted, but within each partition the data is ordered. Use this when you need ordered writes (e.g., each output file is internally sorted for downstream range scans) without paying the global sort cost.
# Global sort — expensive, requires full shuffle
df.orderBy("event_time").write.parquet("output/")
# Sort within partitions — cheap, no shuffle
df.sortWithinPartitions("event_time").write.parquet("output/")
# For globally ordered output files without a single orderBy stage:
# range-partition first (partition N holds smaller keys than N+1),
# then sort within each partition
df.repartitionByRange(100, "event_time").sortWithinPartitions("event_time").write.parquet("output/")
Q35. How do you union multiple DataFrames in PySpark?
PySpark has two union methods: union() combines DataFrames by column position — the column order must match exactly. unionByName() matches columns by name regardless of order, and with allowMissingColumns=True it fills missing columns with null. Always use unionByName() when combining data from different sources or time periods where schema may have drifted.
# union() — columns matched by position, order must match
combined = jan_df.union(feb_df)
# unionByName() — columns matched by name, order doesn't matter
combined = jan_df.unionByName(feb_df)
# unionByName with missing columns — new columns get null in older DataFrames
combined = old_df.unionByName(new_df_with_extra_cols, allowMissingColumns=True)
# Union many DataFrames — use functools.reduce
from functools import reduce
dfs = [jan_df, feb_df, mar_df, apr_df]
combined = reduce(lambda a, b: a.unionByName(b, allowMissingColumns=True), dfs)
Q36. How do you handle nested JSON / struct columns in PySpark?
Real-world data from APIs and Kafka often arrives as nested JSON with struct columns (objects) and array columns (lists). PySpark has rich support for working with these. Access struct fields with dot notation or getField(). Explode arrays into rows with explode(). Parse JSON strings with from_json(). Flatten nested structs with col("parent.*") to expand all sub-fields.
from pyspark.sql.functions import explode, from_json, col, get_json_object
from pyspark.sql.types import StructType, StructField, StringType, ArrayType
# Access nested struct field
df.select("user.id", "user.name", "user.address.city")
# Or with getField
df.select(F.col("user").getField("id").alias("user_id"))
# Explode array column — one row per array element
df.withColumn("tag", explode(F.col("tags")))
# Parse JSON string column
schema = StructType([
StructField("event_type", StringType()),
StructField("page_url", StringType()),
])
(df.withColumn("payload", from_json(F.col("json_string"), schema))
   .select("user_id", "payload.event_type", "payload.page_url"))
# Flatten all fields from a struct into top-level columns
df.select("user_id", "metadata.*")
Q37. What is Spark Structured Streaming?
Spark Structured Streaming is PySpark's built-in stream processing engine. It treats a live data stream as an unbounded DataFrame — you write the same transformations as batch, and Spark continuously runs them as new data arrives from sources like Kafka, cloud storage, or sockets. It supports three output modes: append (new rows only), complete (full result table each trigger), and update (changed rows only).
The key concept is the trigger — how often Spark processes new data. Trigger.ProcessingTime("1 minute") runs a micro-batch every minute. Trigger.Once() processes all available data and stops (useful for scheduled jobs; Spark 3.3+ prefers Trigger.AvailableNow, which does the same work as multiple rate-limited batches). Trigger.Continuous("1 second") is the experimental true streaming mode with sub-second latency. Checkpointing is mandatory in production to recover from failures.
# Read from Kafka
stream_df = (spark.readStream
             .format("kafka")
             .option("kafka.bootstrap.servers", "broker:9092")
             .option("subscribe", "events")
             .load())
# Parse the Kafka value (bytes → string → JSON)
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
schema = StructType([
    StructField("user_id", StringType()),
    StructField("event_time", TimestampType()), # needed for the window() aggregation below
    StructField("revenue", DoubleType()),
])
events = (stream_df
          .select(F.from_json(F.col("value").cast("string"), schema).alias("data"))
          .select("data.*"))
# Aggregate with 5-minute tumbling windows
from pyspark.sql.functions import window
result = (events
          .withWatermark("event_time", "10 minutes") # required: append-mode aggregation needs a watermark
          .groupBy(window("event_time", "5 minutes"), "user_id")
          .agg(F.sum("revenue").alias("total_revenue")))
# Write to Delta with checkpointing
(result.writeStream
   .format("delta")
   .outputMode("append")
   .option("checkpointLocation", "s3://bucket/checkpoints/revenue/")
   .trigger(processingTime="1 minute")
   .start("/delta/revenue_per_user"))
Q38. What is broadcast() and what are its limits?
Broadcasting sends a full copy of a DataFrame to every executor node so joins can be done locally without a shuffle. The limit: the table must fit in executor memory. If your executor has 8GB of heap and your broadcast table is 6GB, you will get OOM errors. The default threshold is 10MB (spark.sql.autoBroadcastJoinThreshold) — anything larger requires a manual broadcast hint and enough executor memory.
Broadcasting is also one-time — if the broadcast data changes between job runs, you must ensure the new version is used. In iterative ML algorithms that re-broadcast parameters each iteration, the broadcast variable can accumulate and cause memory pressure if not explicitly destroyed.
from pyspark.sql.functions import broadcast
# Automatic broadcast — Spark decides based on table statistics
# Increase threshold if you have large lookup tables and enough memory
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(50 * 1024 * 1024)) # 50MB
# Manual broadcast hint — force it regardless of threshold
result = large_fact.join(broadcast(dim_table), on="product_id")
# Check if broadcast is happening — look for BroadcastHashJoin in plan
result.explain()
# Broadcast variable for RDD operations
lookup_dict = {"US": "United States", "UK": "United Kingdom"}
bc_lookup = spark.sparkContext.broadcast(lookup_dict)
rdd.map(lambda row: bc_lookup.value.get(row["country"], "Unknown"))
bc_lookup.unpersist() # free memory when done
Q39. How do you write unit tests for PySpark code?
Testing PySpark is important but often skipped by candidates — mentioning it in an interview signals engineering maturity. The pattern: create a local SparkSession in your test file, create small test DataFrames from inline data, run your transformation function, and assert the output. Use pytest with a shared SparkSession fixture to avoid creating a new Spark session per test (which is slow).
import pytest
from pyspark.sql import SparkSession
from pyspark.sql import Row
@pytest.fixture(scope="session")
def spark():
    return (SparkSession.builder
            .master("local[2]")
            .appName("pytest")
            .getOrCreate())
def compute_revenue(df):
    return (df.withColumn("total", F.col("price") * F.col("qty"))
              .groupBy("user_id")
              .agg(F.sum("total").alias("total_revenue")))
def test_compute_revenue(spark):
    input_data = spark.createDataFrame([
        Row(user_id="u1", price=10.0, qty=2),
        Row(user_id="u1", price=5.0, qty=3),
        Row(user_id="u2", price=20.0, qty=1),
    ])
    result = compute_revenue(input_data).orderBy("user_id")
    rows = result.collect()
    assert rows[0]["user_id"] == "u1"
    assert rows[0]["total_revenue"] == 35.0 # 10*2 + 5*3
    assert rows[1]["total_revenue"] == 20.0
Q40. How do you tune Spark memory configuration?
Spark executor memory is divided into: Reserved memory (300MB fixed for system), User memory (for Python/JVM objects in your code), Spark memory (shared between execution memory for shuffles/joins and storage memory for cache). The key config is spark.executor.memory (total JVM heap) and spark.memory.fraction (default 0.6 — 60% of heap goes to Spark memory). The other 40% is user memory.
Signs of memory problems: executor OOM errors (increase spark.executor.memory), spill to disk (increase memory fraction or add more executors), GC overhead (large heap with many small objects — reduce heap or tune GC). On modern clusters, off-heap memory (spark.memory.offHeap) can help reduce GC pressure for large caches.
# Key memory configurations
spark = (SparkSession.builder
         .config("spark.executor.memory", "8g")
         .config("spark.executor.cores", "4")
         .config("spark.driver.memory", "4g")
         .config("spark.memory.fraction", "0.6")        # 60% of heap = Spark memory
         .config("spark.memory.storageFraction", "0.5") # 50% of Spark memory = cache
         .config("spark.executor.memoryOverhead", "1g") # off-heap for Python worker, Netty
         .getOrCreate())
# Monitor memory — check Spark UI → Executors tab
# Look at: Storage Memory Used, Shuffle Read/Write, GC Time
# If GC Time > 10% of task time → memory pressure, increase executor memory
Q41. What is Delta Lake and what problem does it solve?
Traditional data lakes (raw Parquet files on S3) have serious reliability problems: two concurrent writers can corrupt data by overwriting each other's files, a failed write leaves partial data visible to readers, you cannot atomically update or delete specific rows, and schema changes can silently break downstream queries. Delta Lake solves all of these with ACID transactions.
Delta stores data as Parquet files plus a transaction log (_delta_log/) — a directory of JSON files, one per transaction. Every read and write goes through the transaction log. Readers always see a consistent snapshot. Writers use optimistic concurrency control: conflicting commits are detected at commit time, and the losing writer retries or fails rather than corrupting data. Failed writes are rolled back automatically. The transaction log is also what powers time travel — you can replay the log to reconstruct any historical version of the table.
Q42. What is time travel in Delta Lake?
Time travel lets you query any historical version of a Delta table. Delta never immediately deletes committed data — it only marks it as superseded in the transaction log. Old data files remain on disk until you explicitly run VACUUM. This means you can query the state of a table at any past point in time, either by version number or timestamp.
Real use cases: debugging ("the dashboard showed wrong numbers yesterday — what did the table look like at 3pm?"), compliance auditing, comparing model training datasets across runs, and implementing a "soft undo" when a bad data load needs to be rolled back.
# Read a specific version
df_v5 = (spark.read.format("delta")
         .option("versionAsOf", 5)
         .load("/delta/orders"))
# Read at a specific timestamp
df_monday = (spark.read.format("delta")
             .option("timestampAsOf", "2026-04-14 09:00:00")
             .load("/delta/orders"))
# View full table history
from delta.tables import DeltaTable
dt = DeltaTable.forPath(spark, "/delta/orders")
dt.history().select("version", "timestamp", "operation", "operationParameters").show()
# Restore table to a previous version
dt.restoreToVersion(5)
# Clean up old versions (default 7-day retention)
dt.vacuum(retentionHours=168)
Q43. How does MERGE INTO work in Delta Lake?
MERGE INTO is Delta Lake's upsert operation — it atomically handles both updates to existing records and inserts of new records in a single pass. It is the standard pattern for incremental loads: every pipeline run brings a batch of "source" records, and MERGE handles matching them against the "target" table based on a key.
MERGE is also used for SCD Type 2 (slowly changing dimensions): when a record changes, the old version gets an end_date and is_current=false, and a new version is inserted with is_current=true. This requires a more complex merge with whenMatchedUpdate and whenNotMatchedInsert conditions.
from delta.tables import DeltaTable
target = DeltaTable.forPath(spark, "/delta/customers")
# Standard upsert — update if exists, insert if new
(target.alias("t").merge(
    source=updates_df.alias("s"),
    condition="t.customer_id = s.customer_id")
 .whenMatchedUpdateAll()
 .whenNotMatchedInsertAll()
 .execute())
# Upsert with selective update — only update specific columns
(target.alias("t").merge(
    source=updates_df.alias("s"),
    condition="t.customer_id = s.customer_id")
 .whenMatchedUpdate(set={
     "email": "s.email",
     "updated_at": "s.updated_at",
 })
 .whenNotMatchedInsertAll()
 .execute())
# Delete records that no longer exist in source (full sync)
(target.alias("t").merge(
    source=source_df.alias("s"),
    condition="t.id = s.id")
 .whenMatchedUpdateAll()
 .whenNotMatchedInsertAll()
 .whenNotMatchedBySourceDelete()
 .execute())
Q44. What is Z-ordering in Delta Lake?
Z-ordering is a multi-dimensional data clustering technique that co-locates related data in the same files, improving data skipping efficiency when you filter on multiple columns. Standard Parquet partitioning only skips at the partition (directory) level and only works on the partition column. Z-ordering works at the file level and can improve skipping for any combination of Z-ordered columns.
Example: your events table is partitioned by date (good for date filters), but queries often also filter by country AND product_category. Z-ordering on these two columns reorganises data within each day's partition so that rows with the same country+category cluster in the same files — allowing Delta to skip large chunks of files when filtering on those columns.
from delta.tables import DeltaTable
dt = DeltaTable.forPath(spark, "/delta/events")
# Z-order on frequently filtered columns
dt.optimize().executeZOrderBy("country", "product_category")
# Verify: query should now read fewer files
spark.sql("""
SELECT COUNT(*) FROM delta.`/delta/events`
WHERE country = 'US' AND product_category = 'Electronics'
""").explain()
# Look for "numFilesSkipped" in the query metrics
Q45. What is schema enforcement and schema evolution in Delta Lake?
Schema enforcement means Delta Lake rejects any write that does not conform to the table's registered schema. If your source data suddenly has a new column or a changed type, Delta will throw a schema mismatch error before any data is written — protecting downstream consumers from silent breakage. This is one of the most valuable Delta features for production data quality.
Schema evolution allows Delta to automatically expand the schema when new columns appear in incoming data. Enable it with .option("mergeSchema", "true"). The new column is added to the table schema and historical rows get null for that column. This is a one-way operation — Delta won't automatically remove columns that disappear from the source.
# Schema enforcement — write fails if schema doesn't match
try:
    df_with_new_col.write.format("delta").mode("append").save("/delta/orders")
except Exception as e:
    print(e) # AnalysisException: A schema mismatch detected...
# Schema evolution — allow new columns to be added automatically
(df_with_new_col.write.format("delta")
   .mode("append")
   .option("mergeSchema", "true")
   .save("/delta/orders"))
# Or enable globally
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
Q46. What is OPTIMIZE and VACUUM in Delta Lake?
OPTIMIZE compacts many small Parquet files into fewer large files (target size 1GB by default). Small files accumulate over time from streaming writes, frequent batch loads, or fine-grained partitioning. Reading 10,000 x 1MB files is much slower than reading 10 x 1GB files — OPTIMIZE fixes this. Run it periodically (daily or weekly depending on write frequency).
VACUUM removes Parquet files that are no longer referenced by the Delta transaction log and are older than the retention period (default 7 days). Without VACUUM, your storage grows indefinitely even though old versions are not needed. Important: do not set retention below 7 days if you need time travel, and do not run VACUUM on a table currently being read by a long-running query.
from delta.tables import DeltaTable
dt = DeltaTable.forPath(spark, "/delta/events")
# Compact small files
dt.optimize().executeCompaction()
# Compact + Z-order in one operation
dt.optimize().executeZOrderBy("country", "event_type")
# Remove files older than 7 days (default)
dt.vacuum()
# Retention below the 7-day default requires disabling the safety check
# (spark.databricks.delta.retentionDurationCheck.enabled) — doing so breaks
# time travel beyond the window and can corrupt long-running concurrent reads
dt.vacuum(retentionHours=24)
# Dry run — see what VACUUM would delete without deleting.
# The Python vacuum() API has no dry-run flag; use the SQL form:
spark.sql("VACUUM delta.`/delta/events` RETAIN 168 HOURS DRY RUN").show()
Q47. What is the medallion architecture and how do you implement it?
The medallion architecture organises a data lakehouse into three layers: Bronze (raw), Silver (refined), and Gold (aggregated). Bronze stores data exactly as ingested — immutable, no transformations, partitioned by ingestion date. Silver applies cleaning, deduplication, type casting, and business rules. Gold contains purpose-built aggregations for specific consumers like BI dashboards or ML feature stores.
The architectural insight that interviewers want to hear: Bronze enables full replay. If a bug is discovered in Silver logic six months later, you can re-run the Silver transformation from Bronze without re-ingesting from source systems. This is only possible because Bronze is immutable and complete. Without Bronze, you would need to re-pull data from the source — which might no longer be available.
# Bronze — ingest raw, add metadata, never modify source data
def to_bronze(source_df, table_path, source_name):
    return (source_df
            .withColumn("_ingested_at", F.current_timestamp())
            .withColumn("_source", F.lit(source_name))
            .withColumn("_date", F.to_date("_ingested_at"))
            .write.format("delta").mode("append")
            .partitionBy("_date") # partitionBy takes column names, not expressions
            .save(table_path))
# Silver — clean, deduplicate, enforce types
def bronze_to_silver(bronze_path, silver_path):
    df = (spark.read.format("delta").load(bronze_path)
          .dropna(subset=["order_id", "user_id"])
          .withColumn("order_date", F.to_date("order_date_str", "yyyy-MM-dd"))
          .withColumn("revenue", F.col("revenue_str").cast("double"))
          .dropDuplicates(["order_id"]))
    (DeltaTable.forPath(spark, silver_path).alias("t")
       .merge(df.alias("s"), "t.order_id = s.order_id")
       .whenMatchedUpdateAll().whenNotMatchedInsertAll().execute())
# Gold — aggregate for a specific use case
def silver_to_gold_daily(silver_path, gold_path):
    (spark.read.format("delta").load(silver_path)
       .groupBy("order_date", "region")
       .agg(F.sum("revenue").alias("total_revenue"),
            F.countDistinct("user_id").alias("dau"))
       .write.format("delta").mode("overwrite")
       .partitionBy("order_date").save(gold_path))
Q48. What is Change Data Feed (CDF) in Delta Lake?
Change Data Feed (also called Change Data Capture on Delta) tracks every row-level change — inserts, updates, and deletes — made to a Delta table. When enabled, Delta writes additional _change_data files alongside the regular data files. Downstream pipelines can read only the changes since the last run rather than reprocessing the full table, making incremental pipelines much more efficient.
# Enable CDF on a new table
spark.sql("""
CREATE TABLE orders (id BIGINT, status STRING, revenue DOUBLE)
USING DELTA
TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")
# Or enable on an existing table
spark.sql("ALTER TABLE orders SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")
# Read changes since version 5
changes = (spark.read.format("delta")
           .option("readChangeFeed", "true")
           .option("startingVersion", 5)
           .table("orders"))
# _change_type column: insert, update_preimage, update_postimage, delete
(changes.filter(F.col("_change_type").isin(["insert", "update_postimage"]))
   .show())
Q49. How do you handle late-arriving data in a streaming pipeline?
In real-world streaming, events arrive out of order — a mobile event generated at 3:00pm might arrive at the server at 3:15pm due to network delays. If your streaming aggregation is based on event time (when the event occurred) rather than processing time (when Spark received it), late arrivals will be missed after their window closes.
Spark Structured Streaming handles this with watermarking. A watermark defines how late data can arrive and still be included. df.withWatermark("event_time", "15 minutes") tells Spark: include events up to 15 minutes late, but discard anything older. State for closed windows is retained for the watermark duration then cleaned up automatically.
# Accept events up to 15 minutes late
events_with_watermark = stream_df.withWatermark("event_time", "15 minutes")
# Tumbling window aggregation that handles late data
result = (events_with_watermark
          .groupBy(
              window(F.col("event_time"), "5 minutes"), # 5-min tumbling window
              F.col("product_id"))
          .agg(F.sum("revenue").alias("window_revenue")))
(result.writeStream
   .format("delta")
   .outputMode("append") # append: only emit when window is finalized
   .option("checkpointLocation", "/checkpoints/revenue/")
   .start("/delta/revenue_windows"))
Q50. How does Spark Structured Streaming achieve exactly-once semantics?
Exactly-once means every event is processed and reflected in the output exactly one time — not zero times (at-most-once) and not multiple times (at-least-once). Spark Structured Streaming achieves exactly-once end-to-end through three mechanisms working together: idempotent sinks (the output destination must support idempotent writes), transactional commits (Delta Lake's ACID writes), and offset tracking in checkpoints (Spark records exactly which Kafka offsets have been processed, so on restart it resumes from the right position without reprocessing).
In practice: use Delta Lake as the sink (ACID writes make them idempotent), configure a checkpoint location on reliable storage (S3, GCS — not local disk), and never change the checkpoint location when redeploying. If you must reprocess from an earlier offset (e.g., to fix a bug), clear the checkpoint and specify the starting offset explicitly.
# Exactly-once streaming to Delta Lake
(stream_df.writeStream
   .format("delta")
   .outputMode("append")
   .option("checkpointLocation", "s3://bucket/checkpoints/my-pipeline/")
   .trigger(processingTime="1 minute")
   .start("/delta/output"))
# If you need to reprocess from a specific Kafka offset:
stream_df = (spark.readStream
             .format("kafka")
             .option("kafka.bootstrap.servers", "broker:9092")
             .option("subscribe", "events")
             .option("startingOffsets", '{"events":{"0":12345}}')
             .load())
# AND clear the checkpoint directory before starting