DEV Community

Sophie DeBenedetto
Sophie DeBenedetto

Posted on

Synchronizing Go Routines with Channels and WaitGroups

In debugging a mysteriously hanging Go function recently, I learned something new about how to use WaitGroups and Channels to synchronize Go routines. Keep reading to learn more!

Intro

As a relatively new Go programmer, I was stumped recently by a bug that caused a function running multiple Go routines to hang indefinitely. After taking a deeper dive into the usage of WaitGroups and Channels to manage the behavior of Go routines, I finally understood the right way to leverage these tools and was able to resolve the bug.

In this post we'll

  • Examine the buggy code and break down why it doesn't work
  • Walk through the right way to synchronize Go routines with a simple example
  • Fix our bug!

The Blocking Bug

In an application responsible for displaying GitHub usage analytics, we have a function, MergeContributors, that is responsible for taking two GitHub user accounts that belong to the same person and "merging" them into one so that the UI can accurately reflect their contributions to project commits, pull requests and merges. This "contributor merge" entails the following:

  • Update all of the analytic engine's stored Git commits so that all the user's commits from both accounts are correctly associated to one primary account
  • Update all of the analytic engine's stored pull requests so that all the user's PRs from both accounts are correctly associated to one primary account
  • Update all of the analytic engine's stored PR merges so that all the user's merges from both accounts are correctly associated to one primary account
  • Finally, if all of those actions succeeded, update a record indicating that the two accounts have been "merged"

The function was bugging out in the following way:

  • If an error occurred in any of the first three steps
  • Then the function would block, hanging forever and never returning

Yikes! Let's take a look at the original code so that we can diagnose the issue:

func MergeCOntributors(primaryAccount, secondaryAccount) error {
  // Create a WaitGroup to manage the goroutines.
    var waitGroup sync.WaitGroup
    c := make(chan error)

    // Perform 3 concurrent transactions against the database.
    waitGroup.Add(3)

    go func() {
        waitGroup.Wait()
        close(c)
    }()

    // Transaction #1, merge "commit" records
    go func() {
        defer waitGroup.Done()

        err := mergeCommits(primaryAccount, secondaryAccount)
        if err != nil {
            c <- err
        }
  }()

  // Transaction #2, merge "pull request" records
    go func() {
        defer waitGroup.Done()

        err := mergePullRequests(primaryAccount, secondaryAccount)
        if err != nil {
            c <- err
        }
  }()

  // Transaction #3, merge "merge" records
    go func() {
        defer waitGroup.Done()

        err := mergePullRequestMerges(primaryAccount, secondaryAccount)
        if err != nil {
            c <- err
        }
  }()

  waitGroup.Wait()

  for err := range c {
        if err != nil {
            return err
        }
  }

  return markMerged(primaryAccount, secondaryAccount)
}
Enter fullscreen mode Exit fullscreen mode

Breaking Down The Bug

Let's walk through what's happening here:

  • We establish a WaitGroup, waitGroup and a channel that expects to receive errors
var waitGroup sync.WaitGroup
c := make(chan error)
Enter fullscreen mode Exit fullscreen mode
  • We increment the WaitGroup count to three, since we will be using it to orchestrate our three synchronous DB transaction Go routines
waitGroup.Add(3)
Enter fullscreen mode Exit fullscreen mode
  • We spin off a separate Go routine that will block until the waitGroup's count is back down to zero. Once it unblocks, it will close the channel.
go func() {
  waitGroup.Wait()
  close(c)
}()
Enter fullscreen mode Exit fullscreen mode
  • We spin off our three concurrent Go routines, one for each database transaction. Each Go routine runs a function that is responsible for making some database call, sending an error to the channel if necessary, and decrementing the waitGroup's count before the function returns.

  • Then, we have a call to waitGroup.Wait(). This will block the execution of the main function until the waitGroup's count is back down to zero.

  • After this blocking call, we're using range to iterate over the messages sent to the channel. The call to range will continue listening for messages to the channel until the channel is closed. This is a blocking operation.

  • Once the channel is closed by our earlier Go routine, the one that is waiting for the waitGroup count to get down to zero and then calling close(c), the call to range will stop listening to the channel and the main function will proceed to run, either returning an error if one was received by the channel or moving on to the last piece of work, the call to markMerged.

Have you spotted the problem yet?

Understanding The Problem

In order to spot the bug, we need to understand something about how sending and receiving messages over a channel works. When we send a message over a channel, that call to send the message is blocking until the message is read from the channel.

Taking a closer look at one of our DB transaction Go routines:

// Transaction #1, merge "commit" records
go func() {
  defer waitGroup.Done()

  err := mergeCommits(primaryAccount, secondaryAccount)
  if err != nil {
    c <- err
  }
}()
Enter fullscreen mode Exit fullscreen mode

