Apache Beam Java SDK version: 2.63.0
Spark version: 3.5.4
I'm running a ParDo on a CoGrouped PCollection and retrieving multiple outputs using withOutputTags.
When looking at the Spark DAG visualization, I see that the ParDo step is invoked multiple times (once per output).
I'm wondering whether a ParDo on CoGbkResult with multiple outputs can run in a single pass.
I couldn't find any reference to this online or in Beam's GitHub.
Code:
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTagList;

PCollection<KV<Long, CustomObject1>> collection1 = ...
PCollection<KV<Long, CustomObject2>> collection2 = ...

PCollection<KV<Long, CoGbkResult>> grouped = KeyedPCollectionTuple
    .of(TAG1, collection1)
    .and(TAG2, collection2)
    .apply("CoGroupById", CoGroupByKey.create());

PCollectionTuple outputDatasets = grouped.apply(ParDo.of(new DoFn<KV<Long, CoGbkResult>, KV<Long, OutputObject>>() {
    @ProcessElement
    public void processElement(@Element KV<Long, CoGbkResult> element, ProcessContext c) {
        Result result = custom_method();
        OutputObject obj = ...; // built from result (details elided)
        if (result.getScore() < 0)
            c.output(KV.of(obj.getId(), obj)); // main output (TAG_A)
        else
            c.output(TAG_B, KV.of(obj.getId(), obj));
        // TAG_ADDITIONAL expects a different object type
        c.output(TAG_ADDITIONAL, result.getAdditional());
    }
}).withOutputTags(TAG_A, TupleTagList.of(TAG_B, TAG_ADDITIONAL)));
// Processing TAG_A, TAG_B, TAG_ADDITIONAL and writing them to disk.
I tried changing the way the output is written to disk, from writeDynamic to a simple TextIO.write(), but it didn't help.
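Roughly what the simplified write path looks like (a sketch; the format function and output path here are placeholders, not my exact code):

import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

// Each tagged output gets its own plain TextIO.write().
outputDatasets.get(TAG_A)
    .apply("FormatA", MapElements.into(TypeDescriptors.strings())
        .via((KV<Long, OutputObject> kv) -> kv.getKey() + "," + kv.getValue()))
    .apply("WriteA", TextIO.write().to("output/tag_a"));
// ... same pattern for TAG_B and TAG_ADDITIONAL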
Under Accumulators, the Beam metrics imply that my custom code ran multiple times.
The event timeline implies the same thing (separate jobs that each took roughly the same amount of time).
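For completeness, the metric I'm reading under Accumulators is a plain counter bumped once per element, along these lines (sketch; the namespace and counter name are placeholders):

import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;

// Declared as a field on the DoFn; incremented once per processed element.
// With a single pass over the grouped data I'd expect its final value to
// equal the number of grouped keys, not a multiple of it.
private final Counter elementsProcessed =
    Metrics.counter("CoGroupParDo", "elementsProcessed");

@ProcessElement
public void processElement(@Element KV<Long, CoGbkResult> element, ProcessContext c) {
    elementsProcessed.inc();
    // ... rest of the processing shown above
}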
Am I missing something?