最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

go - How to build a circular pipeline? - Stack Overflow

programmeradmin0浏览0评论

Let's say I have a function F with the following logic:

  • a <- input
  • output <- a+10
  • b <- input
  • output a+b
  • stop

Now I want to build two blocks (A and B) that implement this logic, each feeding their output as the input of the other block. I want to feed a number (1 for instance) to the first block A, run this circular pipeline, and get the output of the block B, how can I do such a thing?

The solution should run like this:

  • A receives 1, stores it in a
  • A outputs 11
  • B receives 11, stores it in a
  • B outputs 21
  • A receives 21, stores it in b
  • A outputs a + b = 1 + 21 = 22
  • B receives 22, stores it in b
  • B outputs a + b = 11 + 22 = 33

I want to get the final output, which is 33 in this example.

Here's what I've tried so far:

package main

import (
    "fmt"
    "sync"
)

func my_function(in, out chan int) {
    go func() {
        a := <-in
        out <- a + 10
        b := <-in
        out <- a + b
        close(out)
    }()

}

func main() {

    input_a := make(chan int, 2)
    output_a := make(chan int, 2)

    input_a <- 1
    var wg sync.WaitGroup
    wg.Add(2)

    go my_function(input_a, output_a)
    go my_function(output_a, input_a)

    go func() {
        wg.Wait()
        close(input_a)
    }()

    for x := range input_a {
        fmt.Println("Result:", x)
    }
}

But it deadlocks...

And can I chain more than two blocks? I would like to create a circular pipeline of 5 blocks, getting the output of the last one.

I'm doing this to solve an Advent Of Code problem (2019, day 7), if you have better ideas/pattern for this day, please tell me!

Let's say I have a function F with the following logic:

  • a <- input
  • output <- a+10
  • b <- input
  • output a+b
  • stop

Now I want to build two blocks (A and B) that implement this logic, each feeding their output as the input of the other block. I want to feed a number (1 for instance) to the first block A, run this circular pipeline, and get the output of the block B, how can I do such a thing?

The solution should run like this:

  • A receives 1, stores it in a
  • A outputs 11
  • B receives 11, stores it in a
  • B outputs 21
  • A receives 21, stores it in b
  • A outputs a + b = 1 + 21 = 22
  • B receives 22, stores it in b
  • B outputs a + b = 11 + 22 = 33

I want to get the final output, which is 33 in this example.

Here's what I've tried so far:

package main

import (
    "fmt"
    "sync"
)

func my_function(in, out chan int) {
    go func() {
        a := <-in
        out <- a + 10
        b := <-in
        out <- a + b
        close(out)
    }()

}

func main() {

    input_a := make(chan int, 2)
    output_a := make(chan int, 2)

    input_a <- 1
    var wg sync.WaitGroup
    wg.Add(2)

    go my_function(input_a, output_a)
    go my_function(output_a, input_a)

    go func() {
        wg.Wait()
        close(input_a)
    }()

    for x := range input_a {
        fmt.Println("Result:", x)
    }
}

But it deadlocks...

And can I chain more than two blocks? I would like to create a circular pipeline of 5 blocks, getting the output of the last one.

I'm doing this to solve an Advent Of Code problem (2019, day 7), if you have better ideas/pattern for this day, please tell me!

Share Improve this question edited Mar 11 at 14:51 DarkBee 15.5k8 gold badges72 silver badges118 bronze badges asked Mar 11 at 14:50 Be Chiller TooBe Chiller Too 2,9383 gold badges23 silver badges50 bronze badges 2
  • You only put 1 value in input_a, but it looks like you'll try to read 4 between your two calls to my_function. I also see a WaitGroup but nothing calls Done, so your wg.Wait call will block forever. – Stephen Newell Commented Mar 11 at 15:12
  • thanks for your comment @StephenNewell! I only put 1 starting value in input_a, to start the "process", because the 2nd goroutine should output a value (21, here) in input_a for the 1st goroutine to read. For the WaitGroup, I'll try to modify my code. – Be Chiller Too Commented Mar 11 at 15:34
Add a comment  | 

2 Answers 2

Reset to default 0

There are two problems here. Excessive coroutine nesting leads to the inability to ensure the timing sequence. There is no place where wg calls done, which results in the inability to close input_a, posing a risk. According to the rules, the range method of a channel is only applicable to a closed channel. Otherwise, it will cause blocking and may even lead to a deadlock. The following is my solution, which you can refer to.


package main

import (
    "fmt"
    "sync"
)


/**
*   A receives 1, stores it in a
    A outputs 11
    B receives 11, stores it in a
    B outputs 21
    A receives 21, stores it in b
    A outputs a + b = 1 + 21 = 22
    B receives 22, stores it in b
    B outputs a + b = 11 + 22 = 33
*
*/

func my_function(in, out chan int) {

    select {
    case a := <-in:
        fmt.Printf("%d := <-in \n", a)

        fmt.Printf("out <- %d \n", a+10)
        out <- a + 10

        select {
        case b := <-in:
            fmt.Printf("%d := <-in \n", b)

            fmt.Printf("out <- %d \n", a+b)
            out <- a + b
            return
        }

    }

}

func my_function2(out, in chan int) {
    select {
    case a := <-out:
        fmt.Printf("%d := <-out \n", a)

        fmt.Printf("in <- %d \n", a+10)
        in <- a + 10

        select {
        case b := <-out:
            fmt.Printf("%d := <-out \n", b)

            fmt.Printf("in <- %d \n", a+b)
            in <- a + b
            return
        }
    }

}

func main() {

    input_a := make(chan int, 2)
    output_a := make(chan int, 2)

    input_a <- 1
    var wg sync.WaitGroup
    wg.Add(2)

    go func() {
        defer wg.Done()
        my_function(input_a, output_a)
    }()
    go func() {
        defer wg.Done()
        my_function2(output_a, input_a)
    }()

    wg.Wait()
    close(input_a)

    for x := range input_a {
        fmt.Println("Result:", x)
    }
}

then you will get :

I found a way to chain 3 or more blocks, with a function logic that can take an arbitrary number of inputs:

package main

import (
    "fmt"
    "sync"
)

func my_function(in, out chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for i := range 3 { // This function can take an arbitrary number of inputs
        a := <-in
        result := a + i
        fmt.Printf("a=%d, i=%d, result=%d\n", a, i, result)
        out <- result
    }
}

func main() {
    input_a := make(chan int, 2)
    input_b := make(chan int, 2)
    input_c := make(chan int, 2)

    input_a <- 1 // First input to kickstart the process
    var wg sync.WaitGroup
    wg.Add(3)

    go my_function(input_a, input_b, &wg) // A -> B
    go my_function(input_b, input_c, &wg) // B -> C
    go my_function(input_c, input_a, &wg) // C goes back to A

    wg.Wait()
    close(input_a)

    for x := range input_a {
        fmt.Println("Result:", x)
    }
}
发布评论

评论列表(0)

  1. 暂无评论