I am trying to migrate a job running on Dataproc 2.1 images (Spark 3.3, Python 3.10) to Dataproc 2.2 images (Spark 3.5, Python 3.11). However, I encounter an error in one of my queries. After further investigation, I was able to reproduce the issue with this minimal example:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

df = (
    spark.readStream.format("rate")
    .option("rowsPerSecond", 4)
    .load()
    .withWatermark("timestamp", "1 seconds")
    .withColumn("window", F.window("timestamp", "10 seconds"))
    .groupBy(F.col("window"))
    .agg(F.count(F.expr("*")))
)
df.writeStream.format("console").queryName("sql_console").start()
spark.streams.awaitAnyTermination(60)
_ = [q.stop() for q in spark.streams.active]
This raises the following exception:
25/03/17 11:37:04 WARN TaskSetManager: Lost task 7.0 in stage 1.0 (TID 7) (cluster-test-alexis-<redacted>.internal executor 2): java.io.IOException: mkdir of file:/tmp/temporary-42a491af-ba8c-412b-b06e-fb420879b92f/state/0/7 failed
	at org.apache.hadoop.fs.FileSystem.primitiveMkdir(FileSystem.java:1414)
	at org.apache.hadoop.fs.DelegateToFileSystem.mkdir(DelegateToFileSystem.java:185)
	at org.apache.hadoop.fs.FilterFs.mkdir(FilterFs.java:219)
	at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:818)
[...]
Removing the groupBy(...).agg(...) line makes the exception disappear.
From what I understand, Spark tries to create a temporary directory to hold the aggregation state, but fails to do so (I don't know why: permission issue, insufficient space, etc.?).
What can I do to investigate further or work around this?
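One idea is to probe whether the executors themselves can create directories under /tmp, to rule out a permission or disk-space problem on the workers. A minimal sketch (the helper name try_mkdir is my own, just for illustration):

import os
import socket
import tempfile

def try_mkdir(_):
    # Try to create (and immediately remove) a directory under /tmp
    # on whichever worker this partition lands on.
    try:
        path = tempfile.mkdtemp(prefix="mkdir-probe-", dir="/tmp")
        os.rmdir(path)
        return [(socket.gethostname(), "ok")]
    except OSError as exc:
        return [(socket.gethostname(), repr(exc))]

# Use enough partitions to hit every executor at least once.
print(sorted(set(
    spark.sparkContext.parallelize(range(32), 32).mapPartitions(try_mkdir).collect()
)))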
Edit
While scrolling through the logs, I also stumbled upon this line at the start of the job:
25/03/17 11:36:31 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-42a491af-ba8c-412b-b06e-fb420879b92f. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
This states that the temporary checkpoint location was created successfully, so I suppose my error does not come from a permission issue (otherwise the initial creation would also fail). That said, this WARN line is emitted by the driver, while the failing mkdir happens on an executor, so the two may not even be writing to the same filesystem.
Edit 2
A very easy workaround is to explicitly define the checkpoint location:
spark = (
    SparkSession.builder
    .config("spark.sql.streaming.checkpointLocation", "/tmp")
    .getOrCreate()
)
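Alternatively, the same thing can be set per query on the writer instead of globally (the path below is just an example):

query = (
    df.writeStream.format("console")
    .queryName("sql_console")
    .option("checkpointLocation", "/tmp/sql_console_checkpoint")  # example path
    .start()
)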
By the way, I don't see any temporary-... folder in the HDFS /tmp folder, only folders named after the query IDs.
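For reference, I am checking that with the standard HDFS CLI:

hdfs dfs -ls /tmp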