Ishan Khare

Go Concurrency pipelines

In this post we're going to explore some design patterns related to concurrency pipelines in Go.

  • We'll begin with a real-world use case as an example and first try to optimize it with goroutines.
  • Next we'll see the challenges that arise and how to debug them.
  • Finally we'll introduce another redesign that will enable us to collect stats about all the processing that we do in the above steps.

The problem statement

Let's say we have a set of users, and we want to make a few calls to an external microservice to filter out the set of users based on a set of parameters.

Since this external call can be made over plain HTTP or over modern alternatives like gRPC, we won't go into the implementation of those.

What we'll focus on instead is, first, a brute-force way of optimizing this functionality.


Implementation with goroutines

To start with, let's say we have a file named handler.go with the following code:
handler.go

package handler

func PerformOperationAfterFiltering(users []Users) {
    // make external call and filter out users

    operationToBePerformed(users)
}
High level design

Let's say we receive a list of about 1000 users on average each time. It would be a good idea to break this down into batches before making the external calls.
This splits up our network calls: if any batch fails due to network issues, the other batches won't be affected, whereas a single call that fails would lose the
response for the entire 1000 users. Breaking the process into smaller chunks also lets us optimize it further in the subsequent steps – divide and conquer.

(Figure: simple batching)
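
To make the batching step concrete on its own, here's a minimal sketch (chunk is a hypothetical helper, and Users stands in for our user type):

// chunk splits users into slices of at most batchSize elements each.
func chunk(users []Users, batchSize int) [][]Users {
    var batches [][]Users
    for len(users) > batchSize {
        batches = append(batches, users[:batchSize])
        users = users[batchSize:]
    }
    // the final batch holds whatever is left over
    return append(batches, users)
}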

But before we do that, we need to understand how we plan to communicate the results back to the calling function, and how we'll know when all the requests have finished.

We'll be using two Go constructs for this (a minimal sketch of how they fit together follows the list):

  1. Channels
  2. WaitGroup
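
Here's that minimal sketch – a self-contained toy version of the exact pattern we're about to build, with squares of integers standing in for filtered users:

package main

import (
    "fmt"
    "sync"
)

func main() {
    results := make(chan int)
    var wg sync.WaitGroup

    // fan out: one goroutine per unit of work
    for i := 1; i <= 3; i++ {
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            results <- n * n // send the result back
        }(i)
    }

    // exit stage: close the channel once every goroutine is done
    go func() {
        wg.Wait()
        close(results)
    }()

    // the range loop exits when the channel is closed
    for r := range results {
        fmt.Println(r)
    }
}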

The high-level architecture looks something like this: fan the batches out to worker goroutines, funnel the filtered users back through a single response channel, and run an exit stage that waits for all the workers and then closes the channel.

Let's open a new file to write our request function:
request.go

package request

func Request(users []Users) chan Users {
    resp := make(chan Users)
    batchSize := 250
    var batches [][]Users

    // repeatedly slice off batchSize users until fewer than batchSize remain
    for batchSize < len(users) {
        users, batches = users[batchSize:], append(batches, users[0:batchSize])
    }
    // don't forget the final, possibly smaller batch
    batches = append(batches, users)

    // ... fan out the requests per batch (next step) ...

    return resp
}


Now that we have our batches created, we can start the requests for each batch in a goroutine of its own.
But every time we start a new goroutine, we'll register it with a WaitGroup so we can track when each individual goroutine completes.

func Request(users []Users) chan Users {
    resp := make(chan Users)
    batchSize := 250
    var batches [][]Users

    for batchSize < len(users) {
        users, batches = users[batchSize:], append(batches, users[0:batchSize])
    }
    batches = append(batches, users)

    var wg sync.WaitGroup

    for _, batch := range batches {
        wg.Add(1)
        go func(batch []Users) {
            defer wg.Done()
            response, err := MakeRequestToThirdParty(batch)
            if err != nil {
                // log error and probably
                // also the batch of users that failed
            }

            for _, r := range response {
                if r.CurrentBalance > 500 {
                    // we've got our user
                    // push it back on our channel
                    resp <- r.User
                }
            }
        }(batch)
    }

    return resp
}

Note that the function returns a channel of type Users.

Also, we added a deferred call to wg.Done(). This decrements the WaitGroup counter so that our exit stage knows when all our goroutines are done.

Before we get to the exit stage, let's see how we plan to consume the users from the channel. Channels fit right in with Go's for loop – i.e. we can consume
values from a channel simply with a for-range loop.
In our handler.go:

package handler

import "request"

func PerformOperationAfterFiltering(users []Users) {
    // make the external calls and filter out users
    var filteredUsers []Users

    for user := range request.Request(users) {
        filteredUsers = append(filteredUsers, user)
    }

    operationToBePerformed(filteredUsers)
}

That's exactly what we're doing here: range-ing over the open channel. How and when will the loop exit, you may ask?
– When we close() the channel. And that's where the EXIT stage comes in.

So we go on to the exit stage. Back in our request.go, we add another closure, which makes use of our WaitGroup:

func Request(users []Users) chan Users {
    resp := make(chan Users)
    batchSize := 250
    var batches [][]Users

    for batchSize < len(users) {
        users, batches = users[batchSize:], append(batches, users[0:batchSize])
    }
    batches = append(batches, users)

    var wg sync.WaitGroup

    for _, batch := range batches {
        wg.Add(1)
        go func(batch []Users) {
            defer wg.Done()
            response, err := MakeRequestToThirdParty(batch)
            if err != nil {
                // log error and probably
                // also the batch of users that failed
            }

            for _, r := range response {
                if r.CurrentBalance > 500 {
                    // we've got our user
                    // push it back on our channel
                    resp <- r.User
                }
            }
        }(batch)
    }

    // EXIT stage
    go func() {
        // wait for the wait group counter to drop down to 0
        // which will happen when all goroutines are done
        wg.Wait()

        // now we close the response channel
        // this will make our loop also exit
        close(resp)
    }()

    return resp
}

The code above is self-explanatory and faithfully implements our initial high-level design.


Where things get dim

The current code works perfectly. But when we deploy such code to production, we usually want to log and collect metrics about what happened:
what went wrong, what worked, how many users passed, how many didn't, and so on.

The current design leaves little room for collecting such metrics as we'll see soon.


Collecting metrics

The simplest approach to collecting metrics is:

  • Build two maps from user IDs to user objects – one for users passing the condition, another for users failing it.
  • On each response, add the user to the relevant map.

So let's add this simple solution to our code
request.go

func Request(users []Users) chan Users {
    resp := make(chan Users)
    batchSize := 250
    var batches [][]Users

    passedUsers := make(map[int64]Users)
    skippedUsers := make(map[int64]Users)

    for batchSize < len(users) {
        users, batches = users[batchSize:], append(batches, users[0:batchSize])
    }
    batches = append(batches, users)

    var wg sync.WaitGroup

    for _, batch := range batches {
        wg.Add(1)
        go func(batch []Users) {
            defer wg.Done()
            response, err := MakeRequestToThirdParty(batch)
            if err != nil {
                // log error and probably
                // also the batch of users that failed
            }

            for _, r := range response {
                if r.CurrentBalance > 500 {
                    // we've got our user
                    // push it back on our channel
                    resp <- r.User

                    passedUsers[r.User.UserId] = r.User
                } else {
                    skippedUsers[r.User.UserId] = r.User
                }
            }
        }(batch)
    }

    // EXIT stage
    go func() {
        // wait for the wait group counter to drop down to 0
        // which will happen when all goroutines are done
        wg.Wait()

        defer LogMetrics(passedUsers, skippedUsers)

        // now we close the response channel
        // this will make our loop also exit
        close(resp)
    }()

    return resp
}
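
LogMetrics isn't defined in this post; assume it's a small helper along these lines (a hypothetical sketch that just logs the counts):

import "log"

// LogMetrics is a hypothetical helper that records how many
// users passed the filter and how many were skipped.
func LogMetrics(passed, skipped map[int64]Users) {
    log.Printf("filter metrics: passed=%d skipped=%d", len(passed), len(skipped))
}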

The code looks fine – we've added two maps and are writing our users to them. In the exit stage we've added a defer statement that makes sure the metrics are always registered. This code compiles without errors and will even run cleanly for some time, but there is a serious bug that will crash our program in very weird ways.

The issue

Whenever a concurrent piece of Go code panics for reasons that seem weird, the Go race detector is the tool to reach for.

The issue is a race condition: we're writing to our hashmaps from inside our goroutines. These hashmaps are shared data among the goroutines, and sooner or later more than one goroutine will try to write to them at the same time – a data race. Data races in Go are undefined behavior and will crash the whole program with not-so-friendly error messages.
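
The race detector is built into the Go toolchain – we just pass the -race flag when running, building, or testing:

go run -race .
go test -race ./...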

Running the above code with the race detector enabled produces output like this:

==================
WARNING: DATA RACE
Read by goroutine 5:
  request.Request·001()
     request.go:36 +0x169

Previous write by goroutine 1:
  request.Request()
      request.go:36 +0x174

Goroutine 5 (running) created at:
  request.goFunc()
      request.go:19 +0x56
==================

This tells us the line numbers where the data race occurred. It is only the first data race; another similar one is reported right below it, where we access the skippedUsers map.

In order to fix this, we need to redesign our pipeline architecture slightly – which we'll see in the next iteration of this post.
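
(For reference, the most direct stopgap – not the redesign this series is building toward – is to guard the shared maps with a sync.Mutex. A minimal, self-contained sketch with toy types:)

package main

import "sync"

func main() {
    passed := make(map[int64]string)

    var mu sync.Mutex
    var wg sync.WaitGroup

    for i := int64(0); i < 4; i++ {
        wg.Add(1)
        go func(id int64) {
            defer wg.Done()
            // serialize all writes to the shared map
            mu.Lock()
            passed[id] = "user"
            mu.Unlock()
        }(i)
    }

    wg.Wait()
}

This removes the race at the cost of lock contention, which is one reason a pipeline-style redesign is worth exploring.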
