Practical Go concurrency


Relying on chan and sync.WaitGroup alone is often not enough to build a scalable, complex Go program. In this post, we will look at several tools that help us better control our resources:

  1. Pipelining: Jobs and workers
  2. Rate limiter
    1. Constant interval
    2. Token bucket algorithm
    3. Leaky bucket algorithm
  3. Synchronization
    1. Atomic operations
    2. Mutex
    3. Semaphore
  4. Managing goroutines
    1. Context
    2. Errgroup
  5. Closing remarks

Before we start

Readers should be familiar with Go's basic concurrency concepts, such as the idea that Concurrency is not Parallelism.

Pipelining: Jobs and workers

Suppose we have n_jobs jobs in a queue and we can only afford to spawn n_workers goroutines (where n_workers < n_jobs) to process the jobs concurrently. We can use shared channels to achieve this.

Each worker will consume a job from the jobs inbound channel and send the processed result to the results outbound channel.

func worker(jobs <-chan int, results chan<- int) {
    for job := range jobs {
        // do some work
        result := job
        // send the result when it's done
        results <- result
    }
}

We can then construct the pipeline (job distribution, worker processing, result handling) at a higher level. In some real-world cases, the number of results sent to the results channel may not always equal the number of jobs (e.g. a failure during worker processing), so we need a signal indicating that all the workers have finished and no more results will be sent, in order to close the results channel. We will use sync.WaitGroup for this purpose.

func main() {
    ...
    n_jobs := 10
    n_workers := 3
    jobs := make(chan int)
    results := make(chan int)

    // distribute the jobs
    go func() {
        for j := 0; j < n_jobs; j++ {
            jobs <- j
        }
        // close the jobs channel after all jobs are distributed
        close(jobs)
    }()

    // spawn the workers
    var wg sync.WaitGroup
    for w := 0; w < n_workers; w++ {
        wg.Add(1)
        // idiomatic way to record that a worker has finished
        go func() {
            defer wg.Done()
            worker(jobs, results)
        }()
    }

    // monitor the workers status to close the results channel
    go func() {
        wg.Wait()
        close(results)
    }()

    // process the results
    for result := range results {
        // do something with the result
        fmt.Println(result)
    }
    ...
}

When executing the pipeline, closing channels to signal that earlier stages have finished (rather than relying on counters) not only reduces the complexity of the code, it also helps maintain the correct execution order so we don't accidentally exit the program prematurely, for example after the workers have finished processing but while results are still being drained.

Rate limiter

Suppose that we have a number of jobs to process, but we want to make sure they are distributed at a manageable rate.

Constant interval

The simplest rate limiter can be implemented with time.Ticker (or time.Tick, time.Sleep).

func main() {
    ...
    limiter := time.NewTicker(200 * time.Millisecond)
    // submit 10 jobs
    for j := 0; j < 10; j++ {
        <-limiter.C // wait for the next tick (200ms apart)
        fmt.Println(j)
    }
    limiter.Stop()
    ...
}

The main difference between time.Tick and time.NewTicker is that time.Tick only returns the channel, so the underlying ticker cannot be stopped.
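If we never need to stop the ticker (for example, the loop runs for the lifetime of the program), a minimal sketch with time.Tick could look like this:

tick := time.Tick(200 * time.Millisecond) // no way to stop the underlying ticker
for j := 0; j < 10; j++ {
    <-tick
    fmt.Println(j)
}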

Token bucket algorithm

Suppose we have a bucket with a maximum capacity of t tokens; tokens are added to the bucket at a constant rate r, and each job submission consumes one token. A basic token bucket algorithm allows us to submit as many jobs as there are tokens in the bucket. The supplementary Go package golang.org/x/time/rate implements this algorithm.

func main() {
    ...
    // r = 1/second, t = 4
    limiter := rate.NewLimiter(1, 4)
    ctx := context.Background()
    // submit 10 jobs
    for j := 0; j < 10; j++ {
        if err := limiter.Wait(ctx); err == nil {
            fmt.Println(j)
        }
    }
    ...
}

The benefit of the token bucket algorithm is that it gives us more control over job submission. We can either submit jobs as soon as tokens become available, or submit a number of jobs simultaneously as long as there are enough tokens. However, when used in a multi-user setting (multiple users, each with multiple jobs to submit), this flexibility could allow a greedy user to drain the tokens and block others from submitting. It is also undesirable for workers that require a steady flow of inputs.
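For example, continuing the snippet above, we could reserve several tokens at once for a batch of jobs (the batch size of 4 here is an arbitrary illustration and must not exceed the configured burst):

// submit a batch of 4 jobs at once, as soon as 4 tokens are available
if err := limiter.WaitN(ctx, 4); err == nil {
    for j := 0; j < 4; j++ {
        fmt.Println("batch job", j)
    }
}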

Note: we will see more use cases of the context package. This package is widely used when managing multiple goroutines and we will talk about it in the context section.

Leaky bucket algorithm

With a leaky bucket algorithm, we submit a number of jobs to a bucket and it leaks the jobs at a constant rate to the workers. It can be seen as a token bucket algorithm with a bucket capacity of 1 (no burst allowed). The go.uber.org/ratelimit package implements a slightly more efficient version than the official golang.org/x/time/rate.
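A minimal sketch with go.uber.org/ratelimit (the rate of 5 jobs per second is just an example; Take blocks until the next submission is allowed):

func main() {
    ...
    // leak at most 5 jobs per second to the workers
    limiter := ratelimit.New(5)
    // submit 10 jobs
    for j := 0; j < 10; j++ {
        limiter.Take() // blocks until the next job may be submitted
        fmt.Println(j)
    }
    ...
}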

The benefit of the leaky bucket algorithm is that jobs are submitted at a constant rate, so processing is more predictable. However, we may be handling the jobs sub-optimally (not maximizing the system throughput).

Synchronization

In concurrent processing, we might read and write data simultaneously. To avoid unexpected behaviors (e.g. one write overwriting another), Go provides several synchronization methods.

Atomic operations

The official sync/atomic package provides a basic interface to synchronize concurrent computations on a variable.

func main() {
    ...
    var wg sync.WaitGroup
    var shared atomic.Uint64

    // simulate 3 concurrent workers
    for w := 0; w < 3; w++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            // each worker does some work on the shared variable
            for i := 0; i < 200; i++ {
                shared.Add(1)
            }
        }()
    }

    wg.Wait()
    fmt.Printf("%v\n", shared.Load())
    ...
}

Be aware that Go encourages minimal usage of such manipulations:

Except for special, low-level applications, synchronization is better done with channels or the facilities of the sync package. Share memory by communicating; don't communicate by sharing memory.
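As an illustration of that advice (an addition to the original example, not a replacement for it), the same counter can be owned by a single goroutine that receives increments over a channel:

func main() {
    ...
    var wg sync.WaitGroup
    increments := make(chan uint64)

    // simulate 3 concurrent workers that communicate instead of sharing memory
    for w := 0; w < 3; w++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for i := 0; i < 200; i++ {
                increments <- 1
            }
        }()
    }

    // close the channel once all workers are done
    go func() {
        wg.Wait()
        close(increments)
    }()

    // only the main goroutine touches the counter
    var total uint64
    for inc := range increments {
        total += inc
    }
    fmt.Println(total)
    ...
}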

Mutex

A mutex is a lock that restricts access to a particular piece of data to a single thread/goroutine at a time, to achieve synchronized computation. It can be seen as a more powerful version of the sync/atomic primitives since it allows us to work with more complex data structures.

type ComplexData struct {
    mu  sync.Mutex
    ctr uint64
}
func main() {
    ...
    var wg sync.WaitGroup
    shared := ComplexData{ctr: 0}

    // simulate 3 concurrent workers
    for w := 0; w < 3; w++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for i := 0; i < 200; i++ {
                shared.mu.Lock() // acquire the lock; other goroutines block here
                shared.ctr += 1
                shared.mu.Unlock() // release the lock
            }
        }()
    }

    wg.Wait()
    fmt.Printf("%v\n", shared.ctr)
    ...
}

Similar to sync.WaitGroup, a mutex must not be copied after first use; pass it by pointer instead.
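For example, a helper that works on the shared data should take a pointer (increment below is a hypothetical helper, not part of the example above):

// passing *ComplexData avoids copying the embedded sync.Mutex
func increment(d *ComplexData) {
    d.mu.Lock()
    defer d.mu.Unlock()
    d.ctr++
}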

Semaphore

Broadly speaking, a semaphore is a more general mutex. Instead of restricting access to some data to a single thread/goroutine (as a mutex does), a semaphore allows access by at most a predetermined number of threads/goroutines. When this number is 1, a semaphore is effectively a mutex.

There are two common ways to implement a semaphore: a naive version, where a simple buffered channel suffices, and a more general solution using the golang.org/x/sync/semaphore package.

Semaphore with a buffered channel

We will leverage the fact that sending to a full buffered channel blocks until space becomes available. An empty struct is the most memory-efficient element type for this purpose.

func main() {
    ...
    // allow at most 3 concurrent goroutines (workers)
    sem := make(chan struct{}, 3)

    for j := 0; j < 10; j++ {
        // occupy a spot in the buffered channel
        sem <- struct{}{}  // block if the buffer is full
        ji := j // explicit loop-body-local variable
        go func() {
            fmt.Println(ji)
            time.Sleep(100 * time.Millisecond) // simulate work
            <-sem // release a spot
        }()
    }
    ...
}
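One detail the snippet above glosses over: when the loop ends, up to 3 goroutines may still be running. A common way to wait for them (an addition, not part of the original snippet) is to fill the semaphore back up to its capacity:

    // wait for the remaining goroutines by re-acquiring every slot
    for i := 0; i < cap(sem); i++ {
        sem <- struct{}{}
    }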

Semaphore with golang.org/x/sync/semaphore

This package offers weighted concurrency. That is, we have the flexibility to decide whether a particular thread/goroutine should take up more than one unit of a total concurrency budget. The following snippet implements an equivalent version of the example above.

func main() {
    ...
    // allow at most 3 units of concurrent goroutines (workers)
    sem := semaphore.NewWeighted(3)
    ctx := context.Background()

    for j := 0; j < 10; j++ {
        sem.Acquire(ctx, 1) // each goroutine takes up 1 unit
        ji := j // explicit loop-body-local variable
        go func() {
            defer sem.Release(1) // release 1 spot
            fmt.Println(ji)
            time.Sleep(100 * time.Millisecond) // simulate work
        }()
    }
    ...
}
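As with the buffered channel, we can wait for all in-flight goroutines at the end by acquiring the entire weight (a sketch, assuming the same sem and ctx as above):

    // wait for the remaining goroutines by acquiring the full weight
    if err := sem.Acquire(ctx, 3); err != nil {
        fmt.Println(err)
    }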

Managing goroutines

Whether it's a user cancellation, a deadline/timeout, or an error, we need a way to relay that information to the running goroutines so we can stop them properly.

Context

A context acts as a global signal distributor. It can be created with terminating conditions such as a custom user cancellation, a deadline, or a timeout. When a context is terminated (cancelled), the ctx.Done() channel is closed and every receiver is notified immediately. Typically, goroutines use a select statement to monitor the status of the ctx.Done() channel.

The following is an example for handling timeouts gracefully, play around with the simulated workload (the sleep duration, currently 100ms) to see the effect.

func main() {
    ...
    var wg sync.WaitGroup
    timeout := 300 * time.Millisecond
    ctx, cancel := context.WithTimeout(
        context.Background(), timeout,
    )
    defer cancel() // cancel as soon as we are finished

    // simulate 3 concurrent workers
    for w := 0; w < 3; w++ {
        wg.Add(1)
        wi := w // explicit loop-body-local variable
        go func() {
            defer wg.Done()
            // simulate multiple jobs
            for j := 0; j < 8; j++ {
                select {
                case <-ctx.Done():
                    fmt.Printf("Worker %v timed out\n", wi)
                    return // not to leak the goroutine
                default:
                    fmt.Printf("Worker %v doing %v\n", wi, j)
                }
                // simulate work
                time.Sleep(100 * time.Millisecond)
            }
            fmt.Printf("Worker %v is done\n", wi)
        }()
    }

    wg.Wait()
    ...
}

A context can also carry request-scoped values (key-value pairs) so that all goroutines holding the context can read them.

    ...
    ctx = context.WithValue(ctx, "theKey", "theValue")
    ...
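The documentation for context.WithValue recommends against using a built-in type such as string as the key, to avoid collisions between packages. A small sketch with a custom key type (ctxKey is a hypothetical name) that also shows reading the value back:

type ctxKey string

func main() {
    ...
    ctx := context.WithValue(context.Background(), ctxKey("theKey"), "theValue")
    // any goroutine holding ctx can read the value
    if v, ok := ctx.Value(ctxKey("theKey")).(string); ok {
        fmt.Println(v)
    }
    ...
}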

Errgroup

Now we have seen how a context can be used to send signals to all the goroutines. This is exactly what we need when an error occurs in a goroutine: we want to gracefully shut down the other running goroutines (e.g. store their state) and not leak resources. Notice that in the previous example, we still needed sync.WaitGroup to wait for the goroutines to finish normally. Combining sync.WaitGroup, context, and the need for error handling, the golang.org/x/sync/errgroup package was born.

The following is an extension to the above example.

func main() {
    ...
    timeout := 300 * time.Millisecond
    ctx, cancel := context.WithTimeout(
        context.Background(), timeout,
    )
    defer cancel() // cancel as soon as we are finished
    g, ctx := errgroup.WithContext(ctx)

    // simulate 3 concurrent workers
    for w := 0; w < 3; w++ {
        wi := w // explicit loop-body-local variable
        g.Go(func() error {
            // simulate multiple jobs
            for j := 0; j < 8; j++ {
                select {
                case <-ctx.Done():
                    // could be a timeout or an error
                    fmt.Printf("Worker %v cancelled\n", wi)
                    return ctx.Err()
                default:
                    fmt.Printf("Worker %v doing %v\n", wi, j)
                    // simulate an error
                    if wi == 1 && j == 4 {
                        return fmt.Errorf("simulated error")
                    }
                }
                // simulate work
                time.Sleep(10 * time.Millisecond)
            }
            fmt.Printf("Worker %v is done\n", wi)
            return nil
        })
    }

    if err := g.Wait(); err != nil {
        fmt.Println(err)
        os.Exit(1)
    }
    ...
}

Play around with the simulated workload or the simulated error to see the effect.

Closing remarks

Go's concurrency patterns are very powerful, but their flexibility also makes it easy to accidentally leak resources, for example by spawning an uncontrolled number of goroutines or by failing to stop them when their computations are no longer needed. Learning how to control and efficiently use our resources is the first step towards building a scalable system, and hopefully the tools introduced in this post can help.