We can understand that the line in which we send a message to the channel, c <- error, will block the execution of the anonymous function running in our Go routine until that message is read.

Where are we reading messages? Via our range call, which comes after a call to waitGroup.Wait():

...
waitGroup.Wait()

for err := range c {
  if err != nil {
    return err
  }
}

return markMerged(primaryAccount, secondaryAccount)
Enter fullscreen mode Exit fullscreen mode

Here's the problem:

  • The anonymous function sends a message to the channel, which blocks until the message is read
  • The message-reading code, our range call, won't run until after the waitGroup count is back down to zero, because it comes after a synchronous call to waitGroup.Wait()
  • Since the message can't be read yet, the anonymous function running in our go routine can't finish running--it won't call the deferred waitGroup.Done().
  • This means that the waitGroup count will never get back down to zero, which in turn means the call to waitGroup.Wait() that comes right before our range call will never unblock.
  • Since waitGroup.Wait() will never unblock, we can't call range, the message will never be read from the channel, and we're back where we started--in an infinite block!

The Right Way To Synchronize Go Routines

In order prevent this block, we need to ensure that our range call will run and read messages from the channel while the Go routines are running. This will ensure that any Go routine that sends a message to a channel will not block on that send, thereby allowing the Go routine's anonymous function to call waitGroup.Done().

Let's take a look at a simple example:

package main

import "fmt"

func main() {
  // Create a WaitGroup to manage the goroutines.
    var waitGroup sync.WaitGroup
    c := make(chan string)

    // Perform 3 concurrent transactions against the database.
    waitGroup.Add(3)

    go func() {
        waitGroup.Wait()
        close(c)
    }()

    go func() {
        defer waitGroup.Done()

        c <- "one"
  }()

  go func() {
        defer waitGroup.Done()

        c <- "two"
  }()

  go func() {
        defer waitGroup.Done()

        c <- "three"
  }()

  for str := range c {
        fmt.Println(str)
  }
}
Enter fullscreen mode Exit fullscreen mode

Let's break this down. We:

  • Establish a WaitGroup and set its count to three
  • Establish a channel that can receive messages that are strings
  • Spin off a Go routine that will wait until the waitGroup's count is zero, then close the channel
  • Create three separate Go routines, each of which will write a message to the channel and, once that message is read, decrement the waitGroup count
  • Then, at the same time as the running of these Go routines, we are range-ing over the channel, reading any incoming messages and printing them to STDOUT.
  • Since our range call is running while the Go routines are running, the messages each routine sends to the channel are read immediately. The calls in each routine to send the message therefore do not block for long, and each routine's anonymous function is able to invoke the waitGroup.Done() call.
  • Once each of the three Go routine concludes, having decremented the waitGroup count, the first Go routine (the one that calls waitGroup.Wait()) will unblock and close the channel.
  • Once the channel is closed and all its messages are read, the range will stop listening for messages and the main function will finish running.

If we run the code above, we'll see that the function doesn't block improperly. Instead, we see the following output successfully printed:

one
two
three
Enter fullscreen mode Exit fullscreen mode

Now that we understand the right way to synchronize our Go routines, let's fix our bug!

Fixing the Bug

We need to get rid of that second, synchronous call to waitGroup.Wait(). This call is preventing messages from getting read from the channel, which in turn prevents any calls to waitGroup.Done(). This is the cause of the block in our function.

If we remove the offending line, we're left with:

 func MergeCOntributors(primaryAccount, secondaryAccount) error {
  // Create a WaitGroup to manage the goroutines.
    var waitGroup sync.WaitGroup
    c := make(chan error)

    // Perform 3 concurrent transactions against the database.
    waitGroup.Add(3)

    go func() {
        waitGroup.Wait()
        close(c)
    }()

    // Transaction #1, merge "commit" records
    go func() {
        defer waitGroup.Done()

        err := mergeCommits(primaryAccount, secondaryAccount)
        if err != nil {
            c <- err
        }
  }()

  // Transaction #2, merge "pull request" records
    go func() {
        defer waitGroup.Done()

        err := mergePullRequests(primaryAccount, secondaryAccount)
        if err != nil {
            c <- err
        }
  }()

  // Transaction #3, merge "merge" records
    go func() {
        defer waitGroup.Done()

        err := mergePullRequestMerges(primaryAccount, secondaryAccount)
        if err != nil {
            c <- err
        }
  }()

  // This line is bad! Get rid of it!
  // waitGroup.Wait()

  for err := range c {
        if err != nil {
            return err
        }
  }

  return markMerged(primaryAccount, secondaryAccount)
}
Enter fullscreen mode Exit fullscreen mode

