DEV Community

loading...

Golang Tutorial - 10 Concurrency with channel

nadirbasalamah profile image nadirbasalamah ・9 min read

Channel in Go

Create a concurrent program in Go has a unique approach. The approach is share memory by communicating instead communicate by sharing memory which is not allowed in Go. This approach can be done using channel that can be used by goroutines to communicate each other.

To create a channel in Go, can be done by using make() function.

//create a channel of type int
//make(chan <data type>)
c := make(chan int)

Notice that when creating a channel there is certain data type that need to be attached. This type can be any type that supported by Golang.

To attach a value into channel and retrieve a value from channel can be done using this syntax.

//attach or send a value into channel
c <- 43
//retrieve a value from channel
<-c

Here it is the simple example of using channel:

func main() {
    //create a channel
    c := make(chan int)

    //put a value into channel
    c <- 43

    //retrieve a value from channel
    fmt.Println("value of channel c:", <-c)
}

Output:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:

Based on the output, the code has a error because all goroutines are asleep that creates a deadlock condition. Deadlock condition occurs when set of processes are blocked because each process has a resource and waiting for another resource retrieved by other process. In this code, the process of put a value into channel or send a value into channel is blocking other process (in this case retrieve a value from channel) so the deadlock occurs.

To avoid from deadlock, the send operation into channel can be wrapped into goroutine.

func main() {
    //create a channel
    c := make(chan <-int)

    //put a value into channel
    //wrapped inside goroutine
    go func() {
        c <- 43
    }()

    //retrieve a value from channel
    fmt.Println("value of channel c:", <-c)
}

Output:

value of channel c: 43

When creating a channel, the mechanism can be specified to enable only send value into channel or only retrieve a value from channel.

//create a channel that only available for reading value 
cread := make(<-chan int)
//create a channel that only available for sending value
csend := make(chan<- int)

The capacity of channel can be specified as well. This called buffered channel

//create a channel that available for 25 data that has a type of int
c := make(chan int, 25)

select and close()

When creating a channel, the close() function can be called to close a channel when sending operation to a channel is finished. Here it is the example.

func main() {
    //create a channel with capacity of 10 ints
    c := make(chan int, 10)

    //put a value into channel
    //wrapped inside goroutine
    go func() {
        //send value from 1-10 into channel
        for i := 1; i <= 10; i++ {
            c <- i
        }
        //close the channel
        //this means that the sending operation is finished
        close(c)
    }()

    //retrieve a value from channel using for loop
    for v := range c {
        fmt.Println("Value from c:", v)
    }

    fmt.Println("Bye..")
}

Output:

Value from c: 1
Value from c: 2
Value from c: 3
Value from c: 4
Value from c: 5
Value from c: 6
Value from c: 7
Value from c: 8
Value from c: 9
Value from c: 10
Bye..

When receiving some values from channel with certain condition or from certain channel, the select {..} can be used to determine which channel's value needs to be received. Here it is the example of using select{..}.

func main() {
    //create a channels
    frontend := make(chan string)
    backend := make(chan string)
    quit := make(chan string)

    //send some values to channels with goroutine
    go send(frontend, backend, quit)

    //receive some values from channels
    receive(frontend, backend, quit)
}

func send(f, b, q chan<- string) {
    data := []string{"React", "NodeJS", "Vue", "Flask", "Angular", "Laravel"}
    for i := 0; i < len(data); i++ {
        if i%2 == 0 {
            //send value to channel f
            f <- data[i]
        } else {
            //send value to channel b
            b <- data[i]
        }
    }
    //send value to channel q
    q <- "finished"
}

func receive(f, b, q <-chan string) {
    for {
        //using select to choose certain channel
        // that the value need to be received
        select {
        //if the value comes from channel called "f"
        //then execute the code
        case v := <-f:
            fmt.Println("Front End Dev:", v)
        //if the value comes from channel called "b"
        //then execute the code
        case v := <-b:
            fmt.Println("Back End Dev:", v)
        //if the value comes from channel called "q"
        //then execute the code
        case v := <-q:
            fmt.Println("This program is", v)
            return //finish the execution
        }
    }
}

Output:

Front End Dev: React
Back End Dev: NodeJS
Front End Dev: Vue
Back End Dev: Flask
Front End Dev: Angular
Back End Dev: Laravel
This program is finished

Based on that code, the select {..} works similiar with switch case. The difference is select {..} is used together with channels.

Concurrency Patterns

