I have already gone through the basics, a working example, and the checkpoint directory contents.
Scenario
Say I am reading from a streaming source, e.g. Kafka (but not specific to it), and then writing to a CSV, with checkpoints enabled on both the read and write streams.
Question
Say I have read 1000 records from the source and, while writing to the CSV, I encounter a failure after writing 250 records. How would the checkpoints behave? Assuming the dataframe is lost from memory, would the read checkpoint be at 1000 and the write checkpoint at 250, so that on restart there is nothing left to read and records 251 to 1000 are lost?
How do you handle such a scenario and ensure the code is restartable?
Is a checkpoint needed at both read and write?
If I have to set the offset to a number such as 250, how do I do that dynamically?
Sample code:
# Reading from Kafka (please assume any source really - not specific to Kafka)
read_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host:port") \
    .option("subscribe", "topic_name") \
    .option("startingOffsets", "earliest") \
    .option("checkpointLocation", "/path/to/read/checkpoint") \
    .load()

# Writing to CSV
write_stream = read_stream \
    .writeStream \
    .format("csv") \
    .option("path", "/output/path") \
    .option("checkpointLocation", "/path/to/write/checkpoint") \
    .start()
2 Answers
Not sure about your use case, but in general only the checkpoint in the write stream needs to be set, as it already saves the full state of your query.
When you set the checkpoint in your write stream and your query/application stops for whatever reason, then on restart the read stream will resume from the offsets recorded in that checkpoint's metadata; you DON'T need to set anything in the read stream options. Adding a checkpoint in the read stream is redundant unless you have some special use case.
To answer your question: all you need to do is set the checkpoint in the WRITE stream only. When you restart the whole query, the read stream will start at the offsets saved in that checkpoint. There should also be no duplication, because the restarted query only processes and writes the data that had not yet been written to the sink.
If you really do need a checkpoint in the read stream for your use case, then, based on the context you provided, yes: on restart your records 251 to 1000 won't be written to your sink table, because the read stream will not re-read the data between those offsets.
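For reference, here is a minimal sketch of that setup, reusing the placeholder servers, topic and paths from the question; the checkpoint is configured only on the write stream:
# No checkpointLocation on the read stream - the source offsets are tracked
# by the write-side checkpoint.
read_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host:port") \
    .option("subscribe", "topic_name") \
    .option("startingOffsets", "earliest") \
    .load()

# The single write-side checkpoint records both the source offsets (offsets/)
# and the committed micro-batches (commits/), so a restarted query resumes
# from the last fully committed batch instead of losing records 251 to 1000.
write_stream = read_stream \
    .writeStream \
    .format("csv") \
    .option("path", "/output/path") \
    .option("checkpointLocation", "/path/to/write/checkpoint") \
    .start()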
You don't need to do anything explicitly (in terms of coding) for restart.
You have two checkpoint directories as per standard guidelines, and an "append to sink" (the CSV) with no idempotency and no WAL as you would have with the Delta format.
So, as per my understanding, you could get duplicate records after a restart, but maybe not. In any case, "exactly once" semantics do not apply; that would need to be achieved with a Delta sink or a DB with upsert.
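If you do want exactly-once behaviour, a rough sketch of that upsert approach, assuming a Delta Lake sink written via foreachBatch and a hypothetical business key column id (the table path and column name are placeholders, not from the question):
from delta.tables import DeltaTable

def upsert_to_delta(batch_df, batch_id):
    # MERGE on a business key makes the write idempotent, so a micro-batch
    # that is replayed after a restart does not produce duplicate rows.
    target = DeltaTable.forPath(spark, "/output/delta_table")  # placeholder path
    (target.alias("t")
        .merge(batch_df.alias("s"), "t.id = s.id")  # 'id' is an assumed key column
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

write_stream = read_stream \
    .writeStream \
    .foreachBatch(upsert_to_delta) \
    .option("checkpointLocation", "/path/to/write/checkpoint") \
    .start()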