Now, we're ensuring the following behavior:

  • Run a Go routine that is blocking, via a call to waitGroup.Wait(), until the WaitGroup count is down to zero, at which point it closes the channel
  • Each "DB transaction" Go routine's anonymous function will, if there is an error, send a message to the channel
  • The call to range is listening for such messages, and reading them as they arrive
  • Back in each "DB transaction" Go routine, the function is un-blocked, and the call to waitGroup.Done() will run, decrementing the WaitGroup's count
  • Once the WaitGroup's count hits zero, the first Go routine will un-block and close the channel via a call to close(c)
  • This will tell the range call to stop listening to the channel, and therefore stop blocking, allowing the main function to continue execution

Conclusion

What havoc one misplaced waitGroup.Wait() can wreak! The key takeaway here is that when you send a message to a channel, it will block until that message is read. We need to ensure that we're reading from any channels we're writing to, in order to successfully synchronize our Go routines.

In the case of our bug, we were improperly blocking the execution of our function, preventing any messages from getting read from the channel. By removing our extra waitGroup.Wait(), we guaranteed that messages sent the channel would be read, allowing the waitGroup's counter to be decremented. This in turn ensured that the channel would be closed, causing the range over that channel's messages to unblock, and allowing the main function to continue executing.

This bug certainly gave me a better understanding of Go routine synchronization, and I hope it was helpful to you too.

Happy coding!

Top comments (5)

Collapse
 
handofgod_10 profile image
Vijaykumar

So waitGroup.Wait() is the join point for the all the async go routines to have been called for. Why call the Wait() function in a seperate go routine all together.. Apologies, as I do not understand that part. Also the channels are used for communication, so basically if its an error, capture that and put in the channel or if its a success, you have to put that in the channel. Just like Wait() is the join point of the goroutines to the main thread, <-c needs to be present for the channels.

Just wondering why not go with the channels alone and why use WaitGroup. Since the use case seems to be more driven towards getting response back, channels make more sense to use anyways.

If you have a chance, could you please explain. Just curious to know a little more. I am a newbie myself

Collapse
 
stevefromaccounting profile image
Stephen Yankovich • Edited

I was thinking this same question, so I created a sample scenario to see why: goplay.tools/snippet/4mkhTcLhRoS

package main

import (
    "errors"
    "fmt"
    "golang.org/x/sync/errgroup"
    "sync"
    "time"
)

func main() {

    //usingChannels()

    //usingErrGroups()

    usingChannelsWithWg()
}

// usingChannelsWithWg this one works! Each goroutine correctly spits out errors but continues on the loop, and the
// process correctly ends only when every goroutine is finished with the loop, unlike usingChannels
//
// Thanks to https://dev.to/sophiedebenedetto/synchronizing-go-routines-with-channels-and-waitgroups-3ke2
func usingChannelsWithWg() {
    var workerWg sync.WaitGroup
    errorCh := make(chan error, 1)

    workerWg.Add(3)

    go func() {
        workerWg.Wait()
        close(errorCh)
    }()

    go func(errorCh chan<- error) {
        for i := 0; i <= 50; i++ {
            time.Sleep(time.Millisecond * 2)
            fmt.Printf("[1]: iteration %d\n", i)

            err := triggerErr(i)
            if err != nil {
                errorCh <- fmt.Errorf("[1] ERROR: %w", err)
            }
        }

        defer workerWg.Done()
    }(errorCh)

    go func(errorCh chan<- error) {
        for i := 0; i <= 50; i++ {
            time.Sleep(time.Millisecond * 2)
            fmt.Printf("[2]: iteration %d\n", i)

            err := triggerErr(i)
            if err != nil {
                errorCh <- fmt.Errorf("[2] ERROR: %w", err)
            }
        }

        defer workerWg.Done()
    }(errorCh)

    go func(errorCh chan<- error) {
        for i := 0; i <= 50; i++ {
            time.Sleep(time.Millisecond * 2)
            fmt.Printf("[3]: iteration %d\n", i)

            err := triggerErr(i)
            if err != nil {
                errorCh <- fmt.Errorf("[3] ERROR: %w", err)
            }
        }

        defer workerWg.Done()
    }(errorCh)

channelLoop:
    for {
        select {
        case err, ok := <-errorCh:
            if ok {
                fmt.Println(err.Error())
            } else {
                fmt.Println("channel closed!")
                break channelLoop
            }
        default:
        }
    }

}

