I have a general idea of how it works but I just want to confirm what I've grokked from the docs is correct (particularly in a cluster).
- Let's assume there are 4 task manager instances.
- We use keyBy(userId).
- The Job Manager will receive the job and distribute metadata to the 4 task managers such that each has its own respective key space that it is responsible for (key group). Say 001-333, 334-555, 556-777, 778-999 when the total userId key space is 001-999. I understand the hashing and key grouping is handled behind the scenes - these groups are just for the sake of clarity.
- Each task manager pulls from Kafka directly.
- Task managers route keys to peer task managers that are responsible for that hash space.
Is this roughly correct? I'm a little confused as to how arbitrary keys can make it to their respective workers when each key must be handled by the same instance and slot for stateful handling.
Is the part that I'm missing here that sources, operators, and sinks can scale independently? Meaning KafkaSource processes are responsible for hashing the key and routing it over to the correct downstream operator(s), whether local or reached over the network?
I guess I'm mentally modeling the function I deploy as if it's a Lambda, which I guess it is not, since the Job Manager is doing some magic behind the scenes to determine the job graph in the cluster?
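For concreteness, here is roughly the shape of the pipeline I'm picturing (the bootstrap address, topic, record format, and key extraction are just illustrative):

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UserJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Each parallel instance of this source reads some subset of the topic's partitions.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("kafka:9092")   // illustrative address
                .setTopics("user-events")            // illustrative topic
                .setGroupId("user-job")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
                // keyBy repartitions the stream so all events for a given
                // userId land on the same downstream parallel instance
                .keyBy(line -> line.split(",")[0])   // assume "userId,payload" records
                .print();

        env.execute("user-job");
    }
}
```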
1 Answer
The task managers each know the following information:
- the key selector function that gets used in keyBy -- so they are each capable of independently computing the key for each record (note that the key function must be deterministic for this to work)
- the total number of task managers, and their individual task manager index (e.g., "The job manager told me I am task manager #7 out of 42.")
- the number of key groups to use (also known as the maximum parallelism) -- this defaults to 128, and is often raised to 32768 (the max possible value)
- how to hash a key to compute which key group the key belongs to
- how to compute, for each key group, which task manager is responsible for that key group
When a job is (re)started, each task manager uses this information to independently (and consistently) compute the range of key groups it will be handling. Checkpoints are indexed by key group, so each task manager can efficiently fetch and load their slice of the checkpoint.
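To make that concrete, here is a rough Java sketch of the arithmetic involved. It is modeled on org.apache.flink.runtime.state.KeyGroupRangeAssignment, but simplified: the spread function below is a stand-in for the murmur hash Flink actually applies on top of key.hashCode().

```java
// Simplified sketch of Flink's key-group arithmetic; see
// org.apache.flink.runtime.state.KeyGroupRangeAssignment for the real thing.
public final class KeyGroupMath {

    // Key -> key group: spread the key's hash, then take it modulo
    // the number of key groups (i.e., the maximum parallelism).
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return spread(key.hashCode()) % maxParallelism;
    }

    // Key group -> subtask index, at the job's current parallelism.
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Inverse view: each subtask independently computes the contiguous,
    // inclusive range of key groups it owns. This is what lets a task
    // manager fetch just its slice of a checkpoint.
    static int[] keyGroupRangeForOperator(int maxParallelism, int parallelism, int operatorIndex) {
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    // Stand-in for Flink's murmur hash: mix the bits and keep the
    // result non-negative so the modulo above behaves.
    static int spread(int h) {
        h ^= (h >>> 16);
        h *= 0x45d9f3b;
        h ^= (h >>> 16);
        return h & 0x7fffffff;
    }
}
```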
Network shuffles (keyBy) are done by having each TM compute, for each record, first the key, then the key group, and then the index of the TM responsible for that key group.
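Using the sketch above, that per-record routing decision boils down to:

```java
// With maxParallelism 128 and parallelism 4, each subtask owns 32 key
// groups: subtask 0 -> [0, 31], subtask 1 -> [32, 63], and so on.
int keyGroup = KeyGroupMath.assignToKeyGroup("user-042", 128); // hypothetical key
int targetSubtask = KeyGroupMath.operatorIndexForKeyGroup(128, 4, keyGroup);
// The record is then serialized and sent to targetSubtask -- over the
// network if it lives on another task manager, locally otherwise.
```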
Kafka partitioning is a completely separate concern -- Flink does not try to align its keying or partitioning (into key groups) to what Kafka is doing. Each instance of the Kafka source is aware of its task index and the total number of Kafka source tasks, as well as the set of topics and the number of partitions in each topic. They end up with a round-robin distribution of the partitions to the Kafka source tasks, with each instance independently computing which Kafka partitions it should be handling.
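And a sketch of that independent computation (modeled on the partition-to-subtask assignment logic in Flink's Kafka connector; the exact formula varies across connector versions):

```java
// Round-robin assignment of Kafka partitions to source subtasks.
// Each source instance can evaluate this on its own, with no coordination.
static int sourceSubtaskForPartition(String topic, int partition, int numSourceSubtasks) {
    // Topic-dependent start offset, so that partition 0 of every topic
    // doesn't always land on subtask 0.
    int startIndex = ((topic.hashCode() * 31) & 0x7fffffff) % numSourceSubtasks;
    return (startIndex + partition) % numSourceSubtasks;
}
```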
I hope that was clear. I think you had it basically right, but I tried to help by making it more concrete.