te')); return $arr; }

/**
 * List all thread tids belonging to one user, newest (or oldest) first.
 *
 * @param int|string $uid      user ID; empty value short-circuits to an empty result
 * @param int        $page     page number (1-based)
 * @param int        $pagesize rows per page
 * @param bool       $desc     TRUE = descending by tid, FALSE = ascending
 * @param string     $key      which selected column's value keys the returned array
 * @param array      $col      columns to select; empty = driver default
 * @return array rows keyed by $key, or [] when $uid is empty
 */
function thread_tid_find_by_uid($uid, $page = 1, $pagesize = 1000, $desc = TRUE, $key = 'tid', $col = []) {
	if (empty($uid)) return [];
	// -1 = descending, 1 = ascending (MongoDB-style sort spec used by the DAO layer).
	$orderby = $desc ? -1 : 1;
	return thread_tid__find(['uid' => $uid], ['tid' => $orderby], $page, $pagesize, $key, $col);
}

/**
 * List thread tids under one or more forums. $fid may be a scalar or an
 * array such as array(1,2,3); the DAO layer handles the IN-style match.
 *
 * @param int|array $fid      forum ID(s); empty value short-circuits to []
 * @param int       $page     page number (1-based)
 * @param int       $pagesize rows per page
 * @param bool      $desc     TRUE = descending by tid, FALSE = ascending
 * @return array rows keyed by tid, each with 'tid' and 'verify_date'
 */
function thread_tid_find_by_fid($fid, $page = 1, $pagesize = 1000, $desc = TRUE) {
	if (empty($fid)) return [];
	$orderby = $desc ? -1 : 1;
	return thread_tid__find(['fid' => $fid], ['tid' => $orderby], $page, $pagesize, 'tid', ['tid', 'verify_date']);
}

/**
 * Delete the tid index row for one thread.
 *
 * @param int $tid thread ID; empty value is rejected
 * @return bool|mixed FALSE when $tid is empty, otherwise the DAO delete result
 */
function thread_tid_delete($tid) {
	if (empty($tid)) return FALSE;
	return thread_tid__delete(['tid' => $tid]);
}

/**
 * Total number of threads (primary-key count).
 *
 * @return int
 */
function thread_tid_count() {
	return thread_tid__count();
}

/**
 * Number of threads started by one user.
 * NOTE: counts on a non-primary-key column — use cautiously on large tables.
 *
 * @param int $uid user ID
 * @return int
 */
function thread_uid_count($uid) {
	return thread_tid__count(['uid' => $uid]);
}

/**
 * Number of threads under one forum.
 * NOTE: counts on a non-primary-key column — use cautiously on large tables.
 *
 * @param int $fid forum ID
 * @return int
 */
function thread_fid_count($fid) {
	return thread_tid__count(['fid' => $fid]);
}
?>Flink Async IO & Checkpointing - Stack Overflow
最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

Flink Async IO & Checkpointing - Stack Overflow

programmeradmin2浏览0评论

I am building a pipeline which reads events from Kafka and calls a bunch of REST APIs in a chained fashion. For calling the APIs I am using (unordered) AsyncIO with a FixedDelayRetryStrategy.

One of my wishes for this system is to be resilient to API failures, and not to lose events. What I observe from testing though, is that Flink's checkpoints are stuck 'IN_PROGRESS' when the capacity of the AsyncIO operator is saturated.

I have built the following test:

  • a DataGenerator source, that emits a random Integer every 200ms
  • a RichAsyncFunction (ResultFuture) that completes successfully when the event is even, and exceptionally when not even.

In the Flink UI, my pipeline shows as 2 operators:

  • Source: Collection Source -> (Map, Sink: Print to Std. Out)
  • async wait operator -> Map -> Sink: Print to Std. Out

