I've inherited some Spark code and am having trouble understanding "why does this even work". The short version is that I load a DF with info from a Delta table and then join it to a streaming DF. I would expect that first DF to never refresh itself, but it seems to be doing so. This is all on Azure Databricks using Spark with Scala.
Specifically, I have a table with HW information which is periodically updated via a scheduled job, like so:
Notebook 1 page 1
// grab information and put into df
df.write
.format("delta")
.mode("overwrite")
.save(basePath + s"Metadata/HardwareInfo")
Then I have a permanently-running notebook which reads this in and uses it in a join against a streaming read DF, like so...
Notebook 2 page 1
val dfHardware = spark.read
.format("delta")
.load(basePath+"Metadata/HardwareInfo")
Notebook 2 page 2
val dfStreamingInput =
spark.readStream
.format("delta")
.option("ignoreDeletes", "true")
.option("ignoreChanges", "true")
.load(basePath + "DeltaTable/streaming-input-location")
Notebook 2 page 3
val dfJoined =
dfStreamingInput.join(dfHardware)
.where(... criteria ...)
.select(... criteria ...)
Notebook 2 page 4
// do things with dfJoined
// edit: I originally didn't mention the write
import org.apache.spark.sql.streaming.Trigger

dfJoined
.writeStream
.format("delta")
.option("checkpointLocation", "...")
.option("path", "...")
.outputMode("append")
.trigger(Trigger.ProcessingTime("30 seconds")).start()
Naively, I would expect dfHardware to be a snapshot and not refresh itself. But I've experimentally verified that dfHardware contains information put into the underlying table after Notebook 2 was started.
Am I just wrong in my "experimentally verified"? Or is that first read from ".../HardwareInfo" refreshing itself? (And how can I suppress that behavior if I stop wanting it?)
1 Answer
Update based on new code in OP:
As Gaël J mentioned, streaming is an "infinite write". To be specific, streaming with trigger = Continuous or ProcessingTime is an "infinite write". With trigger = Once or AvailableNow, it's more like batch processing (except that the batch is the delta since the last time the trigger fired).
With .trigger(Trigger.ProcessingTime("30 seconds")).start() you're explicitly telling Spark to "refresh" every 30 seconds.
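For contrast, here's a minimal sketch (the paths and checkpoint location are placeholders, not values from your job) of the same sink using a run-once style trigger, which processes whatever is currently available in the source and then stops, instead of waking up every 30 seconds indefinitely:

import org.apache.spark.sql.streaming.Trigger

// Trigger.AvailableNow() needs Spark 3.3+ / a recent DBR; use Trigger.Once() on older versions.
dfJoined
.writeStream
.format("delta")
.option("checkpointLocation", "/tmp/example/checkpoint") // placeholder
.option("path", "/tmp/example/output") // placeholder
.outputMode("append")
.trigger(Trigger.AvailableNow())
.start()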
It's because of the lazy nature of Spark execution/evaluation. Also see:
- Spark Transformation - Why is it lazy and what is the advantage?
- From the PySpark quickstart guide:
PySpark DataFrames are lazily evaluated. They are implemented on top of RDDs. When Spark transforms data, it does not immediately compute the transformation but plans how to compute later. When actions such as collect() are explicitly called, the computation starts.
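To see that in a small Scala sketch (assuming a notebook where spark is already defined; the path and column name below are made up for illustration):

import org.apache.spark.sql.functions.col

// Nothing executes here: these lines only build a logical plan.
val dfLazy = spark.read
.format("delta")
.load("/example/Metadata/HardwareInfo") // placeholder path

val dfFiltered = dfLazy.filter(col("status") === "active") // hypothetical column

// Only an action forces execution and actually touches storage,
// e.g. count(), collect(), a batch write, or starting a streaming query.
val rowCount = dfFiltered.count()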
Specifically, in the code you posted there are no actions, so Spark won't actually read anything at all; it'll just build a plan. There must be some action (write/save/...) in some code you've not posted.
The only reason it would reuse a DataFrame is when you "encourage" it, e.g. by broadcasting/caching the DataFrame and/or ensuring your code is written such that the same DataFrame (after all filter pushdowns, aggregations, etc.) is usable in two different paths.
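As a hedged sketch of the caching option (and of one way to pin dfHardware to a snapshot if you ever want to suppress the refresh): the join key below is hypothetical, and whether the cached copy is actually reused across micro-batches can depend on your Spark/Delta setup, so verify before relying on it.

// Read the static side once, cache it, and materialize it with an action
// so later micro-batches can reuse the in-memory copy instead of re-reading storage.
val dfHardwareSnapshot = spark.read
.format("delta")
.load(basePath + "Metadata/HardwareInfo")
.cache()

dfHardwareSnapshot.count() // action: fills the cache now

// Join the stream against the cached snapshot ("hardwareId" is a hypothetical key).
val dfJoinedSnapshot = dfStreamingInput.join(dfHardwareSnapshot, Seq("hardwareId"))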