
go - Apache beam - custom golang dataflow application stuck - Stack Overflow


I am creating a straightforward application that loads some configuration from MongoDB and starts an Apache Beam pipeline using the Dataflow runner, like this:

package main

import (
    "context"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"
)

func main() {
    ctx := context.Background()

    // Load configuration from MongoDB before building the pipeline.
    client, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://mongodb:27017"))
    if err != nil {
        panic(err)
    }
    defer client.Disconnect(ctx)

    beam.Init()
    beamPipeline := beam.NewPipeline()
    scope := beamPipeline.Root()

    collection := textio.Read(scope, "gs://mybucket/input.txt")

    // run ParDo functions
    textio.Write(scope, "gs://mybucket/output.txt", collection)

    if err := beamx.Run(ctx, beamPipeline); err != nil {
        panic(err)
    }
}

However, when I run this Apache Beam pipeline using the Dataflow runner, the worker doesn't start, because the worker tries to run the MongoDB code.

PS: this code works with the direct runner.

My questions are:

  • Why is the Dataflow runner attempting to run code outside the pipeline scope?
  • If the Dataflow worker runs all the code inside the main function, how do I create a custom application that launches Dataflow jobs? For example, starting a job when a message arrives from Pub/Sub, or loading custom transformation options from MongoDB, as in this example.

asked Dec 23, 2024 at 12:26 by LeoCBS; edited Feb 6 at 8:43 by marc_s

1 Answer

For the "when is what code executed?" question:

Code before beam.Init is run on all instances of the binary. It's not until beam.Init is called that the binary knows whether it's in Worker or Main Program mode. So the reason the "dataflow runner is running code outside of the pipeline" is simply that this is how the code was written.

This is why it "works" in the Go direct runner (deprecated), or in the local Prism runner in loopback mode: the binary is never restarted, so that code doesn't run again. If you don't want the Mongo client to be created on workers, it needs to come after beam.Init.
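As a minimal sketch of that restructuring (reusing the question's hypothetical bucket paths and Mongo URI): on a worker, beam.Init takes over the process as the worker harness and does not return, so everything placed after it only executes in the main launcher program.

```go
package main

import (
    "context"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"
)

func main() {
    // On a Dataflow worker, beam.Init runs the worker harness and exits,
    // so the code below only runs in the main program.
    beam.Init()

    ctx := context.Background()

    // Safe now: only the launcher process reaches this point.
    client, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://mongodb:27017"))
    if err != nil {
        panic(err)
    }
    defer client.Disconnect(ctx)

    p := beam.NewPipeline()
    s := p.Root()

    lines := textio.Read(s, "gs://mybucket/input.txt")
    textio.Write(s, "gs://mybucket/output.txt", lines)

    if err := beamx.Run(ctx, p); err != nil {
        panic(err)
    }
}
```

The key design point is ordering: any per-launch setup (database clients, config loading) belongs after beam.Init, while anything workers need must live inside DoFns or pipeline options instead of the main function body.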

You can simulate Docker mode with the flag --environment_type=DOCKER, which, when using the default Prism runner, will use Docker containers for the workers instead of local loopback mode. This behaves as closely as possible to the workers on Dataflow.

For the second question: how do you write complex pipelines?

For one, this is a large, complicated topic. Please look at the Beam Programming Guide and set the language to Go for Go-specific examples and advice: https://beam.apache.org/documentation/programming-guide/#creating-a-pipeline

Or look at the larger examples: https://pkg.go.dev/github.com/apache/beam/sdks/v2@v2.61.0/go/examples

Such as the large_wordcount, which demonstrates more complex DoFns and flags for configuration: https://github.com/apache/beam/blob/sdks/v2.61.0/sdks/go/examples/large_wordcount/large_wordcount.go#L16

Or look at how some of the built-in transforms are implemented, such as the MongoDBIO: https://pkg.go.dev/github.com/apache/beam/sdks/v2@v2.61.0/go/pkg/beam/io/mongodbio#example-Read-Default