To make it retry (simulating that an API would be down for 30 minutes), I have set:

  • a capacity of 20
  • a timeout on the AsyncDataStream of 30 minutes
  • a backOff interval of 5 seconds (the event completed exceptionally will be retried every 5 sec)
  • a maxAttempts > than 30minutes/5 seconds (so that it doesn't fail the task)
  • a timeout() method that will do a ResultFuture.complete(event), after the timeout interval, allowing me to put the event into a DLQ for example

I have tried both aligned and unaligned checkpoints.

What I observe is that checkpointing doesn't advance once the RichAsyncFunction capacity is saturated with events being retried. The source keeps producing events, but the checkpoint is stuck with 'IN_PROGRESS'. Furthermore, if a checkpoint timeout happens, the state is lost (all the events produced by the source), probably it's restarting the Job from the previous checkpoint or even savepoint (by default).

What I don't understand is why are checkpoints stuck. Why can't the RichAsyncFunction just save the current state in a checkpoint (I'm assuming this is what is making the checkpoint stuck, maybe it has to do with the Timers that the retry mechanism is using, I don't see why the source would not be able to do it, even if it produces events faster than they can be processed).

Am I misusing or misunderstanding AsyncIO retries? Can I make checkpoints happen on other parts of the pipeline, or can I make it 'pause' consuming from Kafka automatically when such a block takes place?

I am building a pipeline which reads events from Kafka and calls a bunch of REST APIs in a chained fashion. For calling the APIs I am using (unordered) AsyncIO with a FixedDelayRetryStrategy.

One of my wishes for this system is to be resilient to API failures, and not to lose events. What I observe from testing though, is that Flink's checkpoints are stuck 'IN_PROGRESS' when the capacity of the AsyncIO operator is saturated.

I have built the following test:

  • a DataGenerator source, that emits a random Integer every 200ms
  • a RichAsyncFunction (ResultFuture) that completes successfully when the event is even, and exceptionally when not even.

In the Flink UI, my pipeline shows as 2 operators:

  • Source: Collection Source -> (Map, Sink: Print to Std. Out)
  • async wait operator -> Map -> Sink: Print to Std. Out

To make it retry (simulating that an API would be down for 30 minutes), I have set:

  • a capacity of 20
  • a timeout on the AsyncDataStream of 30 minutes
  • a backOff interval of 5 seconds (the event completed exceptionally will be retried every 5 sec)
  • a maxAttempts > than 30minutes/5 seconds (so that it doesn't fail the task)
  • a timeout() method that will do a ResultFuture.complete(event), after the timeout interval, allowing me to put the event into a DLQ for example

I have tried both aligned and unaligned checkpoints.

What I observe is that checkpointing doesn't advance once the RichAsyncFunction capacity is saturated with events being retried. The source keeps producing events, but the checkpoint is stuck with 'IN_PROGRESS'. Furthermore, if a checkpoint timeout happens, the state is lost (all the events produced by the source), probably it's restarting the Job from the previous checkpoint or even savepoint (by default).

What I don't understand is why are checkpoints stuck. Why can't the RichAsyncFunction just save the current state in a checkpoint (I'm assuming this is what is making the checkpoint stuck, maybe it has to do with the Timers that the retry mechanism is using, I don't see why the source would not be able to do it, even if it produces events faster than they can be processed).

Am I misusing or misunderstanding AsyncIO retries? Can I make checkpoints happen on other parts of the pipeline, or can I make it 'pause' consuming from Kafka automatically when such a block takes place?

Share Improve this question asked Feb 17 at 19:47 CraigCraig 11 bronze badge New contributor Craig is a new contributor to this site. Take care in asking for clarification, commenting, and answering. Check out our Code of Conduct.
Add a comment  | 

1 Answer 1

Reset to default 0

If your RichAsyncFunction isn't processing events (due to retrying on errors) then checkpointing will also be stuck, as the checkpoint "barrier" won't skip over records that are in the input buffer for the operator.

See this document on unaligned checkpoints as one way to work around this issue.

发布评论

评论列表(0)

  1. 暂无评论