DEV Community

Mark Boer


Higher level threading in C++

This article was originally posted on my blog

Where Python is often called "batteries included", one of C++'s core principles is "don't pay for what you don't use". This is visible in C++'s standard template library (STL), which is deliberately kept small, and in particular in the threading functionality it offers. Python's standard library contains everything from low-level synchronization primitives (threads, locks) to high-level ones (tasks, thread pools, futures). C++ offers only a single higher-level one: std::future. Unfortunately, the std::async function that returns these futures is usually implemented as little more than a wrapper around std::thread. This has the disadvantage that every call to std::async incurs the overhead of creating and destroying a thread.
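For readers who have not used std::future before, here is a minimal sketch of the interface. The helper name sum_async is made up for this example, and std::launch::async is passed explicitly on the assumption that we want the task to run on another thread rather than lazily on the caller.

```cpp
#include <future>
#include <numeric>
#include <vector>

// Hypothetical helper for this example: sums a vector on another thread.
int sum_async(const std::vector<int>& values)
{
    // std::launch::async requests asynchronous execution instead of
    // deferred evaluation on the calling thread.
    std::future<int> result = std::async(std::launch::async, [&values] {
        return std::accumulate(values.begin(), values.end(), 0);
    });

    // get() blocks until the task has finished and returns its value.
    return result.get();
}
```

Each call to a function like this may start and tear down a thread of its own, which is the cost the paragraph above refers to.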

This is why I generally prefer thread pools or a producer-consumer pattern. Both patterns require a queue that contains the jobs or units of work. Worker threads continually try to read items from this queue and process them. Some libraries offer thread-safe queues and/or thread pools, such as POCO, Qt or Boost, but it's actually fairly simple to implement your own using only the STL. So let's implement one now!

Let's get started with the basic layout. We're creating a class called SynchronisationQueue with two methods, put and get, loosely based on Python's SimpleQueue class.

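#include <mutex>
#include <queue>

template<typename T>
class SynchronisationQueue {
public:
    void put(const T& val);  // add an item to the back of the queue
    T get();                 // remove and return the front item, blocking while empty
private:
    std::mutex mtx_;         // protects queue_
    std::queue<T> queue_;
};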

The put implementation is quite straightforward. We need to protect access to the queue with a lock guard and then we simply push the item on the queue. The get is a little more cumbersome. We need to check whether the queue contains an item. If it does, we can retrieve it and continue, but if it does not, we have to wait until another item is put on the queue, which can then be returned. A simple yet naive implementation would be the following:

void put(const T& val)
{
    std::lock_guard<std::mutex> lck(mtx_);
    queue_.push(val);
}

T get()
{
    while (true)
    {
        {
            // Hold the lock only while inspecting and modifying the queue.
            std::lock_guard<std::mutex> lck(mtx_);
            if (!queue_.empty())
            {
                T ret_val = std::move(queue_.front());
                queue_.pop();
                return ret_val;
            }
        }
        // Nothing available yet: the lock is released, poll again later.
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}

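This implementation has an obvious shortcoming. Waking up every few milliseconds only to find the queue empty wastes CPU cycles. This waste can be reduced by increasing the sleep time, but that in turn has a negative impact on the throughput of the queue, since an item may sit in the queue for up to a full sleep interval before it is picked up.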

Here std::condition_variable comes to the rescue. It is a synchronization primitive that can be used to block threads until another thread modifies the wait condition and notifies the waiting thread. std::condition_variable has two types of member functions, each of which comes in different varieties based on your needs:

  • wait: This function is called from the waiting thread. In the case of a job queue, this would be the worker thread waiting for a new item to be processed.
  • notify: This function is called from the notifying thread, the thread that changes a condition and wants to notify waiting threads to wake up.

If we have a condition variable cv, the most basic usage would look as follows:

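// thread 1 waits until the condition becomes true
std::unique_lock<std::mutex> lck(mtx);
while (!condition)
    cv.wait(lck);

// thread 2 sets the condition while holding the lock,
// then notifies thread 1 that it can continue
{
    std::lock_guard<std::mutex> guard(mtx);
    condition = true;
}
cv.notify_one();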

cv.wait() does two things: it releases the lock and then sleeps until cv.notify_one() or cv.notify_all() is called from another thread. When the condition_variable unblocks, it reacquires the lock [1], so you can safely continue using the protected resource.

Now you might wonder why we are continuously checking the condition in a while loop. When there are multiple threads involved, there is no guarantee that, between thread 2 setting the condition to true and thread 1 waking up, a third thread has not set it back to false. However, even if there are just two threads, there is a chance that the condition variable awakens without being notified. This is called a spurious wakeup, and the C++ standard permits it to allow some flexibility in the implementation of std::condition_variable, though it is not clear how many implementations actually use this freedom [2]. So you will always need to check the condition after the condition variable wakes up, which is most easily done in a while loop.
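The standard library also provides an overload of wait that takes a predicate and runs this loop for you. The sketch below shows it on a simple flag; the names Flag, wait_until_set and set are invented for this example.

```cpp
#include <condition_variable>
#include <mutex>

// Hypothetical example type: a boolean flag that one thread can wait on.
class Flag {
public:
    void wait_until_set()
    {
        std::unique_lock<std::mutex> lck(mtx_);
        // Equivalent to: while (!ready_) cv_.wait(lck);
        // The predicate is re-checked after every wakeup, spurious or not.
        cv_.wait(lck, [this] { return ready_; });
    }

    void set()
    {
        {
            // Change the condition only while holding the mutex.
            std::lock_guard<std::mutex> lck(mtx_);
            ready_ = true;
        }
        cv_.notify_all();
    }

private:
    std::mutex mtx_;
    std::condition_variable cv_;
    bool ready_ = false;
};
```

If set() has already been called, wait_until_set() returns immediately, because the predicate is checked before the thread goes to sleep.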

So to sum this up, whenever you wish to use a condition variable you need the following three parts:

  1. A std::condition_variable
  2. A std::mutex
  3. A condition protected by the mutex (2)

In the case of the SynchronisationQueue, the condition we will be using is the queue being empty. While it is empty, we wait until another thread has added an element to the queue. That thread needs to notify the waiting thread, signaling that it can continue and retrieve an item from the queue.

It is crucial that we do not alter the size of the queue without holding the lock, as this could lead to race conditions. Apart from the obvious issues with accessing a std::queue from multiple threads, there can also be an issue with the condition variable. If a thread were allowed to insert an item into the queue without holding the lock, an item could be inserted between the worker thread checking the condition (queue being empty) and starting to wait on the condition variable. This could result in a state where the worker thread is waiting (potentially indefinitely) while there is in fact an item in the queue it could process. This is why you must always hold the lock while altering the condition, even when the condition could be changed atomically.

In the following example we have added the condition variable empty_queue_cv_ to the private member data of the SynchronisationQueue. We can rewrite the get method to make use of the condition variable:

T get()
{
    std::unique_lock<std::mutex> lck(mtx_);
    while (queue_.empty())
        empty_queue_cv_.wait(lck);
    T ret_val = std::move(queue_.front());
    queue_.pop();
    return ret_val;
}

The final step is to add a notification to the put method to awaken a waiting thread. Since put only adds a single item to the queue a notify_one() will suffice. If we were to add a method that inserts multiple elements, like the std::vector range insert, we could use the notify_all() to wake up all waiting threads.

void put(const T& val)
{
    std::lock_guard<std::mutex> lck(mtx_);
    queue_.push(val);
    empty_queue_cv_.notify_one();  // wake one thread waiting in get()
}
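The range-insert idea mentioned above could look roughly like the sketch below. The method name put_many is an assumption made for this example and is not part of the implementation discussed in this article. The class is repeated in condensed form, with only put_many and get, so that the block compiles on its own.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>

template<typename T>
class SynchronisationQueue {
public:
    // Hypothetical addition: insert a whole range [first, last) at once.
    template<typename InputIt>
    void put_many(InputIt first, InputIt last)
    {
        std::lock_guard<std::mutex> lck(mtx_);
        for (; first != last; ++first)
            queue_.push(*first);
        // Several items may have arrived, so wake every waiting thread.
        empty_queue_cv_.notify_all();
    }

    T get()
    {
        std::unique_lock<std::mutex> lck(mtx_);
        while (queue_.empty())
            empty_queue_cv_.wait(lck);
        T ret_val = std::move(queue_.front());
        queue_.pop();
        return ret_val;
    }

private:
    std::mutex mtx_;
    std::condition_variable empty_queue_cv_;
    std::queue<T> queue_;
};
```

Because every waiting thread re-checks queue_.empty() in its while loop after waking up, threads that find the queue already drained simply go back to waiting.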

The full SynchronisationQueue implementation can be found here. I have included several utility functions, such as a get with and without timeout.
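To give an idea of what a get with a timeout might look like, here is a minimal sketch built on wait_for. The signature (a bool result plus an output parameter) is an assumption chosen for this example and may well differ from the linked implementation.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>

template<typename T>
class SynchronisationQueue {
public:
    void put(const T& val)
    {
        std::lock_guard<std::mutex> lck(mtx_);
        queue_.push(val);
        empty_queue_cv_.notify_one();
    }

    // Assumed signature: returns false if no item arrived within `timeout`,
    // otherwise moves the front item into `out` and returns true.
    bool get(T& out, std::chrono::milliseconds timeout)
    {
        std::unique_lock<std::mutex> lck(mtx_);
        // wait_for with a predicate handles spurious wakeups and returns
        // the final value of the predicate (false means it timed out).
        if (!empty_queue_cv_.wait_for(lck, timeout,
                                      [this] { return !queue_.empty(); }))
            return false;
        out = std::move(queue_.front());
        queue_.pop();
        return true;
    }

private:
    std::mutex mtx_;
    std::condition_variable empty_queue_cv_;
    std::queue<T> queue_;
};
```

A worker can use the false result to check periodically whether it should shut down instead of blocking forever on an empty queue.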

As we've seen, these basic building blocks make it possible to implement the higher-order synchronization primitives that other languages have built in, such as thread pools, semaphores, barriers and notifications. If you are looking for a challenge, you could implement your own thread pool using the building blocks discussed here. There are also several implementations available on GitHub that you could have a look at.
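If you would rather see a starting point first, the sketch below combines a mutex, a condition variable and a queue of jobs into a very small thread pool. It is only one possible design: the names ThreadPool, submit and run are invented for this example, jobs are assumed not to throw, and the destructor is assumed to finish all queued jobs before returning.

```cpp
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical minimal thread pool; not taken from any particular library.
class ThreadPool {
public:
    explicit ThreadPool(std::size_t n_threads)
    {
        for (std::size_t i = 0; i < n_threads; ++i)
            workers_.emplace_back([this] { run(); });
    }

    // Lets the workers drain the remaining jobs, then joins them.
    ~ThreadPool()
    {
        {
            std::lock_guard<std::mutex> lck(mtx_);
            stopping_ = true;
        }
        cv_.notify_all();
        for (std::thread& worker : workers_)
            worker.join();
    }

    void submit(std::function<void()> job)
    {
        {
            std::lock_guard<std::mutex> lck(mtx_);
            jobs_.push(std::move(job));
        }
        cv_.notify_one();
    }

private:
    void run()
    {
        while (true)
        {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lck(mtx_);
                // Wait until there is work or the pool is shutting down.
                while (jobs_.empty() && !stopping_)
                    cv_.wait(lck);
                if (jobs_.empty())
                    return;  // stopping_ is set and nothing is left to do
                job = std::move(jobs_.front());
                jobs_.pop();
            }
            job();  // run the job without holding the lock
        }
    }

    std::mutex mtx_;
    std::condition_variable cv_;
    std::queue<std::function<void()>> jobs_;
    bool stopping_ = false;
    std::vector<std::thread> workers_;
};
```

Unlike std::async, the threads here are created once and reused for every submitted job. Returning results to the caller, for example through std::future, is left out to keep the sketch short.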

Happy coding!


  1. This is also the reason that we need to upgrade our lock_guard to a unique_lock. They are similar, but the latter allows you to release and reacquire the underlying mutex.

  2. See also: Spurious wakeup. The spurious wakeup is also the reason a condition variable always needs to be used together with a condition, even when this condition is just a boolean. 
