I am building a pipeline which reads events from Kafka and calls a bunch of REST APIs in a chained fashion. For calling the APIs I am using (unordered) AsyncIO with a FixedDelayRetryStrategy.
One of my wishes for this system is to be resilient to API failures, and not to lose events. What I observe from testing though, is that Flink's checkpoints are stuck 'IN_PROGRESS' when the capacity of the AsyncIO operator is saturated.
I have built the following test:
- a DataGenerator source that emits a random Integer every 200ms
- a RichAsyncFunction (using ResultFuture) that completes successfully when the event is even, and exceptionally when it is odd (sketched below)
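For reference, a minimal sketch of that test function (the class name and exception message are illustrative, not my exact code):

```java
import java.util.Collections;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// Test function: even values "succeed", odd values "fail",
// so the retry strategy keeps retrying them until the async timeout fires.
public class EvenOddAsyncFunction extends RichAsyncFunction<Integer, Integer> {

    @Override
    public void asyncInvoke(Integer input, ResultFuture<Integer> resultFuture) {
        if (input % 2 == 0) {
            resultFuture.complete(Collections.singleton(input));
        } else {
            // Completing exceptionally is what triggers the configured retry strategy.
            resultFuture.completeExceptionally(
                    new RuntimeException("simulated API failure for " + input));
        }
    }

    @Override
    public void timeout(Integer input, ResultFuture<Integer> resultFuture) {
        // After the overall timeout, complete with the element anyway so it can be
        // routed to a DLQ downstream instead of failing the task.
        resultFuture.complete(Collections.singleton(input));
    }
}
```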
In the Flink UI, my pipeline shows up as 2 operators:
- Source: Collection Source -> (Map, Sink: Print to Std. Out)
- async wait operator -> Map -> Sink: Print to Std. Out
To make it retry (simulating an API that is down for 30 minutes), I have set the following (see the wiring sketch after this list):
- a capacity of 20
- a timeout on the AsyncDataStream of 30 minutes
- a backoff interval of 5 seconds (an event that completes exceptionally is retried every 5 seconds)
- a maxAttempts greater than 30 minutes / 5 seconds = 360 (so that exhausting retries doesn't fail the task)
- a timeout() method that calls ResultFuture.complete(event) after the timeout interval, allowing me to route the event to a DLQ, for example
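Roughly, the wiring looks like this (a sketch rather than my exact code; `source` stands for the DataGenerator stream and EvenOddAsyncFunction is the test function sketched above):

```java
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies;
import org.apache.flink.streaming.util.retryable.RetryPredicates;

// Retry a failed element every 5 seconds, up to 400 attempts
// (> 360 attempts needed to ride out a 30-minute outage), on any exception.
AsyncRetryStrategy<Integer> retryStrategy =
        new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder<Integer>(400, 5_000L)
                .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
                .build();

DataStream<Integer> enriched =
        AsyncDataStream.unorderedWaitWithRetry(
                source,                      // the DataGenerator stream
                new EvenOddAsyncFunction(),
                30, TimeUnit.MINUTES,        // overall timeout per element
                20,                          // capacity: max in-flight elements
                retryStrategy);
```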
I have tried both aligned and unaligned checkpoints.
What I observe is that checkpointing stops advancing once the RichAsyncFunction's capacity is saturated with events being retried. The source keeps producing events, but the checkpoint stays 'IN_PROGRESS'. Furthermore, if the checkpoint times out, the state is lost (all the events produced by the source); presumably the job is restarted from the previous checkpoint, or even from a savepoint (by default).
What I don't understand is why the checkpoints are stuck. Why can't the RichAsyncFunction simply include its current state in the checkpoint? I'm assuming the async operator is what is blocking the checkpoint, perhaps because of the timers that the retry mechanism uses; I also don't see why the source should be unable to checkpoint, even if it produces events faster than they can be processed.
Am I misusing or misunderstanding AsyncIO retries? Can I make checkpoints happen on other parts of the pipeline, or can I make the job automatically 'pause' consuming from Kafka when such a blockage occurs?
Answer:
If your RichAsyncFunction isn't processing events (because they are all stuck retrying on errors), then checkpointing will also be stuck: the checkpoint barrier won't skip over the records waiting in the operator's input buffer, so the checkpoint cannot complete until the operator drains them.
See the Flink documentation on unaligned checkpoints for one way to work around this issue.
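A minimal sketch of enabling unaligned checkpoints (plus the timeout that makes aligned checkpoints fall back to unaligned when alignment stalls); the interval and timeout values here are illustrative, not recommendations:

```java
import java.time.Duration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.enableCheckpointing(60_000);                          // checkpoint every 60 s
env.getCheckpointConfig().enableUnalignedCheckpoints();   // barriers may overtake buffered records
env.getCheckpointConfig()
        .setAlignedCheckpointTimeout(Duration.ofSeconds(30)); // switch to unaligned if alignment stalls
env.getCheckpointConfig()
        .setCheckpointTimeout(30 * 60 * 1000L);           // give slow checkpoints more time (ms)
```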