DEV Community

Pascal Dennerly

Originally published at skillscheck.co.uk

Semaphores and Threadpools in Go


Here’s a thing I was thinking about today. I had to make a load of REST calls to an API. The first one was a query to return matching objects and then, for each result, I had to make another call to get more details. Version one made each request one after the other and it was slow.

Let’s look at some code to show what’s going on.

package main

import (
    "log"
    "time"
)

func main() {
    start := time.Now()

    // Imagine this is the result of a search on a REST API
    results := []int{10, 15, 8, 3, 17, 20, 1, 6, 10, 9, 13, 19}

    for _, d := range results {
        // Imagine this is a long running operation, perhaps another
        // REST API call
        log.Printf("Waiting for %d seconds\n", d)
        time.Sleep(time.Second * time.Duration(d))
    }

    log.Printf("Total time taken: %s\n", time.Since(start))
}

Example 1

You can see this example code, along with all the others on this page, here.

This waits quite a while then gives the output:

2018/10/03 23:31:19 Waiting for 10 seconds
2018/10/03 23:31:29 Waiting for 15 seconds
2018/10/03 23:31:44 Waiting for 8 seconds
2018/10/03 23:31:52 Waiting for 3 seconds
2018/10/03 23:31:55 Waiting for 17 seconds
2018/10/03 23:32:12 Waiting for 20 seconds
2018/10/03 23:32:32 Waiting for 1 seconds
2018/10/03 23:32:33 Waiting for 6 seconds
2018/10/03 23:32:39 Waiting for 10 seconds
2018/10/03 23:32:49 Waiting for 9 seconds
2018/10/03 23:32:58 Waiting for 13 seconds
2018/10/03 23:33:11 Waiting for 19 seconds
2018/10/03 23:33:30 Total time taken: 2m11.012685s

Over 2 minutes? Surely we could make this faster?

Adding goroutines

What’s the simplest way to make this faster?

It’s to make all the API calls at the same time. Let’s pretend we’re doing that in our example and use goroutines to parallelise that work.

for _, d := range results {
    go func() {
        // Imagine this is a long running operation, perhaps another
        // REST API call
        log.Printf("Waiting for %d seconds\n", d)
        time.Sleep(time.Second * time.Duration(d))
    }()
}

Example 2

Let’s look at what this gives us:

2018/10/05 20:53:11 Waiting for 1 seconds
2018/10/05 20:53:11 Waiting for 17 seconds
2018/10/05 20:53:11 Waiting for 19 seconds
2018/10/05 20:53:11 Waiting for 19 seconds
2018/10/05 20:53:11 Waiting for 19 seconds
2018/10/05 20:53:11 Waiting for 19 seconds
2018/10/05 20:53:11 Total time taken: 571µs

Errrrrm, that’s not right! It’s not obvious, but what I think is happening is that the closures all share the single loop variable d, which the loop keeps modifying while the goroutines are running, so several of them pick up the same value rather than the value d had when each one was launched. (Go 1.22 and later give each iteration its own copy of d, so newer toolchains no longer behave this way.) On top of that, main returns without waiting for the goroutines, so the program exits before most of them have logged anything.

Don’t end early

We need a way to wait until all of the goroutines are done. For this, let’s use a sync.WaitGroup. This will allow us to safely track the number of goroutines that have been created and then wait until they are done.

var wg sync.WaitGroup

for _, d := range results {
    wg.Add(1)

    go func() {
        defer wg.Done()

        // Imagine this is a long running operation, perhaps another
        // REST API call
        log.Printf("Waiting for %d seconds\n", d)
        time.Sleep(time.Second * time.Duration(d))
    }()
}

wg.Wait()

Example 3

This time we get this:

2018/10/05 21:43:57 Waiting for 6 seconds
2018/10/05 21:43:57 Waiting for 19 seconds
2018/10/05 21:43:57 Waiting for 19 seconds
2018/10/05 21:43:57 Waiting for 19 seconds
2018/10/05 21:43:57 Waiting for 19 seconds
2018/10/05 21:43:57 Waiting for 19 seconds
2018/10/05 21:43:57 Waiting for 19 seconds
2018/10/05 21:43:57 Waiting for 19 seconds
2018/10/05 21:43:57 Waiting for 19 seconds
2018/10/05 21:43:57 Waiting for 19 seconds
2018/10/05 21:43:57 Waiting for 19 seconds
2018/10/05 21:43:57 Waiting for 19 seconds
2018/10/05 21:44:16 Total time taken: 19.003385s

Well at least we’re getting the correct number of log entries and the time looks about right.

Using correct values across goroutines

This one should be a little easier: we just need to make sure that we pass the data in to the goroutine function that we’re invoking. Right?

for _, d := range results {
    wg.Add(1)

    go func(secs int) {
        defer wg.Done()

        // Imagine this is a long running operation, perhaps another
        // REST API call
        log.Printf("Start waiting for %d seconds\n", secs)
        time.Sleep(time.Second * time.Duration(secs))
        log.Printf("Finished waiting for %d seconds\n", secs)
    }(d)
}

Example 4

Look! They all started at once, each with its own value, and each one finished when it was done! :)

2018/10/05 22:21:20 Start waiting for 19 seconds
2018/10/05 22:21:20 Start waiting for 20 seconds
2018/10/05 22:21:20 Start waiting for 3 seconds
2018/10/05 22:21:20 Start waiting for 17 seconds
2018/10/05 22:21:20 Start waiting for 10 seconds
2018/10/05 22:21:20 Start waiting for 1 seconds
2018/10/05 22:21:20 Start waiting for 8 seconds
2018/10/05 22:21:20 Start waiting for 6 seconds
2018/10/05 22:21:20 Start waiting for 15 seconds
2018/10/05 22:21:20 Start waiting for 9 seconds
2018/10/05 22:21:20 Start waiting for 13 seconds
2018/10/05 22:21:20 Start waiting for 10 seconds
2018/10/05 22:21:21 Finished waiting for 1 seconds
2018/10/05 22:21:23 Finished waiting for 3 seconds
2018/10/05 22:21:26 Finished waiting for 6 seconds
2018/10/05 22:21:28 Finished waiting for 8 seconds
2018/10/05 22:21:29 Finished waiting for 9 seconds
2018/10/05 22:21:30 Finished waiting for 10 seconds
2018/10/05 22:21:30 Finished waiting for 10 seconds
2018/10/05 22:21:33 Finished waiting for 13 seconds
2018/10/05 22:21:35 Finished waiting for 15 seconds
2018/10/05 22:21:37 Finished waiting for 17 seconds
2018/10/05 22:21:39 Finished waiting for 19 seconds
2018/10/05 22:21:40 Finished waiting for 20 seconds
2018/10/05 22:21:40 Total time taken: 20.003293s
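
An alternative to passing the value as an argument is to shadow the loop variable inside the loop body, which also gives each goroutine its own copy. The following is a minimal sketch rather than one of the original examples; collect is a hypothetical helper that records what each goroutine saw so that the result can be checked, and the sleeps are left out to keep it fast.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// collect is a hypothetical helper (not part of the original examples).
// It launches one goroutine per input value and records the value that
// each goroutine observed.
func collect(results []int) []int {
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		seen []int
	)

	for _, d := range results {
		d := d // shadow: this iteration gets its own copy of d
		wg.Add(1)

		go func() {
			defer wg.Done()

			// append is not safe for concurrent use, so guard it.
			mu.Lock()
			seen = append(seen, d)
			mu.Unlock()
		}()
	}

	wg.Wait()

	// Goroutines finish in no particular order, so sort before returning.
	sort.Ints(seen)
	return seen
}

func main() {
	fmt.Println(collect([]int{10, 15, 8, 3})) // prints [3 8 10 15]
}
```

Both forms do the same job. Passing the value as a parameter makes the dependency visible in the function signature, which is probably why it is the more common choice.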

Great…but…they all run at once. Let’s pretend that you’re trying to call a microservice that has some serious performance issues, so making all of those calls at the same time could cause problems. I know, I know, 12 calls at once isn’t really much, but let’s pretend that it is. Well, I say pretend, but I’ve seen real-life services in production that would struggle, for many different reasons. Something that I’ve seen more than once is exponential performance drop-off: increasing the number of connections or requests increases response times exponentially. And watch out, it’s surprisingly easy to cause if you’re not careful!

Limiting the number of concurrent requests

So here we get into some proper computer-y stuff. Ideally we would use a thread pool so that only a certain number of operations can be carried out at the same time. But this is Go and we’re not really using threads. So what else can we use?

What we need here is to maintain a count of how many goroutines are active right now and stop more from running so we don’t exceed that magic number. We need to block goroutines when we don’t have enough capacity. We need to use a semaphore!

There are a few approaches to using semaphores but fortunately we’re using Go so we can take advantage of channels to give us the correct behaviour without having to explicitly use a thread pool.

For the semaphore we need to create a buffered channel, that is, a channel with a certain number of slots that can be filled at the same time. But what do we fill it with? Does it matter? Well, it doesn’t, really. Did you know you can pass an empty struct in to a channel? Lots of details here.
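
To make the "slots" idea concrete, here is a small sketch of how a buffered channel of empty structs behaves. slotInfo is a hypothetical helper invented for this illustration; it only shows that sends succeed without blocking while free slots remain, and that the empty struct values themselves take up no space.

```go
package main

import (
	"fmt"
	"unsafe"
)

// slotInfo is a hypothetical helper (not part of the original examples).
// It fills `used` slots of a semaphore channel with capacity `size` and
// reports how many slots are taken and how many exist in total.
// used must not exceed size, otherwise the send would block forever.
func slotInfo(size, used int) (taken, total int) {
	sem := make(chan struct{}, size)

	for i := 0; i < used; i++ {
		sem <- struct{}{} // does not block while free slots remain
	}

	return len(sem), cap(sem)
}

func main() {
	// An empty struct occupies zero bytes, so the channel carries no data,
	// only the fact that a slot is taken.
	fmt.Println(unsafe.Sizeof(struct{}{})) // prints 0

	taken, total := slotInfo(3, 2)
	fmt.Println(taken, total) // prints 2 3
}
```

Once all the slots are taken, the next send blocks until someone receives from the channel, and that blocking is exactly the behaviour a semaphore needs.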

Let’s see how this works:

var wg sync.WaitGroup
sem := make(chan struct{}, 3)

for _, d := range results {
        wg.Add(1)

        go func(wait int) {
                defer func() {
                        wg.Done()

                        // Release the semaphore resource
                        <-sem
                }()

                // Acquire a single semaphore resource
                sem <- struct{}{}

                // Imagine this is a long running operation, perhaps another
                // REST API call
                log.Printf("Start waiting for %d seconds\n", wait)
                time.Sleep(time.Second * time.Duration(wait))
                log.Printf("Finished waiting for %d seconds\n", wait)
        }(d)
}

wg.Wait()

So what does this look like?

2018/10/06 00:12:09 Start waiting for 19 seconds
2018/10/06 00:12:09 Start waiting for 15 seconds
2018/10/06 00:12:09 Start waiting for 10 seconds
2018/10/06 00:12:19 Finished waiting for 10 seconds
2018/10/06 00:12:19 Start waiting for 3 seconds
2018/10/06 00:12:22 Finished waiting for 3 seconds
2018/10/06 00:12:22 Start waiting for 8 seconds
2018/10/06 00:12:24 Finished waiting for 15 seconds
2018/10/06 00:12:24 Start waiting for 17 seconds
2018/10/06 00:12:28 Finished waiting for 19 seconds
2018/10/06 00:12:28 Start waiting for 10 seconds
2018/10/06 00:12:30 Finished waiting for 8 seconds
2018/10/06 00:12:30 Start waiting for 6 seconds
2018/10/06 00:12:36 Finished waiting for 6 seconds
2018/10/06 00:12:36 Start waiting for 20 seconds
2018/10/06 00:12:38 Finished waiting for 10 seconds
2018/10/06 00:12:38 Start waiting for 9 seconds
2018/10/06 00:12:41 Finished waiting for 17 seconds
2018/10/06 00:12:41 Start waiting for 13 seconds
2018/10/06 00:12:47 Finished waiting for 9 seconds
2018/10/06 00:12:47 Start waiting for 1 seconds
2018/10/06 00:12:48 Finished waiting for 1 seconds
2018/10/06 00:12:54 Finished waiting for 13 seconds
2018/10/06 00:12:56 Finished waiting for 20 seconds
2018/10/06 00:12:56 Total time taken: 47.005534s

Example 5

We did it! Admittedly it’s not as quick as before, but we only do 3 things at once and are therefore not totally overloading our pretend fragile API.
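
If you would rather check the "only 3 things at once" claim than read it off the timestamps, one option is to count how many goroutines are inside the guarded section at any moment. This is a sketch under my own assumptions: peakConcurrency is a hypothetical helper, and the waits are in milliseconds rather than seconds so that it finishes quickly.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// peakConcurrency is a hypothetical helper (not part of the original
// examples). It runs one goroutine per task behind a channel semaphore of
// size `limit` and returns the highest number of tasks that were in flight
// at the same moment.
func peakConcurrency(limit int, millis []int) int {
	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		inFlight int
		peak     int
	)
	sem := make(chan struct{}, limit)

	for _, ms := range millis {
		wg.Add(1)

		go func(ms int) {
			defer wg.Done()

			sem <- struct{}{}        // acquire a slot
			defer func() { <-sem }() // release it on the way out

			// Record that this task has started and update the peak.
			mu.Lock()
			inFlight++
			if inFlight > peak {
				peak = inFlight
			}
			mu.Unlock()

			// Stand-in for the slow API call.
			time.Sleep(time.Duration(ms) * time.Millisecond)

			mu.Lock()
			inFlight--
			mu.Unlock()
		}(ms)
	}

	wg.Wait()
	return peak
}

func main() {
	// Twelve pretend calls of 20ms each, at most three at a time.
	calls := []int{20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20}
	fmt.Println("peak concurrency:", peakConcurrency(3, calls))
}
```

The number printed should never be higher than the limit passed in, because a goroutine only increments the counter while it holds a semaphore slot.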

Passing data on to the next step

Now, wouldn’t it be nice if, after we make our calls to this API, we processed the results all together? Maybe we’re trying to aggregate something or use the individual bits of data to draw a pretty picture.

Let’s use another channel to pass these responses on.

var wg sync.WaitGroup
sem := make(chan struct{}, 3)
responses := make(chan int)

for _, d := range results {
        wg.Add(1)

        go func(wait int) {
                defer func() {
                        // Release the semaphore resource
                        <-sem
                        wg.Done()
                }()

                // Acquire a single semaphore resource
                sem <- struct{}{}

                // Imagine this is a long running operation, perhaps another
                // REST API call
                log.Printf("Start waiting for %d seconds\n", wait)
                time.Sleep(time.Second * time.Duration(wait))
                log.Printf("Finished waiting for %d seconds\n", wait)

                responses <- wait / 2
        }(d)
}

wg.Wait()

for r := range responses {
        log.Printf("Got result %d", r)
}

Example 6

Which gives…

2018/10/06 00:37:04 Start waiting for 19 seconds
2018/10/06 00:37:04 Start waiting for 17 seconds
2018/10/06 00:37:04 Start waiting for 15 seconds
2018/10/06 00:37:19 Finished waiting for 15 seconds
2018/10/06 00:37:21 Finished waiting for 17 seconds
2018/10/06 00:37:23 Finished waiting for 19 seconds
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [semacquire]:
sync.runtime_Semacquire(0xc0000120a8)
        /usr/local/go/src/runtime/sema.go:56 +0x39
sync.(*WaitGroup).Wait(0xc0000120a0)
        /usr/local/go/src/sync/waitgroup.go:130 +0x64
main.main()
        /home/dnnrly/projects-host/sempool/example6/main.go:42 +0x184

goroutine 4 [chan send]:
main.main.func1(0xc00009e000, 0xc0000120a0, 0xc00009e060, 0xa)
        /home/dnnrly/projects-host/sempool/example6/main.go:30 +0x6d
created by main.main
        /home/dnnrly/projects-host/sempool/example6/main.go:22 +0x158

I’ve abbreviated this log output; there is a lot of it.

Well, that’s clearly not good! We have a deadlock and looking at the lines of code indicated, it’s a problem with waiting for the sync.WaitGroup to finish and trying to acquire the semaphore. If you run this example yourself you can see it tells us which lines we are blocking on.

Hmmm, the first thing I tried didn’t work. You, reader - go do something else for a couple of hours while I figure this out.

Moving to mutexes

So I am writing this literally 8 hours after that last paragraph. Admittedly I was doing other things during those 8 hours but still, it took some figuring out. To help me out, I created an alternative to example 6 with logging. Take a look at the repo if you’re interested.

It appears that the problem lies in using a channel to store the ‘result’ of our calculation. make(chan int) creates an unbuffered channel, in effect a buffered channel with zero slots, so a send blocks until another goroutine is ready to receive. But main is stuck in wg.Wait() and doesn’t start reading from responses until every goroutine has finished. Which is exactly what happened. The first three workers blocked on the send while still holding their semaphore slots, the rest blocked trying to acquire the semaphore, and so we never get to the end of the goroutines and we deadlock.

It looks like I made the classic mistake of trying to use channels to synchronize rather than communicate. What we really want to do in this particular situation is append to a slice so that it can be processed later on. To do this we have to make sure that we don’t suffer from other race conditions that might mess up the append operation, because appending to a shared slice from several goroutines is definitely not thread safe.

To get around this, we use a sync.Mutex to force all of the goroutines to perform the append one at a time.

        var wg sync.WaitGroup
        sem := make(chan struct{}, 3)
        responses := []int{}
        mutex := &sync.Mutex{}

        for _, d := range results {
                wg.Add(1)

                go func(wait int) {
                        defer func() {
                                // Release the semaphore resource
                                <-sem
                                wg.Done()
                        }()

                        // Acquire a single semaphore resource
                        sem <- struct{}{}

                        // Imagine this is a long running operation, perhaps another
                        // REST API call
                        log.Printf("Start waiting for %d seconds\n", wait)
                        time.Sleep(time.Second * time.Duration(wait))
                        log.Printf("Finished waiting for %d seconds\n", wait)

                        mutex.Lock()
                        responses = append(responses, wait/2)
                        mutex.Unlock()
                }(d)
        }

        wg.Wait()

        for _, r := range responses {
                log.Printf("Got result %d", r)
        }

And look upon the glorious success:

2018/10/06 22:16:48 Start waiting for 8 seconds
2018/10/06 22:16:48 Start waiting for 19 seconds
2018/10/06 22:16:48 Start waiting for 10 seconds
2018/10/06 22:16:56 Finished waiting for 8 seconds
2018/10/06 22:16:56 Start waiting for 17 seconds
2018/10/06 22:16:58 Finished waiting for 10 seconds
2018/10/06 22:16:58 Start waiting for 3 seconds
2018/10/06 22:17:01 Finished waiting for 3 seconds
2018/10/06 22:17:01 Start waiting for 15 seconds
2018/10/06 22:17:07 Finished waiting for 19 seconds
2018/10/06 22:17:07 Start waiting for 10 seconds
2018/10/06 22:17:13 Finished waiting for 17 seconds
2018/10/06 22:17:13 Start waiting for 1 seconds
2018/10/06 22:17:14 Finished waiting for 1 seconds
2018/10/06 22:17:14 Start waiting for 6 seconds
2018/10/06 22:17:16 Finished waiting for 15 seconds
2018/10/06 22:17:16 Start waiting for 9 seconds
2018/10/06 22:17:17 Finished waiting for 10 seconds
2018/10/06 22:17:17 Start waiting for 20 seconds
2018/10/06 22:17:20 Finished waiting for 6 seconds
2018/10/06 22:17:20 Start waiting for 13 seconds
2018/10/06 22:17:25 Finished waiting for 9 seconds
2018/10/06 22:17:33 Finished waiting for 13 seconds
2018/10/06 22:17:37 Finished waiting for 20 seconds
2018/10/06 22:17:37 Got result 4
2018/10/06 22:17:37 Got result 5
2018/10/06 22:17:37 Got result 1
2018/10/06 22:17:37 Got result 9
2018/10/06 22:17:37 Got result 8
2018/10/06 22:17:37 Got result 0
2018/10/06 22:17:37 Got result 7
2018/10/06 22:17:37 Got result 5
2018/10/06 22:17:37 Got result 3
2018/10/06 22:17:37 Got result 4
2018/10/06 22:17:37 Got result 6
2018/10/06 22:17:37 Got result 10
2018/10/06 22:17:37 Total time taken: 49.006433s

As you can tell, I’m rather happy with this result.
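
For comparison, the channel version from example 6 can also be made to work. The sketch below is one possible fix under my own assumptions, not the approach settled on above: gather is a hypothetical helper, the slow call is left out to keep it fast, and the key change is that wg.Wait() moves into its own goroutine, which then closes the channel so that the receiving loop can end.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// gather is a hypothetical helper (not part of the original examples).
// It runs one goroutine per input behind a semaphore of size `limit` and
// collects the results over an unbuffered channel.
func gather(limit int, results []int) []int {
	var wg sync.WaitGroup
	sem := make(chan struct{}, limit)
	responses := make(chan int)

	for _, d := range results {
		wg.Add(1)

		go func(wait int) {
			defer wg.Done()

			sem <- struct{}{}        // acquire a slot
			defer func() { <-sem }() // release it on the way out

			// The slow call would go here.
			responses <- wait / 2
		}(d)
	}

	// Wait in a separate goroutine so that the caller is free to receive.
	// Closing the channel afterwards lets the range loop below finish.
	go func() {
		wg.Wait()
		close(responses)
	}()

	var out []int
	for r := range responses {
		out = append(out, r)
	}

	// Results arrive in no particular order, so sort before returning.
	sort.Ints(out)
	return out
}

func main() {
	fmt.Println(gather(3, []int{10, 15, 8, 3})) // prints [1 4 5 7]
}
```

Here only one goroutine ever appends to the slice, so no mutex is needed. The trade-off is the extra goroutine and having to remember to close the channel.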

Conclusion

The 2 hardest things in computing are naming things, concurrency and off-by-one errors.

Someone famous in the Go community, on Twitter (I forget who)

So we’ve looked at how we can do work (make API requests?) concurrently and what we can do to limit just how concurrent we want to be. We then looked at how we can pass results from this concurrent work on to the next step in the process, without accidentally corrupting the data or causing a deadlock.
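
One loose end: the title mentions thread pools, and the section on limiting concurrency sets them aside in favour of a semaphore. For completeness, here is a minimal sketch of what a pool might look like in Go, with a fixed number of worker goroutines draining a channel of jobs. runPool and its parameters are hypothetical names chosen for this illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// runPool is a hypothetical helper (not part of the original examples).
// It starts `workers` goroutines that each take jobs from a shared queue,
// and returns how many jobs were processed in total.
func runPool(workers int, jobs []int, work func(int)) int {
	var (
		wg        sync.WaitGroup
		mu        sync.Mutex
		processed int
	)
	queue := make(chan int)

	for w := 0; w < workers; w++ {
		wg.Add(1)

		go func() {
			defer wg.Done()

			// Each worker handles one job at a time until the queue closes.
			for j := range queue {
				work(j)

				mu.Lock()
				processed++
				mu.Unlock()
			}
		}()
	}

	for _, j := range jobs {
		queue <- j // blocks until a worker is free to take the job
	}
	close(queue) // no more jobs: lets the workers' range loops end

	wg.Wait()
	return processed
}

func main() {
	n := runPool(3, []int{10, 15, 8, 3}, func(d int) {
		fmt.Printf("Pretending to wait for %d seconds\n", d)
	})
	fmt.Printf("Processed %d jobs\n", n) // prints "Processed 4 jobs"
}
```

With a pool, only as many goroutines exist as there are workers. With the semaphore, one goroutine is created per job and most of them sit blocked. For a dozen requests the difference is negligible, so the semaphore’s smaller amount of code is a reasonable trade.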