There are some concurrency patterns that can be used for creating a concurrent program in Go. The concurrency patterns that will be covered in this blog are pipeline, context, fan in and fan out.

Pipeline

Pipeline pattern is basically works like a pipe that connected each other. There are three main steps in this pattern:

  • Get values from channels
  • Perform some operations with the value
  • Send values to channel so the value can be consumed or received Here it is the example of pipeline concurrency pattern:
func main() {
    ints := generate()
    results := average(ints)
    for v := range results {
        fmt.Println("Average:", v)
    }
}

//STEP 1: Get values from channel
func generate() <-chan []int {
    //create a channel that holds a value of type []int
    out := make(chan []int)
    go func() {
        //insert some data into channel
        data := [][]int{{1, 2, 3}, {4, 5, 6}, {7, 8, 9}}
        for _, v := range data {
            out <- v
        }
        //close the channel which means the operation
        //to the channel is finished
        close(out)
    }()
    //return a channel
    return out
}

//STEP 2: Perform operation with the value, in this case is average calculation
func average(i <-chan []int) <-chan int {
    //create a channel
    out := make(chan int)
    go func() {
        //receive values from a channel
        //the received value is []int
        for nums := range i {
            //then calculate value's average
            //STEP 3: Send values to channel
            out <- avg(nums)
        }
        close(out)
    }()
    return out
}

//function for calculating average of numbers
func avg(nums []int) int {
    sum := 0
    for _, v := range nums {
        sum += v
    }
    result := sum / len(nums)
    return result
}

Output:

Average: 2
Average: 5
Average: 8

Based on that code, there are three steps in pipeline pattern, the first step is get the values from a channel with generate() function. Then the next step is perform operation (in this code is average calculation) with values that already received in the first step then the final step is send operation results to the channel so the result of average calculation can be consumed.Basically, the second and third step is combined in average() function.

Fan In

The fan in pattern is a concurrency pattern that takes some inputs and used it into one channel. Fan In pattern works like multiplexer. Here it is the simple visualization of Fan In.
Fan In
The example of fan in pattern:

func main() {
    ints := generate()
    results := make(chan int)
    go average(ints, results)
    for v := range results {
        fmt.Println("Average:", v)
    }
}

//send some values to the channel
func generate() <-chan []int {
    //create a channel that holds a value of type []int
    out := make(chan []int)
    go func() {
        //insert some data into channel
        data := [][]int{{1, 2, 3}, {4, 5, 6}, {7, 8, 9}}
        for _, v := range data {
            out <- v
        }
        //close the channel which means the operation
        //to the channel is finished
        close(out)
    }()
    //return a channel
    return out
}

//
func average(i <-chan []int, receiver chan int) {
    //init waitgroup
    var wg sync.WaitGroup
    //add 1 goroutine
    wg.Add(1)

    //launch a goroutine
    go func() {
        //receive values from channel called i
        for v := range i {
            //send the average calculation result to the channel called receiver
            receiver <- avg(v)
        }
        //operation is done
        wg.Done()
    }()
    //wait until goroutine is finished
    wg.Wait()
    //close the channel
    close(receiver)
}

//function for calculating average of numbers
func avg(nums []int) int {
    sum := 0
    for _, v := range nums {
        sum += v
    }
    result := sum / len(nums)
    return result
}

Output:

Average: 2
Average: 5
Average: 8

Based on that code, the way of fan in pattern works can be seen in average() function. In this function, the waitgroup is involved to take control of a bunch of inputs (but in this case is only 1 input, the example of 2 inputs will be covered) then send the value into single channel that called receiver on that code.

Here it is the example of fan in pattern with 2 inputs:

func main() {
    ints := generate()
    ints2 := generate() //add another input
    results := make(chan int)
    go average(ints, ints2, results) //takes 2 input (ints and ints2)
    for v := range results {
        fmt.Println("Average:", v)
    }
}

//send some values to the channel
func generate() <-chan []int {
    //create a channel that holds a value of type []int
    out := make(chan []int)
    go func() {
        //insert some data into channel
        data := [][]int{{1, 2, 3}, {4, 5, 6}, {7, 8, 9}}
        for _, v := range data {
            out <- v
        }
        //close the channel which means the operation
        //to the channel is finished
        close(out)
    }()
    //return a channel
    return out
}

