
python - Why does Spark raise an IOException while running an aggregation on a streaming dataframe in Dataproc 2.2? - Stack Overflow


I am trying to migrate a job that runs on Dataproc 2.1 images (Spark 3.3, Python 3.10) to Dataproc 2.2 images (Spark 3.5, Python 3.11). However, I encounter an error on one of my queries. After further investigation, I was able to reproduce the issue with this minimal example:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F


spark = SparkSession.Builder().getOrCreate()

# Rate source producing 4 rows per second, with a watermark and a
# 10-second tumbling window; the groupBy/agg is the stateful part.
df = (
    spark.readStream.format("rate")
    .option("rowsPerSecond", 4)
    .load()
    .withWatermark("timestamp", "1 seconds")
    .withColumn("window", F.window("timestamp", "10 seconds"))
    .groupBy(F.col("window"))
    .agg(F.count(F.expr("*")))
)


df.writeStream.format("console").queryName("sql_console").start()
spark.streams.awaitAnyTermination(60)
_ = [q.stop() for q in spark.streams.active]

This raises the following exception:

25/03/17 11:37:04 WARN TaskSetManager: Lost task 7.0 in stage 1.0 (TID 7) (cluster-test-alexis-<redacted>.internal executor 2): java.io.IOException: mkdir of file:/tmp/temporary-42a491af-ba8c-412b-b06e-fb420879b92f/state/0/7 failed
    at org.apache.hadoop.fs.FileSystem.primitiveMkdir(FileSystem.java:1414)
    at org.apache.hadoop.fs.DelegateToFileSystem.mkdir(DelegateToFileSystem.java:185)
    at org.apache.hadoop.fs.FilterFs.mkdir(FilterFs.java:219)
    at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:818)
    [...]

Removing the groupBy(...).agg(...) line causes the exception to disappear. From what I understand, Spark tries to create a temporary directory for the aggregation state, but fails to do it (I don't know why: a permission issue, insufficient space, etc.?).
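
For reference, here is the stateless variant I mean (the same rate source and window column, just without the groupBy(...).agg(...) part), which runs without the error:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F


spark = SparkSession.Builder().getOrCreate()

# Same source and window column as above, but without the stateful
# aggregation, so no state store directories need to be created.
df = (
    spark.readStream.format("rate")
    .option("rowsPerSecond", 4)
    .load()
    .withWatermark("timestamp", "1 seconds")
    .withColumn("window", F.window("timestamp", "10 seconds"))
)

df.writeStream.format("console").queryName("sql_console").start()
spark.streams.awaitAnyTermination(60)
_ = [q.stop() for q in spark.streams.active]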

What can I do to investigate further or work around this?

Kind regards.


Edit

While scrolling through the logs, I also stumbled upon this line at the start of the job:

25/03/17 11:36:31 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-42a491af-ba8c-412b-b06e-fb420879b92f. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.

This states that the temporary checkpoint location has been created, so I suppose that my error does not come from a permission issue (otherwise the initial creation would also have failed).
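
To see where Spark is trying to write, one thing I can check interactively is the configured checkpoint location and the cluster's default filesystem (just a diagnostic sketch; _jsc is an internal handle, and nothing here is specific to my job):

# Diagnostic sketch: inspect which checkpoint location is configured and
# what the default filesystem is, to see where paths like /tmp resolve.
checkpoint = spark.conf.get("spark.sql.streaming.checkpointLocation", None)
print("configured checkpoint location:", checkpoint)

# hadoopConfiguration() is reached through the internal _jsc handle here.
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
print("fs.defaultFS:", hadoop_conf.get("fs.defaultFS"))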


Edit 2

A very easy workaround is to explicitly define the checkpoint location:

spark = SparkSession.Builder().config("spark.sql.streaming.checkpointLocation", "/tmp").getOrCreate()

By the way, I don't see any temporary-... folder in the HDFS /tmp folder, only folders named with the query IDs.
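
If setting the location globally is not desirable, the same workaround should also be possible per query through the writer's checkpointLocation option (a sketch; the path below is just an example):

# Per-query variant of the workaround: set the checkpoint location on the
# writer instead of globally; the path is only illustrative.
(df.writeStream
    .format("console")
    .queryName("sql_console")
    .option("checkpointLocation", "/tmp/sql_console_checkpoint")
    .start())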