// usingErrGroups this also doesn't work because it exits out of everything based ont he very first error
//from the very first goroutine
func usingErrGroups() {
    goRoutineErrGroup := new(errgroup.Group)

    goRoutineErrGroup.Go(func() error {
        for i := 0; i < 50; i++ {
            time.Sleep(time.Millisecond * 2)
            fmt.Printf("[1]: iteration %d\n", i)

            err := triggerErr(i)
            if err != nil {
                return fmt.Errorf("[1] ERROR: %w", err)
            }
        }

        return nil
    })

    goRoutineErrGroup.Go(func() error {
        for i := 0; i < 50; i++ {
            time.Sleep(time.Millisecond * 2)
            fmt.Printf("[2]: iteration %d\n", i)

            err := triggerErr(i)
            if err != nil {
                return fmt.Errorf("[2] ERROR: %w", err)
            }
        }

        return nil
    })

    goRoutineErrGroup.Go(func() error {
        for i := 0; i < 50; i++ {
            time.Sleep(time.Millisecond * 2)
            fmt.Printf("[3]: iteration %d\n", i)

            err := triggerErr(i)
            if err != nil {
                return fmt.Errorf("[3] ERROR: %w", err)
            }
        }

        return nil
    })

    // Wait for all HTTP fetches to complete.
    if err := goRoutineErrGroup.Wait(); err != nil {
        fmt.Printf("err from a routin: %s", err.Error())
    }
}

// usingChannels this doesn't quite work because it never exits unless we explicitly close the channel. Except we
// only have one channel, and when we close the first one, it means the second two goroutines get exited early
func usingChannels() {
    errorCh := make(chan error, 1)

    go func(errorCh chan<- error) {
        for i := 0; i < 50; i++ {
            time.Sleep(time.Millisecond * 2)
            fmt.Printf("[1]: iteration %d\n", i)

            err := triggerErr(i)
            if err != nil {
                errorCh <- fmt.Errorf("[1] ERROR: %w", err)
            }
        }

        defer close(errorCh)
    }(errorCh)

    go func(errorCh chan<- error) {
        for i := 0; i < 50; i++ {
            time.Sleep(time.Millisecond * 2)
            fmt.Printf("[2]: iteration %d\n", i)

            err := triggerErr(i)
            if err != nil {
                errorCh <- fmt.Errorf("[2] ERROR: %w", err)
            }
        }

        defer close(errorCh)
    }(errorCh)

    go func(errorCh chan<- error) {
        for i := 0; i < 50; i++ {
            time.Sleep(time.Millisecond * 2)
            fmt.Printf("[3]: iteration %d\n", i)

            err := triggerErr(i)
            if err != nil {
                errorCh <- fmt.Errorf("[3] ERROR: %w", err)
            }
        }

        defer close(errorCh)
    }(errorCh)

channelLoop:
    for {
        select {
        case err, ok := <-errorCh:
            if ok {
                fmt.Println(err.Error())
            } else {
                fmt.Println("channel closed!")
                break channelLoop
            }
        default:
        }
    }
}

func triggerErr(input int) error {
    if input%10 == 0 {
        return errors.New(fmt.Sprintf("an error, bc i == %d", input))
    }

    return nil
}
Enter fullscreen mode Exit fullscreen mode

In short, the for loop reading from your error channel will never know when your goroutines are finished, and will continue to read forever.

Like the linked example, suppose you have 3 goroutines that are each looping through the numbers of 1 - 50, with every 10th number returning an error (10,20,30,...). Your code will look like:

errorChannel := make(chan error, 1)

// goroutine 1

// goroutine 2

// goroutine 3

for err := range errorChannel {
   // do things when there's an error, like print it out, PagerDuty, log it, etc
}
Enter fullscreen mode Exit fullscreen mode

Without a WaitGroup, that for loop will go on forever, because it'll never know when you are finished writing things to the channel. You can call close(errorChannel), but where would you call it? Logically, you want to call it "when --EVERY-- goroutine has iterated through the numbers 1 - 50". But how do you tell Go when that has happened? You do that with WaitGroups.

Collapse
 
jainaman9464 profile image
Aman Jain

If we're spawning 3 go routines, let's say one of the go routines returned an error. So, according to the code, in the range loop, you're simply returning the error. Isn't the channel still open at the time of returning the error because the waitGroup count is still not zero as all the go routines are not executed completely yet ?

Collapse
 
santa_blr profile image
santosh banerjee

I had the same question. Also, I was wondering how the original (buggy) code was tested at the time of development. Generally speaking the call to waitGroup.wait() is done in the parent goroutine spawning the child goroutines. But in the original code, the call to waitGroup.wait() was being placed both synchronously (at the end of the MergeCOntributors function before the range loop) and asynchronously (at the beginning) through the very first goroutine.

Collapse
 
hickeng profile image
George Hicken

Given you have a known number of routines and a hardcoded number to waitgroup.Add, did you consider using a channel with a buffer of 3? What made you take the solution above over buffering?