Databricks PySpark error: OutOfMemoryError: GC overhead limit exceeded


I have a Databricks PySpark query that has been running fine for the last two weeks, but I am now getting the following error despite no changes to the query: OutOfMemoryError: GC overhead limit exceeded.

I have done some research on possible solutions and tried repartitioning the data (roughly as sketched below) to see if that would help, but it did not.
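The repartitioning attempt looked roughly like this (the partition count and key column here are just examples, not the exact values I used):

# Attempted fix (sketch): increase parallelism before the join/window steps
df_bs_exploded = df_bs_exploded.repartition(400, "sap_sales_order_number")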

Are there any other ways I can troubleshoot this error?

Here is the query I am running:

from pyspark.sql.window import Window
from pyspark.sql.functions import col, lit, when, sum, row_number

# Add a unique row ID column using row_number
window_spec = Window.orderBy(lit(1))
df_bs_exploded1 = df_bs_exploded.withColumn("unique_row_id", row_number().over(window_spec))
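# Note: window_spec above has no partitionBy, so Spark moves every row into a
# single partition/task to assign row numbers; on a large DataFrame this is a
# common cause of executor OOM. A shuffle-free alternative (IDs are unique but
# not consecutive) would be:
#   df_bs_exploded1 = df_bs_exploded.withColumn("unique_row_id", monotonically_increasing_id())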

# Step 1: Aggregate df_final to get the total allocation quantity per key
df_allocations = df_final.groupBy(
    "sap_sales_order_number", "product_end_customer_id", "marketing_sub_code", "fiscal_quarter"
).agg(
    sum("allocation_quantity").alias("total_allocation_quantity")
)

# Step 2: Join df_bs_exploded with df_allocations
df_joined = df_bs_exploded1.join(
    df_allocations,
    on=["sap_sales_order_number", "product_end_customer_id", "marketing_sub_code", "fiscal_quarter"],
    how="left"
)

# Step 3: Add a row number for each partition to track allocation order
window_spec_bs1 = Window.partitionBy(
    "sap_sales_order_number", "product_end_customer_id", "marketing_sub_code", "fiscal_quarter"
).orderBy(lit(1))

df_with_row_number = df_joined.withColumn("row_number", row_number().over(window_spec_bs1))
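# Note: orderBy(lit(1)) gives Spark no real sort key, so the row_number
# assignment within each partition is non-deterministic and the "allocated"
# flag below can vary between runs. Ordering by a stable column (e.g. the
# unique_row_id added above) would make the allocation reproducible.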

# Step 4: Allocate rows based on total_allocation_quantity
df_allocated = df_with_row_number.withColumn(
    "allocated",
    when(
        (col("row_number") <= col("total_allocation_quantity")) & col("total_allocation_quantity").isNotNull(),
        "Yes"
    ).otherwise("No")
)

# Step 5: Add an allocated_quantity column to track allocated amounts
df_final_bs_result = df_allocated.withColumn(
    "allocated_quantity",
    when(col("allocated") == "Yes", 1).otherwise(lit(None))
)

# Drop the temporary row_number column if no longer needed
df_final_bs_result = df_final_bs_result.drop("row_number")

# Display the final result
display(df_final_bs_result)

And here is the full error message I am getting:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 761.0 failed 4 times, most recent failure: Lost task 1.3 in stage 761.0 (TID 8251) (100.65.19.63 executor 11): java.lang.OutOfMemoryError: GC overhead limit exceeded
    at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:513)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.generate_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.hashAgg_doAggregateWithKeysOutput_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:179)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:56)
    at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:92)
    at org.apache.spark.scheduler.ShuffleMapTask$$Lambda$2492/1771619670.apply(Unknown Source)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:87)
    at org.apache.spark.scheduler.ShuffleMapTask$$Lambda$1927/946116243.apply(Unknown Source)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:58)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:39)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:225)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:199)
    at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:161)
    at org.apache.spark.scheduler.Task$$Lambda$1907/230573460.apply(Unknown Source)
    at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:51)
    at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:104)
    at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:109)
    at com.databricks.unity.HandleImpl$$Lambda$1908/336068323.apply(Unknown Source)
    at scala.util.Using$.resource(Using.scala:269)
    at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:108)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:155)
    at org.apache.spark.scheduler.Task$$Lambda$1871/401348611.apply(Unknown Source)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:102)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$10(Executor.scala:1042)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$failJobAndIndependentStages$1(DAGScheduler.scala:4018)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:4016)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3930)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3917)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3917)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1766)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1749)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1749)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:4277)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:4179)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:4165)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:55)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:513)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.generate_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.hashAgg_doAggregateWithKeysOutput_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:179)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:56)
    at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:92)
    at org.apache.spark.scheduler.ShuffleMapTask$$Lambda$2492/1771619670.apply(Unknown Source)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:87)
    at org.apache.spark.scheduler.ShuffleMapTask$$Lambda$1927/946116243.apply(Unknown Source)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:58)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:39)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:225)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:199)
    at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:161)
    at org.apache.spark.scheduler.Task$$Lambda$1907/230573460.apply(Unknown Source)
    at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:51)
    at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:104)
    at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:109)
    at com.databricks.unity.HandleImpl$$Lambda$1908/336068323.apply(Unknown Source)
    at scala.util.Using$.resource(Using.scala:269)
    at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:108)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:155)
    at org.apache.spark.scheduler.Task$$Lambda$1871/401348611.apply(Unknown Source)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:102)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$10(Executor.scala:1042)