
apache flink - How is data handled by task workers from Kafka when using keyed partitioning?


I have a general idea of how it works but I just want to confirm what I've grokked from the docs is correct (particularly in a cluster).

  1. Let's assume there are 4 task manager instances.
  2. We use keyBy(userId)
  3. The Job Manager will receive the job and distribute metadata to the 4 task managers so that each is responsible for its own key range (key group). Say, 001-333, 334-555, 556-777, 778-999 when the total userId key space is 001-999. I understand the hashing and key grouping is handled behind the scenes - these ranges are just for the sake of clarity.
  4. Each task manager pulls from Kafka directly.
  5. Task managers route keys to peer task managers that are responsible for that hash space.

Is this roughly correct? I'm a little confused as to how arbitrary keys can make it to their respective workers when each key must be handled by the same instance and slot for stateful handling.

Is the part that I'm missing here that sources, operators and sinks can scale independently? Meaning that the KafkaSource processes are responsible for hashing the key and routing each record over to the correct downstream operator(s), whether local or over the network?
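
For concreteness, this is roughly the shape of job I have in mind (the topic name, record format, and the counting function below are made up for illustration):

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class KeyedKafkaJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka source; records are assumed to be "userId,payload" strings.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("kafka:9092")
                .setTopics("user-events")
                .setGroupId("example-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
           // keyBy(userId): records with the same key always end up on the same
           // parallel instance of the downstream stateful operator.
           .keyBy(new KeySelector<String, String>() {
               @Override
               public String getKey(String record) {
                   return record.split(",")[0]; // userId assumed to be the first field
               }
           })
           .process(new CountPerUser())
           .print();

        env.execute("keyed-kafka-example");
    }

    /** Keyed state: one counter per userId, stored in the operator's key group. */
    static class CountPerUser extends KeyedProcessFunction<String, String, String> {
        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void processElement(String record, Context ctx, Collector<String> out) throws Exception {
            Long current = count.value();
            long next = (current == null ? 0L : current) + 1;
            count.update(next);
            out.collect(ctx.getCurrentKey() + " -> " + next);
        }
    }
}
```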

I guess I'm mentally modeling the function I deploy as if it's a Lambda, which I guess it is not, since the Job Manager is doing some magic behind the scenes to determine the job graph in the cluster?

1 Answer


The task managers each know the following information:

  • the key selector function that gets used in keyBy -- so they are each capable of independently computing the key for each record (note that the key function must be deterministic for this to work)
  • the total number of task managers, and their individual task manager index (e.g., "The job manager told me I am task manager #7 out of 42.")
  • the number of key groups to use (also known as the maximum parallelism) -- this defaults to 128, and is often raised to 32768 (the max possible value); see the configuration snippet after this list
  • how to hash a key to compute which key group the key belongs to
  • how to compute, for each key group, which task manager is responsible for that key group
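
For example, both values can be set explicitly on the execution environment; a minimal illustrative snippet:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MaxParallelismExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Number of key groups == maximum parallelism. It bounds how far the
        // job can ever be rescaled, so it is often set well above the current
        // parallelism (32768 is the upper limit).
        env.setMaxParallelism(128);   // the default
        env.setParallelism(4);        // current number of parallel subtasks

        // ... build the rest of the pipeline here ...
    }
}
```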

When a job is (re)started, each task manager uses this information to independently (and consistently) compute the range of key groups it will be handling. Checkpoints are indexed by key group, so each task manager can efficiently fetch and load its slice of the checkpoint.
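
As a rough sketch of that arithmetic (mirroring what Flink's KeyGroupRangeAssignment does internally, simplified rather than the actual Flink code):

```java
/**
 * Simplified illustration of how each parallel subtask can independently
 * compute its own contiguous range of key groups.
 */
public class KeyGroupRanges {

    /** First key group (inclusive) owned by the given subtask. */
    static int rangeStart(int maxParallelism, int parallelism, int subtaskIndex) {
        return (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    /** Last key group (inclusive) owned by the given subtask. */
    static int rangeEnd(int maxParallelism, int parallelism, int subtaskIndex) {
        return ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;  // number of key groups
        int parallelism = 4;       // e.g., 4 parallel subtasks

        for (int subtask = 0; subtask < parallelism; subtask++) {
            System.out.printf("subtask %d owns key groups [%d, %d]%n",
                    subtask,
                    rangeStart(maxParallelism, parallelism, subtask),
                    rangeEnd(maxParallelism, parallelism, subtask));
        }
        // Prints [0,31], [32,63], [64,95], [96,127] -- each subtask can load
        // exactly that slice of the checkpoint, with no coordination needed.
    }
}
```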

Network shuffles (keyBy) are done by having each TM compute, for each record, first the key, then the key group, and then the index of the TM responsible for that key group.
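
Here is a compact sketch of that routing decision. Flink's real implementation applies a murmur hash to key.hashCode() (see KeyGroupRangeAssignment); a simple stand-in hash is used below purely for illustration:

```java
/**
 * Illustration of the routing decision made on the sending side of a keyBy:
 * key -> key group -> downstream subtask index.
 */
public class KeyedRouting {

    /** Stand-in for Flink's murmur hash of key.hashCode(). */
    static int hashKey(Object key) {
        int h = key.hashCode();
        h ^= (h >>> 16);          // cheap mixing, for illustration only
        return h & 0x7FFFFFFF;    // keep it non-negative
    }

    /** Which key group a key falls into. */
    static int keyGroupFor(Object key, int maxParallelism) {
        return hashKey(key) % maxParallelism;
    }

    /** Which downstream subtask owns a given key group. */
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;

        for (String userId : new String[] {"user-001", "user-333", "user-777"}) {
            int keyGroup = keyGroupFor(userId, maxParallelism);
            int subtask = subtaskFor(keyGroup, maxParallelism, parallelism);
            System.out.printf("%s -> key group %d -> subtask %d%n", userId, keyGroup, subtask);
        }
    }
}
```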

Kafka partitioning is a completely separate concern -- Flink does not try to align its keying or partitioning (into key groups) to what Kafka is doing. Each instance of the Kafka source is aware of its task index and the total number of Kafka source tasks, as well as the set of topics and the number of partitions in each topic. They end up with a round-robin distribution of the partitions to the Kafka source tasks, with each instance independently computing which Kafka partitions it should be handling.
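
A simplified sketch of that partition assignment (the actual connector also offsets the round-robin by a hash of the topic name, so the exact mapping differs):

```java
/**
 * Simplified illustration of round-robin assignment of Kafka partitions to
 * Kafka source subtasks.
 */
public class PartitionAssignment {
    public static void main(String[] args) {
        int numSourceTasks = 4;
        int numPartitions = 10;   // e.g., one topic with 10 partitions

        for (int partition = 0; partition < numPartitions; partition++) {
            int owner = partition % numSourceTasks;
            System.out.printf("partition %d -> source subtask %d%n", partition, owner);
        }
        // Note: this assignment is independent of keyBy / key groups --
        // a record's Kafka partition says nothing about which downstream
        // subtask will process it after the keyBy shuffle.
    }
}
```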

I hope that was clear. I think you had it basically right, but I tried to help by making it more concrete.
