Simply relying on chan and sync.WaitGroup to build a scalable and complex Go program is often not enough. In this post, we will talk about several tools that help us better control our resources: pipelining, rate limiters, synchronization primitives, and ways to manage goroutines.
Before we start
Readers should be familiar with Go's basic concurrency concepts, such as Concurrency is not Parallelism.
Pipelining: Jobs and workers
Suppose we have a number of jobs, n_jobs, in the queue and we can only afford to spawn n_workers goroutines, where n_workers < n_jobs, to process the jobs concurrently. We can use shared channels to achieve this.
Each worker takes a job from the inbound jobs channel and sends the processed result to the outbound results channel.
func worker(jobs <-chan int, results chan<- int) {
	for job := range jobs {
		// do some work
		result := job
		// send the result when it's done
		results <- result
	}
}
We can then construct the pipeline (job distribution, worker processing, result handling) at the upper level. In some real-world cases, the number of results sent to the results channel may not equal the number of jobs (e.g., a failure during worker processing), so we need a signal that all the workers are finished and no more results will be sent before we can close the results channel. We will use sync.WaitGroup for this purpose.
func main() {
	...
	n_jobs := 10
	n_workers := 3
	jobs := make(chan int)
	results := make(chan int)
	// distribute the jobs
	go func() {
		for j := 0; j < n_jobs; j++ {
			jobs <- j
		}
		// close the jobs channel after all jobs are distributed
		close(jobs)
	}()
	// spawn the workers
	var wg sync.WaitGroup
	for w := 0; w < n_workers; w++ {
		wg.Add(1)
		// idiomatic way to record that a worker has finished
		go func() {
			defer wg.Done()
			worker(jobs, results)
		}()
	}
	// monitor the workers status to close the results channel
	go func() {
		wg.Wait()
		close(results)
	}()
	// process the results
	for result := range results {
		// do something with the result
		fmt.Println(result)
	}
	...
}
In a pipeline, closing the channels of earlier stages to signal that they are finished, rather than relying on counters, not only reduces the complexity of the code but also helps maintain the correct execution order, so we don't accidentally exit the program prematurely (for example, when the workers have finished processing but the results are still being written).
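To see why closing channels is more robust than counting, here is a minimal, self-contained sketch of the same pipeline in which some jobs fail. The rule that odd job IDs fail and the runPipeline helper are assumptions made up for this illustration; they are not part of the original example.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// worker forwards only the jobs that "succeed". Treating odd job IDs as
// failures is a made-up rule, used purely for illustration.
func worker(jobs <-chan int, results chan<- int) {
	for job := range jobs {
		if job%2 != 0 {
			continue // simulated failure: no result is sent
		}
		results <- job
	}
}

// runPipeline is a hypothetical helper that wires the three stages together
// and returns the collected results in ascending order.
func runPipeline(nJobs, nWorkers int) []int {
	jobs := make(chan int)
	results := make(chan int)

	// Stage 1: distribute the jobs, then close the channel.
	go func() {
		defer close(jobs)
		for j := 0; j < nJobs; j++ {
			jobs <- j
		}
	}()

	// Stage 2: spawn the workers.
	var wg sync.WaitGroup
	for w := 0; w < nWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			worker(jobs, results)
		}()
	}

	// Close results only after every worker has returned.
	go func() {
		wg.Wait()
		close(results)
	}()

	// Stage 3: collect until results is closed; no counting is needed.
	var collected []int
	for r := range results {
		collected = append(collected, r)
	}
	sort.Ints(collected)
	return collected
}

func main() {
	fmt.Println(runPipeline(10, 3)) // 10 jobs in, only 5 results out
}
```

A consumer that waited for exactly 10 results would block forever here, while ranging over the closed channel terminates cleanly.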
Rate limiter
Suppose that we have a number of jobs to process, but we want to make sure they are distributed at a manageable rate.
Constant interval
The simplest rate limiter can be implemented with time.Ticker (or time.Tick, time.Sleep).
func main() {
	...
	limiter := time.NewTicker(200 * time.Millisecond)
	// submit 10 jobs
	for j := 0; j < 10; j++ {
		<-limiter.C // block until the next tick (every 200ms)
		fmt.Println(j)
	}
	limiter.Stop()
	...
}
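The main difference between time.Tick and time.NewTicker is that time.Tick returns only the channel, so the ticker cannot be stopped explicitly. Before Go 1.23 this also meant the underlying ticker was never garbage collected, which made time.Tick a poor fit for anything but long-lived tickers.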
Token bucket algorithm
Suppose we have a bucket with a maximum capacity of t tokens. Tokens are added to the bucket at a constant rate r, and each job submission consumes a token. A basic token bucket algorithm allows us to submit as many jobs as there are tokens in the bucket. The supplementary Go package golang.org/x/time/rate implements this algorithm.
func main() {
	...
	// r = 1/second, t = 4
	limiter := rate.NewLimiter(1, 4)
	ctx := context.Background()
	// submit 10 jobs
	for j := 0; j < 10; j++ {
		if err := limiter.Wait(ctx); err == nil {
			fmt.Println(j)
		}
	}
	...
}
The benefit of the token bucket algorithm is that it allows us to have more control over the job submissions. We can either submit jobs as soon as there are tokens available or submit a number of jobs simultaneously as long as there are enough tokens. However, when used in a multi-user setting (multiple users, each with multiple jobs to submit), this flexibility could allow a greedy user to drain the tokens and block others from submitting. It's also undesirable for workers that require a steady flow of inputs.
Note: we will see more use cases of the context package. This package is widely used when managing multiple goroutines and we will talk about it in the context section.
Leaky bucket algorithm
With a leaky bucket algorithm, we submit a number of jobs to a bucket and it leaks the jobs at a constant rate to the workers. It can be seen as a token bucket algorithm with a bucket capacity of 1 (no burst allowed). The go.uber.org/ratelimit package implements a slightly more efficient version than the supplementary golang.org/x/time/rate.
The benefit of the leaky bucket algorithm is that the jobs are submitted at a constant rate, so processing is more predictable. However, we may be handling the jobs sub-optimally (not maximizing the system throughput).
Synchronization
In concurrent processing, we might read and write the same data simultaneously. To avoid unexpected behaviors (e.g., one goroutine overwriting another's update), Go provides several synchronization methods.
Atomic operations
The official sync/atomic package provides a basic interface to synchronize concurrent computations on a variable.
func main() {
	...
	var wg sync.WaitGroup
	var shared atomic.Uint64
	// simulate 3 concurrent workers
	for w := 0; w < 3; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// each worker does some work on the shared variable
			for i := 0; i < 200; i++ {
				shared.Add(1)
			}
		}()
	}
	wg.Wait()
	fmt.Printf("%v\n", shared.Load())
	...
}
Be aware that Go encourages minimal usage of such manipulations:
Except for special, low-level applications, synchronization is better done with channels or the facilities of the sync package. Share memory by communicating; don't communicate by sharing memory.
Mutex
A mutex is a lock that limits access to some particular data to a single thread/goroutine at a time to achieve synchronized computation. It can be seen as a more powerful version of the sync/atomic primitives since it allows us to work with more complex data structures.
type ComplexData struct {
	mu  sync.Mutex
	ctr uint64
}

func main() {
	...
	var wg sync.WaitGroup
	shared := ComplexData{ctr: 0}
	// simulate 3 concurrent workers
	for w := 0; w < 3; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 200; i++ {
				shared.mu.Lock() // start blocking
				shared.ctr += 1
				shared.mu.Unlock() // release resource
			}
		}()
	}
	wg.Wait()
	fmt.Printf("%v\n", shared.ctr)
	...
}
Similar to sync.WaitGroup, a mutex should not be copied after first use; instead, it should be passed by pointer.
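One common way to follow this rule is to hide the mutex behind methods with pointer receivers, so that every caller locks the same mutex. The following is a minimal sketch; the Counter type and the countConcurrently helper are hypothetical names introduced for this illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// Counter is a hypothetical type that guards its field with a mutex.
type Counter struct {
	mu  sync.Mutex
	ctr uint64
}

// Inc uses a pointer receiver so every caller locks the same mutex.
// A value receiver would copy the struct, and with it the mutex.
func (c *Counter) Inc() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.ctr++
}

// Value reads the counter under the same lock.
func (c *Counter) Value() uint64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.ctr
}

// countConcurrently receives the counter by pointer and increments it
// from several goroutines.
func countConcurrently(c *Counter, workers, perWorker int) {
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perWorker; i++ {
				c.Inc()
			}
		}()
	}
	wg.Wait()
}

func main() {
	var c Counter // the zero value is ready to use
	countConcurrently(&c, 3, 200)
	fmt.Println(c.Value()) // 600
}
```

Running go vet on code that copies such a struct by value reports the mistake through its copylocks check.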
Semaphore
Broadly speaking, a semaphore is a more general mutex. Instead of restricting access of some data to a single thread/goroutine (mutex), a semaphore allows access to at most a predetermined number of threads/goroutines. When this number is equal to 1, a semaphore is effectively a mutex.
There are two ways to implement a semaphore. The first is a naive version where a simple buffered channel will suffice; the second is a more general solution that uses the golang.org/x/sync/semaphore package.
Semaphore with a buffered channel
We will leverage the fact that a send on a buffered channel blocks while the buffer is full. An empty struct is the most efficient element type for this purpose since it occupies no memory.
func main() {
	...
	// allow at most 3 concurrent goroutines (workers)
	sem := make(chan struct{}, 3)
	for j := 0; j < 10; j++ {
		// occupy a spot in the buffered channel
		sem <- struct{}{} // block if the buffer is full
		ji := j           // explicit loop-body-local variable
		go func() {
			fmt.Println(ji)
			time.Sleep(100 * time.Millisecond) // simulate work
			<-sem                              // release a spot
		}()
	}
	...
}
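To check that the limit actually holds, the sketch below counts how many goroutines run at the same time and records the peak. The peakConcurrency helper is a hypothetical name for this illustration; it adds a sync.WaitGroup so the function only returns after every job has finished, and it releases the spot with defer so an early return cannot leak it.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// peakConcurrency runs nJobs goroutines behind a buffered-channel semaphore
// of size limit and returns the highest number seen running at once.
func peakConcurrency(nJobs, limit int) int64 {
	sem := make(chan struct{}, limit)
	var running, peak atomic.Int64
	var wg sync.WaitGroup

	for j := 0; j < nJobs; j++ {
		sem <- struct{}{} // acquire: blocks while limit goroutines are running
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // release, even on an early return

			n := running.Add(1)
			// record a new peak if n exceeds the current one
			for {
				p := peak.Load()
				if n <= p || peak.CompareAndSwap(p, n) {
					break
				}
			}
			time.Sleep(10 * time.Millisecond) // simulate work
			running.Add(-1)
		}()
	}
	wg.Wait()
	return peak.Load()
}

func main() {
	fmt.Println(peakConcurrency(10, 3)) // never more than 3
}
```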
Semaphore with golang.org/x/sync/semaphore
This package offers weighted concurrency. That is, we have the flexibility to decide whether a particular thread/goroutine should take up more units of concurrency given a total budget. The following snippet implements an equivalent version to the example above.
func main() {
	...
	// allow at most 3 units of concurrent goroutines (workers)
	sem := semaphore.NewWeighted(3)
	ctx := context.Background()
	for j := 0; j < 10; j++ {
		// each goroutine takes up 1 unit
		if err := sem.Acquire(ctx, 1); err != nil {
			break // the context was cancelled before a unit was free
		}
		ji := j // explicit loop-body-local variable
		go func() {
			defer sem.Release(1) // release 1 spot
			fmt.Println(ji)
			time.Sleep(100 * time.Millisecond) // simulate work
		}()
	}
	...
}
Managing goroutines
Whether it's a user cancellation event, deadline reached/timeouts, or errors, we need a way to relay that information to the running goroutines so we can stop them properly.
Context
A context acts as a signal distributor shared by goroutines. It can be created with terminating conditions such as user cancellation, a deadline, or a timeout. When a context is terminated (cancelled), the ctx.Done() channel is closed and every receiver is notified immediately. Typically, goroutines use a select statement to monitor the status of the ctx.Done() channel.
The following example handles timeouts gracefully. Play around with the simulated workload (the sleep duration, currently 100ms) to see the effect.
func main() {
	...
	var wg sync.WaitGroup
	timeout := 300 * time.Millisecond
	ctx, cancel := context.WithTimeout(
		context.Background(), timeout,
	)
	defer cancel() // cancel as soon as we are finished
	// simulate 3 concurrent workers
	for w := 0; w < 3; w++ {
		wg.Add(1)
		wi := w // explicit loop-body-local variable
		go func() {
			defer wg.Done()
			// simulate multiple jobs
			for j := 0; j < 8; j++ {
				select {
				case <-ctx.Done():
					fmt.Printf("Worker %v timed out\n", wi)
					return // not to leak the goroutine
				default:
					fmt.Printf("Worker %v doing %v\n", wi, j)
				}
				// simulate work
				time.Sleep(100 * time.Millisecond)
			}
			fmt.Printf("Worker %v is done\n", wi)
		}()
	}
	wg.Wait()
	...
}
A context can also carry values (key-value pairs) that every goroutine holding the context can read.
...
ctx = context.WithValue(ctx, "theKey", "theValue")
...
Errgroup
Now we have seen how a context can be used to send signals to all the goroutines. This is exactly what we need when an error occurs in a goroutine: we want to gracefully shut down the other running goroutines (e.g., store their state) and not leak resources. Notice that in the previous example, we still need sync.WaitGroup to wait for the goroutines to finish. The golang.org/x/sync/errgroup package combines sync.WaitGroup, context, and error handling.
The following is an extension to the above example.
func main() {
	...
	timeout := 300 * time.Millisecond
	ctx, cancel := context.WithTimeout(
		context.Background(), timeout,
	)
	defer cancel() // cancel as soon as we are finished
	g, ctx := errgroup.WithContext(ctx)
	// simulate 3 concurrent workers
	for w := 0; w < 3; w++ {
		wi := w // explicit loop-body-local variable
		g.Go(func() error {
			// simulate multiple jobs
			for j := 0; j < 8; j++ {
				select {
				case <-ctx.Done():
					// could be a timeout or an error
					fmt.Printf("Worker %v cancelled\n", wi)
					return ctx.Err()
				default:
					fmt.Printf("Worker %v doing %v\n", wi, j)
					// simulate an error
					if wi == 1 && j == 4 {
						return fmt.Errorf("simulated error")
					}
				}
				// simulate work
				time.Sleep(10 * time.Millisecond)
			}
			fmt.Printf("Worker %v is done\n", wi)
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
	...
}
Play around with the simulated workload or the simulated error to see the effect.
Closing remarks
Go's concurrency patterns are very powerful, but their flexibility means we may accidentally leak resources, for example by spawning an uncontrolled number of goroutines or failing to stop them when their computations are no longer needed. Learning how to control and efficiently use our resources is the first step toward building a scalable system, and hopefully the tools introduced in this post can be of help.