DEV Community

loading...

How to build a thread-safe Queue in Go?

hvydya profile image Harsha Vaidya ・5 min read

A thread-safe queue is a queue which can be used in a multi-threaded environment without any loss in data. Before we build one, let us discuss more on why we need a thread-safe queue. Feel free to skip the explanation and head to the bottom of this blog to find the full implementation for the queue if you know the prerequisites πŸ˜„.

Why does one need a thread-safe queue?

Consider this go program with a queue implementation. This is the main function of that program. It creates a queue with size 1 and inserts an element into it. Then we spawn two go routines which remove the inserted element.

func main() {
    q := CreateQueue(1)
    q.Insert(0)

    // Start two go routines
    for i := 0; i < 2; i++ {
        // This go routine will try to call the Remove function of the queue.
        go func(q *Queue) {
            if removed, e := q.Remove(); e == nil {
                fmt.Printf("Removed %d\n", removed)
            } else {
                fmt.Printf("got error %v\n", e)
            }

        }(q)
    }
    time.Sleep(100 * time.Second)
}
Enter fullscreen mode Exit fullscreen mode

Also look at the Remove function. I've added the time.Sleep function to show what happens when things take too long (1ns=1x10-9s might not be too long for us but can't say the same about computers πŸ˜‰) in a multi-threaded environment.

func (q *Queue) Remove() (int, error) {
    if len(q.q) > 0 {
        time.Sleep(1 * time.Nanosecond)
        item := q.q[0]
        q.q = q.q[1:]
        return item, nil
    }
    return 0, errors.New("Queue is empty")
}
Enter fullscreen mode Exit fullscreen mode

In a single-threaded environment, no problem would occur and everything would work as expected i.e two Remove calls which would remove the element for the first call and return an error for the second call.

But for multi-threaded environments, something very interesting happens. If you were to run that program in the playground you would get a panic from Go. Something like this:

panic: runtime error: index out of range [0] with length 0

goroutine 6 [running]:
main.(*Queue).Remove(0xc00000c0a0, 0x0, 0x0, 0x0)
    /tmp/sandbox828372841/prog.go:33 +0xf9
main.main.func1(0xc00000c0a0)
    /tmp/sandbox828372841/prog.go:56 +0x32
created by main.main
    /tmp/sandbox828372841/prog.go:55 +0xcd

Program exited: status 2.
Enter fullscreen mode Exit fullscreen mode

So what happened there?

The two go routines start executing one after the other in parallel. The first go routine calls Remove and then it checks the length in Remove (which is 1) and then enters the if block and sleeps for 1ns. Now the second go routine also calls Remove and checks the length and gets 1 as length and enters the if block and sleeps for 1ns. Why does it enter? It's because the first go routine is still sleeping, so it hasn't removed the element from the queue yet so the length was going to be 1 for sure. Now while second go routine is sleeping, the first one wakes up and removes the element. When the second one wakes up it will try to fetch the first element in the queue but, it doesn't exist! And that explains the panic we got i.e runtime error: index out of range [0] with length 0.

What did we learn from the above experiment?

A small delay in the form of time.Sleep(1 * time.Nanosecond) i.e sleeping for 1 ns is enough to cause problems. This tells how sensitive threads are to the code we write 🀯, meaning, if I were to add some lines to that function which take >= 1ns to run then we would be seeing this issue. Now I think it's clear why we need to handle operations differently in a multi-threaded environment.

So what's the solution?

TLDR; The solution is to use a mutex.

Intuitively, we need a mechanism that makes sure only one thread can Remove from the queue at any given instance. Particularly the if block since, that caused the problem in the first place. Here the if block is said to be the critical section of the Remove function. So, we would somehow "lock" the critical section of the Remove function when a thread approaches it and then "unlock" it when it's done with the operations it has to do. To do this "lock" and "unlock" we use a mutex in Go.

The mutex will make sure only one thread accesses the critical section at any given instance. So, when you Remove, before the if block, we need to Lock the following operations and when we are done we can Unlock. This is the updated snippet and below is the updated Remove which is now thread-safe.

