DEV Community

David Kröll


Go concurrency and synchronization - Part 3: Data pipelines

As pointed out in the last part, I am going to refactor the notifiers and the synchronization I have created so far. Here is the current state once more as a simple graphic (the same one as in the first part):

Goroutine architectural workflow

We have two goroutines which notify each other when they have finished their step. At the end of the last part I reached a pretty good state, but afterwards I noticed that the same code appears twice in my program. So it is time to refactor again.

```go
package main

import (
    "bufio"
    "fmt"
    "os"
    "sync"
    "time"
)

func main() {
    printOdd := make(chan struct{})
    printEven := make(chan struct{})

    wg := &sync.WaitGroup{}

    // register both goroutines before starting them; calling Add inside
    // the goroutines would race with wg.Wait() below
    wg.Add(2)

    go func() {
        // the Done() call lives in a deferred function,
        // so we are also able to recover from a possible panic
        defer func() {
            if err := recover(); err != nil {
                fmt.Println("send on closed channel occurred, but recovered")
            }
            // call Done last, so main cannot exit before the message is printed
            wg.Done()
        }()

        start := 0

        for range printEven {
            fmt.Println(start)
            start = start + 2
            time.Sleep(time.Second)
            printOdd <- struct{}{}
        }
    }()

    go func() {
        // same as above
        defer func() {
            if err := recover(); err != nil {
                fmt.Println("send on closed channel occurred, but recovered")
            }
            wg.Done()
        }()

        start := 1

        for range printOdd {
            fmt.Println(start)
            start = start + 2
            time.Sleep(time.Second)
            printEven <- struct{}{}
        }
    }()

    reader := bufio.NewReader(os.Stdin)
    fmt.Println("Press enter to cancel")
    fmt.Println("---------------------")
    // trigger the ping-pong
    printEven <- struct{}{}

    reader.ReadString('\n')
    fmt.Println("finished")

    close(printEven)
    close(printOdd)
    wg.Wait()
}
```

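So the code above is going to be refactored. The two goroutines dispatched in the main function differ only in their start value and in which channel they receive from and send to, so they can be moved out into one named function.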

```go
package main

import (
    "bufio"
    "fmt"
    "os"
    "sync"
    "time"
)

// the two anonymous funcs are now one named func
// chIn may only receive and chOut may only send
// the caller must call wg.Add(1) before starting this goroutine
func concurrenCounter(start int, chIn <-chan struct{}, chOut chan<- struct{}, wg *sync.WaitGroup) {
    defer func() {
        if err := recover(); err != nil {
            fmt.Println("send on closed channel occurred, but recovered")
        }
        wg.Done()
    }()

    for range chIn {
        fmt.Println(start)
        start = start + 2
        time.Sleep(time.Second)
        chOut <- struct{}{}
    }
}

func main() {
    printOdd := make(chan struct{})
    printEven := make(chan struct{})

    wg := &sync.WaitGroup{}

    // register both counters before the go statements
    wg.Add(2)

    // call it with correctly arranged input and output channels
    go concurrenCounter(0, printEven, printOdd, wg)
    go concurrenCounter(1, printOdd, printEven, wg)

    reader := bufio.NewReader(os.Stdin)
    fmt.Println("Press enter to cancel")
    fmt.Println("---------------------")
    // trigger the ping-pong
    printEven <- struct{}{}

    reader.ReadString('\n')
    fmt.Println("finished")

    close(printEven)
    close(printOdd)
    wg.Wait()
}
```

I also used the special channel types that only allow either receiving from or sending to a channel.
This is called the channel direction. It increases the program's safety, since a send on a receive-only channel (or a receive from a send-only channel) is now rejected by the compiler.
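Channel directions can be tried out in isolation with a minimal sketch: a bidirectional channel created with `make` is converted implicitly when it is passed to a send-only or receive-only parameter. The functions `produce` and `consume` are hypothetical and exist only for this illustration.

```go
package main

import "fmt"

// produce may only send on out; a receive such as <-out
// would be rejected by the compiler.
func produce(out chan<- int, n int) {
    for i := 0; i < n; i++ {
        out <- i
    }
    close(out) // closing is allowed on a send-only channel
}

// consume may only receive from in; a send such as in <- 1
// would be rejected by the compiler.
func consume(in <-chan int) []int {
    var got []int
    for v := range in {
        got = append(got, v)
    }
    return got
}

func main() {
    ch := make(chan int)     // bidirectional
    go produce(ch, 3)        // converted to chan<- int
    fmt.Println(consume(ch)) // converted to <-chan int; prints [0 1 2]
}
```

Note that closing a receive-only channel is also a compile-time error, which nudges the code towards letting only the sender close a channel.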

I again used a sync.WaitGroup and passed a pointer to it into the function, so that both goroutines share the same counter.
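The usual WaitGroup pattern can be shown as a small sketch using only the standard library: the WaitGroup is shared through a pointer, `Add` is called by the launching goroutine before each `go` statement, and `Done` is deferred inside the worker. `worker` and `runWorkers` are hypothetical names chosen for this example.

```go
package main

import (
    "fmt"
    "sync"
)

// worker takes *sync.WaitGroup: a copied WaitGroup would have its own
// counter, and Wait on the original would never see this Done.
func worker(id int, results []int, wg *sync.WaitGroup) {
    defer wg.Done()
    results[id] = id * id // each goroutine writes only its own slot
}

func runWorkers(n int) []int {
    results := make([]int, n)
    wg := &sync.WaitGroup{}
    for i := 0; i < n; i++ {
        // Add before the go statement, so Wait cannot run
        // while the counter is still zero
        wg.Add(1)
        go worker(i, results, wg)
    }
    wg.Wait() // all writes to results happen before Wait returns
    return results
}

func main() {
    fmt.Println(runWorkers(4)) // prints [0 1 4 9]
}
```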

Data pipelines

Currently, every goroutine prints the computed numbers to the command line itself. I'd like to split this out into an additional step.
My desired software architecture should look like a modern factory: after a step has finished, its result is passed on to the next step. Think of a conveyor belt (the channel) that carries the data to the next worker (the goroutine).
The computed numbers should flow between the individual goroutines.
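The conveyor-belt idea can be sketched on its own with a common pattern: each stage is a function that starts a goroutine, returns its output channel, and closes that channel once it has no more data, so the next stage's `range` loop ends by itself. This is a minimal illustration, not the counter from this article; `generate` and `square` are hypothetical stage names.

```go
package main

import "fmt"

// generate is the first station: it puts numbers onto the belt.
func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out) // tells the next station that no more items come
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

// square is a middle station: it takes items off one belt,
// works on them, and puts the results onto the next belt.
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()
    return out
}

func main() {
    // the last station (main) prints whatever arrives: 1, 4, 9
    for v := range square(generate(1, 2, 3)) {
        fmt.Println(v)
    }
}
```

Because every stage closes only the channel it writes to, shutdown flows along the belt from left to right without any extra signalling.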

Data pipelining

So the individual steps in our modern factory are the goroutines, and the conveyor belts are the channels. They are all dispatched from the main goroutine. The data flows from left to right in the graphic.

Goroutines A and B do the same computation as before, but no longer print the result to the console. They just send it over to goroutine C and are therefore no longer responsible for the data.

Goroutine C collects the numbers from both A and B and prints them to the console in one place.

```go
package main

import (
    "bufio"
    "fmt"
    "os"
    "sync"
    "time"
)

// also created a custom type which is just an empty struct with another name
type token struct{}

// Goroutines A and B
// the caller must call wg.Add(1) before starting this goroutine
func concurrenCounter(start int, ping <-chan token, pong chan<- token, outCh chan<- int, wg *sync.WaitGroup) {
    defer func() {
        if err := recover(); err != nil {
            fmt.Println("send on closed channel occurred, but recovered")
        }
        wg.Done()
    }()

    for range ping {
        outCh <- start
        start = start + 2
        time.Sleep(time.Second)
        pong <- token{}
    }
}

// Goroutine C
// the caller must call wg.Add(1) before starting this goroutine
func collectAndPrint(inCh <-chan int, wg *sync.WaitGroup) {
    // we do not need to recover from a panic here,
    // since receiving from a closed channel never panics
    defer wg.Done()

    for i := range inCh {
        fmt.Printf("%d\n", i)
    }
}

func main() {
    calculateOdd := make(chan token)
    calculateEven := make(chan token)

    collector := make(chan int)

    wg := &sync.WaitGroup{}

    // register A, B and C before any of them is started
    wg.Add(3)

    // call it with correctly arranged input and output channels
    go concurrenCounter(0, calculateEven, calculateOdd, collector, wg)
    go concurrenCounter(1, calculateOdd, calculateEven, collector, wg)

    // spin up collecting goroutine
    go collectAndPrint(collector, wg)

    reader := bufio.NewReader(os.Stdin)
    fmt.Println("Press enter to cancel")
    fmt.Println("---------------------")
    // trigger the ping-pong
    calculateEven <- token{}

    reader.ReadString('\n')
    fmt.Println("finished")

    close(calculateEven)
    close(calculateOdd)
    close(collector)

    wg.Wait()
}
```

I also created a custom type, token, for the ping-pong between goroutines A and B. It is just for readability. This has ended up as a totally overengineered counter example, but I hope you got the point: creating data pipelines in Go is very easy, and I totally love it. It feels just right to use the language features the creators of Go gave us. The concurrency system is really awesome and simple to use.

I hope you enjoyed my series and learned something about concurrency and synchronization in Go. As always, feel free to discuss improvements.
