I am creating a straightforward application that loads some configuration from MongoDB and starts an Apache Beam pipeline using the Dataflow runner, like this:
func main() {
	ctx := context.Background()

	// Load pipeline configuration from MongoDB.
	client, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://mongodb:27017"))
	if err != nil {
		panic(err)
	}
	_ = client // configuration lookups omitted for brevity

	beam.Init()
	beamPipeline := beam.NewPipeline()
	scope := beamPipeline.Root()

	collection := textio.Read(scope, "gs://mybucket/input.txt")
	// run ParDo functions on collection here
	textio.Write(scope, "gs://mybucket/output.txt", collection)

	if err := beamx.Run(ctx, beamPipeline); err != nil {
		panic(err)
	}
}
However, when I run this pipeline with the Dataflow runner, the workers don't start, because they try to run the MongoDB code.
PS: this code works with the direct runner.
My questions are:
- Why is the Dataflow runner attempting to run code outside the pipeline scope?
- If Dataflow workers run all the code inside the main function, how do I create a custom application to run Dataflow jobs? For example, starting a job when a message arrives from Pub/Sub, or loading custom transformation options from MongoDB, as in the example above.
1 Answer
For the "when is what code executed?" question:
Code before beam.Init is run on all instances of the binary. It's not until beam.Init is called that the binary knows whether it's in worker or main-program mode. So the reason the "Dataflow runner is running code outside of the pipeline" is that this is the code that was written.
This is why it "works" in the Go direct runner (deprecated), or in the local Prism runner in loopback mode: the binary is never restarted, so that code doesn't run again. If you don't want the Mongo client to be called on workers, it needs to come after beam.Init.
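A minimal sketch of that restructuring (the import block, variable names, and the _ = client placeholder are mine; the pipeline itself mirrors the code in the question): the MongoDB call moves after beam.Init, so it only runs on the launcher and never on the workers.

package main

import (
	"context"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

func main() {
	// On a worker, beam.Init hands control to the SDK harness, so the
	// code below only executes on the launcher.
	beam.Init()

	ctx := context.Background()

	// Launcher-only: load pipeline configuration from MongoDB.
	client, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://mongodb:27017"))
	if err != nil {
		panic(err)
	}
	_ = client // placeholder for the actual configuration lookups

	p := beam.NewPipeline()
	s := p.Root()
	lines := textio.Read(s, "gs://mybucket/input.txt")
	// apply ParDo transforms to lines here, driven by the loaded configuration
	textio.Write(s, "gs://mybucket/output.txt", lines)

	if err := beamx.Run(ctx, p); err != nil {
		panic(err)
	}
}

Note that anything the workers do need at runtime (for example, values read from MongoDB) has to be passed into the pipeline explicitly, e.g. as fields on your DoFn structs or via flags, since the launcher-side code never runs on them.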
You can simulate Docker mode by using the flag --environment_type=DOCKER, which, when using the default Prism runner, will use Docker containers for the workers instead of the local loopback mode. This behaves the same (as far as possible) as the workers on Dataflow.
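For example, a local run might look like the following, assuming the binary uses beamx.Run so the standard runner flags are registered (the command itself is illustrative; per the above, Prism is already the default runner, so --runner=prism is only there to be explicit):

go run . --runner=prism --environment_type=DOCKER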
For the 2nd question: How to write complex pipelines?
For one, this is a large, complicated topic. Please look at the Beam Programming Guide and set the language to Go for Go-specific examples and advice: https://beam.apache.org/documentation/programming-guide/#creating-a-pipeline
Or look at the larger examples: https://pkg.go.dev/github.com/apache/beam/sdks/v2@v2.61.0/go/examples
Such as large_wordcount, which demonstrates more complex DoFns and flags for configuration: https://github.com/apache/beam/blob/sdks/v2.61.0/sdks/go/examples/large_wordcount/large_wordcount.go#L16
Or look at how some of the built-in transforms are implemented, such as MongoDBIO: https://pkg.go.dev/github.com/apache/beam/sdks/v2@v2.61.0/go/pkg/beam/io/mongodbio#example-Read-Default
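As a rough sketch of the flag-based configuration pattern that large_wordcount demonstrates (the flag names and defaults below are made up for illustration): configuration is resolved on the launcher and baked into the pipeline graph, so the workers never have to contact the original source (flags, a Pub/Sub message, or a MongoDB lookup) themselves.

package main

import (
	"context"
	"flag"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

var (
	// Hypothetical flags; any launcher-side configuration source could
	// supply these values instead.
	input  = flag.String("input", "gs://mybucket/input.txt", "File(s) to read.")
	output = flag.String("output", "gs://mybucket/output.txt", "Output file prefix.")
)

func main() {
	flag.Parse()
	beam.Init()

	p := beam.NewPipeline()
	s := p.Root()

	lines := textio.Read(s, *input)
	textio.Write(s, *output, lines)

	if err := beamx.Run(context.Background(), p); err != nil {
		panic(err)
	}
}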