func (q *Queue) Remove() (int, error) {
    q.mu.Lock()
    defer q.mu.Unlock()
    if len(q.q) > 0 {
        time.Sleep(1 * time.Nanosecond)
        item := q.q[0]
        q.q = q.q[1:]
        return item, nil
    }
    return 0, errors.New("Queue is empty")
}
Enter fullscreen mode Exit fullscreen mode

I've included the mutex in the Queue struct so that every instance of the queue has it's own mutex.
If you were to run the program now, you would see everything works fine as expected. No more panics ✨!

How did it work?

When we acquired the lock using q.mu.Lock() we made sure that only one go routine can access the if block. The defer q.mu.Unlock() just makes sure we unlock it after the function returns. Try not calling Unlock and ponder what happens then πŸ˜‰.

Give me the whole code 😑

Alright I've heard you πŸ˜…. Here it is:

import (
    "errors"
    "time"
    "sync"
)

type Queue struct {
    mu sync.Mutex
    capacity int
    q        []int
}

// FifoQueue 
type FifoQueue interface {
    Insert()
    Remove()
}

// Insert inserts the item into the queue
func (q *Queue) Insert(item int) error {
    q.mu.Lock()
         defer q.mu.Unlock()
    if len(q.q) < int(q.capacity) {
        q.q = append(q.q, item)
        return nil
    }
    return errors.New("Queue is full")
}

// Remove removes the oldest element from the queue
func (q *Queue) Remove() (int, error) {
    q.mu.Lock()
         defer q.mu.Unlock()
    if len(q.q) > 0 {
        item := q.q[0]
        q.q = q.q[1:]
        return item, nil
    }
    return 0, errors.New("Queue is empty")
}

// CreateQueue creates an empty queue with desired capacity
func CreateQueue(capacity int) *Queue {
    return &Queue{
        capacity: capacity,
        q:        make([]int, 0, capacity),
    }
}
Enter fullscreen mode Exit fullscreen mode

Happy coding! Hope you learnt something new today πŸ˜„.

Discussion (4)

pic
Editor guide
Collapse
alirezaahmadi profile image
Alireza Ahmadi

Great writeup! Another, and maybe slightly simpler, way of implementing thread-safe queues in Go is to use Goroutines that are thread-safe by default:

type QueueGoRoutine struct {
    capacity int
    q        chan int
}

func (q *QueueGoRoutine) Insert(item int) error {
    if len(q.q) < int(q.capacity) {
        q.q <- item
        return nil
    }
    return errors.New("Queue is full")
}

func (q *QueueGoRoutine) Remove() (int, error) {
    if len(q.q) > 0 {
        item := <-q.q
        return item, nil
    }
    return 0, errors.New("Queue is empty")
}

func CreateQueueGoRoutine(capacity int) FifoQueue {
    return &QueueGoRoutine{
        capacity: capacity,
        q:        make(chan int, capacity),
    }
}
Enter fullscreen mode Exit fullscreen mode

In my simple benchmark on my device, this implementation has a slightly better performance too:

func BenchmarkQueueMutexInsert(b *testing.B) {
    q := CreateQueueMutex(b.N)

    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            q.Insert(1)
        }
    })
}

func BenchmarkQueueGoRoutineInsert(b *testing.B) {
    q := CreateQueueGoRoutine(b.N)

    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            q.Insert(1)
        }
    })
}

func BenchmarkQueueMutexRemove(b *testing.B) {
    q := CreateQueueMutex(b.N)
    for i := 0; i < b.N; i++ {
        q.Insert(1)
    }

    b.ResetTimer()

    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            q.Remove()
        }
    })
}

func BenchmarkQueueGoRoutineRemove(b *testing.B) {
    q := CreateQueueGoRoutine(b.N)
    for i := 0; i < b.N; i++ {
        q.Insert(1)
    }

    b.ResetTimer()

    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            q.Remove()
        }
    })
}
Enter fullscreen mode Exit fullscreen mode
Collapse
hvydya profile image
Harsha Vaidya Author

This is cool. I come from a Java background so channels weren't that intuitive for me. Thank you for taking the time to put the code. Super helpful.

Collapse
alirezaahmadi profile image
Alireza Ahmadi

Glad to help.

Collapse
iamkeeper profile image
gopher

Thank you very much. I have got a lot.