//
func average(i, i2 <-chan []int, receiver chan int) {
    //init waitgroup
    var wg sync.WaitGroup
    //add 2 goroutines
    wg.Add(2)

    //launch a goroutine
    go func() {
        //receive values from channel called i
        for v := range i {
            //send the average calculation result to the channel called receiver
            receiver <- avg(v)
        }
        //operation is done
        wg.Done()
    }()

    //launch a goroutine
    go func() {
        //receive values from channel called i2
        for v := range i2 {
            //send the average calculation result to the channel called receiver
            receiver <- avg(v)
        }
        //operation is done
        wg.Done()
    }()

    //wait until all goroutines is finished
    wg.Wait()
    //close the channel
    close(receiver)
}

//function for calculating average of numbers
func avg(nums []int) int {
    sum := 0
    for _, v := range nums {
        sum += v
    }
    result := sum / len(nums)
    return result
}

Output:

Average: 2
Average: 5
Average: 2
Average: 8
Average: 5
Average: 8

Fan Out

The fan out pattern is a concurrency pattern where many functions can read from the same channel until the channel is closed. Usually, fan in and fan out pattern can be used together.

Here it is the simple visualization of Fan Out.
Fan Out
The example of fan out pattern:

func main() {
    ints := generate()
    c1 := average(ints)
    c2 := average(ints)

    for v := range merge(c1, c2) {
        fmt.Println("Average:", v)
    }
}

//send some values to the channel
func generate() <-chan []int {
    //create a channel that holds a value of type []int
    out := make(chan []int)
    go func() {
        //insert some data into channel
        data := [][]int{{1, 2, 3}, {4, 5, 6}, {7, 8, 9}}
        for _, v := range data {
            out <- v
        }
        //close the channel which means the operation
        //to the channel is finished
        close(out)
    }()
    //return a channel
    return out
}

//function for channel to calculate average of numbers
func average(in <-chan []int) <-chan int {
    out := make(chan int)
    go func() {
        for v := range in {
            out <- avg(v)
        }
        close(out)
    }()
    return out
}

//function for calculating average of numbers
func avg(nums []int) int {
    sum := 0
    for _, v := range nums {
        sum += v
    }
    result := sum / len(nums)
    return result
}

//merge many channels into one channel
func merge(ch ...<-chan int) <-chan int {
    //init waitgroup
    var wg sync.WaitGroup
    out := make(chan int)

    //declare a func called output
    output := func(c <-chan int) {
        //receive many values from channel "c"
        for n := range c {
            out <- n
        }
        wg.Done()
    }

    //add goroutines based on the length of channels
    wg.Add(len(ch))

    //execute output() for every value
    //inside the channel
    for _, c := range ch {
        go output(c)
    }

    //create a goroutine
    go func() {
        wg.Wait()  //wait the operation of all goroutines
        close(out) //close the channel if operation finished
    }()

    return out
}

Output:

Average: 2
Average: 5
Average: 8

Based on that code, the fan in and fan out pattern can be combined. The fan in mechanism can be seen in merge() function to merge all involved channels into single channel. The fan out mechanism can be seen in average() function that used by many channels.

Context

Context is a concurrency pattern that available in Go. Context is usually used in backend development such as accessing database.

Here it is the example of using context:

func main() {
    //initiate context called ctx and cancel function called "cancel"
    ctx, cancel := context.WithCancel(context.Background())

    go func() {
        n := 0
        for {
            select {
                //if the context is done, then finish the operation
            case <-ctx.Done():
                return
            default:
                n++
                fmt.Println("Result:", square(n))
            }
        }
    }()

    time.Sleep(time.Second * 3)

    fmt.Println("cancelling context..")
    cancel() //cancel the context
    fmt.Println("context cancelled!")

    time.Sleep(time.Second * 3)
}

//function to calculate the square of certain number
func square(n int) int {
    //time.Sleep is for demonstration purpose
    time.Sleep(time.Millisecond * 200)
    return n * n
}

Output:

Result: 1
Result: 4
Result: 9
Result: 16
Result: 25
Result: 36
Result: 49
Result: 64
Result: 81
Result: 100
Result: 121
Result: 144
Result: 169
Result: 196
cancelling context..
context cancelled!
Result: 225

Based on that code, the context is created using context.WithCancel() function that returns a context and cancel that can be used. the Done() function from the context is used to determine that the context is finished. the cancel() function is called to cancel the context so operation related with context is cancelled then finished in this case.

Notes

I hope this article helpful for helping to learn the Go programming language. If you have any thoughts or feedbacks, you can write it in the discussion section below.

Discussion (0)

pic
Editor guide