I have a Databricks pyspark query that has been running fine for the last two weeks but am now getting the following error despite no changes to the query: OutOfMemoryError: GC overhead limit exceeded.
I have done some research on possible solutions and have tried adding in partitions to see if that would help but it did not.
Are there any other ways I can troubleshoot this error?
Here is the query I am running:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, lit, when, sum, row_number
# Add a unique row ID column using row_number
window_spec = Window.orderBy(lit(1))
df_bs_exploded1 = df_bs_exploded.withColumn("unique_row_id", row_number().over(window_spec))
# Step 1: Aggregate df_final to get the total allocation quantity per key
df_allocations = df_final.groupBy(
"sap_sales_order_number", "product_end_customer_id", "marketing_sub_code", "fiscal_quarter"
).agg(
sum("allocation_quantity").alias("total_allocation_quantity")
)
# Step 2: Join df_bs_exploded with df_allocations
df_joined = df_bs_exploded1.join(
df_allocations,
on=["sap_sales_order_number", "product_end_customer_id", "marketing_sub_code", "fiscal_quarter"],
how="left"
)
# Step 3: Add a row number for each partition to track allocation order
window_spec_bs1 = Window.partitionBy(
"sap_sales_order_number", "product_end_customer_id", "marketing_sub_code","fiscal_quarter"
).orderBy(lit(1))
df_with_row_number = df_joined.withColumn("row_number", row_number().over(window_spec_bs1))
# Step 4: Allocate rows based on total_allocation_quantity
df_allocated = df_with_row_number.withColumn(
"allocated",
when(
(col("row_number") <= col("total_allocation_quantity")) & col("total_allocation_quantity").isNotNull(),
"Yes"
).otherwise("No")
)
# Step 5: Add an allocated_quantity column to track allocated amounts
df_final_bs_result = df_allocated.withColumn(
"allocated_quantity",
when(col("allocated") == "Yes", 1).otherwise(lit(None))
)
# Drop the temporary row_number column if no longer needed
df_final_bs_result = df_final_bs_result.drop("row_number")
# Display the final result
display(df_final_bs_result)
And here is the full error message I am getting:
.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 761.0 failed 4 times, most recent failure: Lost task 1.3 in stage 761.0 (TID 8251) (100.65.19.63 executor 11): java.lang.OutOfMemoryError: GC overhead limit exceeded
at .apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:513)
at .apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.generate_doConsume_0$(Unknown Source)
at .apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.hashAgg_doAggregateWithKeysOutput_0$(Unknown Source)
at .apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.processNext(Unknown Source)
at .apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at .apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at .apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:179)
at .apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:56)
at .apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:92)
at .apache.spark.scheduler.ShuffleMapTask$$Lambda$2492/1771619670.apply(Unknown Source)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at .apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:87)
at .apache.spark.scheduler.ShuffleMapTask$$Lambda$1927/946116243.apply(Unknown Source)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at .apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:58)
at .apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:39)
at .apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:225)
at .apache.spark.scheduler.Task.doRunTask(Task.scala:199)
at .apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:161)
at .apache.spark.scheduler.Task$$Lambda$1907/230573460.apply(Unknown Source)
at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:51)
at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:104)
at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:109)
at com.databricks.unity.HandleImpl$$Lambda$1908/336068323.apply(Unknown Source)
at scala.util.Using$.resource(Using.scala:269)
at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:108)
at .apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:155)
at .apache.spark.scheduler.Task$$Lambda$1871/401348611.apply(Unknown Source)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at .apache.spark.scheduler.Task.run(Task.scala:102)
at .apache.spark.executor.Executor$TaskRunner.$anonfun$run$10(Executor.scala:1042)
Driver stacktrace:
at .apache.spark.scheduler.DAGScheduler.$anonfun$failJobAndIndependentStages$1(DAGScheduler.scala:4018)
at scala.Option.getOrElse(Option.scala:189)
at .apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:4016)
at .apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3930)
at .apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3917)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at .apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3917)
at .apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1766)
at .apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1749)
at scala.Option.foreach(Option.scala:407)
at .apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1749)
at .apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:4277)
at .apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:4179)
at .apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:4165)
at .apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:55)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
at .apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:513)
at .apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.generate_doConsume_0$(Unknown Source)
at .apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.hashAgg_doAggregateWithKeysOutput_0$(Unknown Source)
at .apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.processNext(Unknown Source)
at .apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at .apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at .apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:179)
at .apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:56)
at .apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:92)
at .apache.spark.scheduler.ShuffleMapTask$$Lambda$2492/1771619670.apply(Unknown Source)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at .apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:87)
at .apache.spark.scheduler.ShuffleMapTask$$Lambda$1927/946116243.apply(Unknown Source)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at .apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:58)
at .apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:39)
at .apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:225)
at .apache.spark.scheduler.Task.doRunTask(Task.scala:199)
at .apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:161)
at .apache.spark.scheduler.Task$$Lambda$1907/230573460.apply(Unknown Source)
at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:51)
at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:104)
at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:109)
at com.databricks.unity.HandleImpl$$Lambda$1908/336068323.apply(Unknown Source)
at scala.util.Using$.resource(Using.scala:269)
at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:108)
at .apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:155)
at .apache.spark.scheduler.Task$$Lambda$1871/401348611.apply(Unknown Source)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at .apache.spark.scheduler.Task.run(Task.scala:102)
at .apache.spark.executor.Executor$TaskRunner.$anonfun$run$10(Executor.scala:1042)