This article was originally posted on my blog
Where Python is often described as "batteries included", one of C++'s core principles is "don't pay for what you don't use". This too is visible in the C++ standard template library (STL): it is deliberately kept small. One of the places this shows is the threading functionality the STL offers. Python's standard library contains everything from low-level threading synchronization primitives (threads, locks) to high-level ones (tasks, threadpools, futures). C++ only offers a single higher-level one: the `std::future`. Unfortunately, the `std::async` function that returns these futures is usually implemented as little more than a wrapper around `std::thread`. This has the disadvantage of incurring the overhead of thread creation and destruction on every call to `std::async`.
This is why I generally prefer threadpools, or a producer-consumer pattern. Both of these patterns require a queue that contains the jobs or units of work. Worker threads continually try to read items from such a queue and process them. There are libraries that offer threadsafe queues and/or threadpools, such as Poco, Qt or Boost, but it's actually fairly simple to implement your own using only the STL. So let's implement one now!
Let's get started with the basic layout. We're creating a class called `SynchronisationQueue` with two methods, `put` and `get`, that is loosely based on Python's SimpleQueue class.
```cpp
template <typename T>
class SynchronisationQueue {
public:
    void put(const T& val);
    T get();

private:
    std::mutex mtx_;
    std::queue<T> queue_;
};
```
The `put` implementation is quite straightforward: we protect access to the queue with a lock guard and then simply push the item onto the queue. The `get` is a little more cumbersome. We need to check if the queue contains an item; if it does, we can retrieve it and continue, but if it does not, we have to wait until another item is put on the queue that can then be returned. A simple, yet naive, implementation would be the following:
```cpp
void put(const T& val)
{
    std::lock_guard<std::mutex> lck(mtx_);
    queue_.push(val);
}

T get()
{
    while (true)
    {
        {
            std::lock_guard<std::mutex> lck(mtx_);
            if (queue_.size() > 0)
            {
                T ret_val = std::move(queue_.front());
                queue_.pop();
                return ret_val;
            }
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}
```
This implementation has an obvious shortcoming: sleeping for X milliseconds wastes CPU cycles. This can be minimized by increasing the timeout, but that in turn has a negative impact on the throughput of the queue.
Here the `std::condition_variable` comes to the rescue. This is a synchronization primitive that can be used to block threads until another thread modifies the wait condition and notifies the waiting thread. The `std::condition_variable` has two types of member functions, each of which comes in different varieties based on your needs:
- wait: This function is to be called from the waiting thread; in the case of a JobQueue, this would be the worker thread waiting for a new item to process.
- notify: This function is called from the notifying thread, the thread that changes a condition and wants to notify waiting threads to wake up.
If we have a condition variable `cv`, the most basic usage would look as follows:
```cpp
// thread 1 waits until the condition holds
while (!condition)
    cv.wait(lck);

// thread 2 notifies that thread 1 can continue
condition = true;
cv.notify_one();
```
`cv.wait()` does two things: it releases the lock and then sleeps until `cv.notify_one()` or `cv.notify_all()` is called from another thread. Then the `condition_variable` unblocks and reacquires the lock¹, so you can safely continue using the protected resource.
Now you might wonder why we are continuously checking the condition in a while loop. When there are multiple threads involved, there is no guarantee that between thread 2 setting `condition = true` and thread 1 waking up, there isn't a third thread that sets it back to `false`. However, even with just the two threads, there is a chance that the condition variable awakens without being notified. This is called a spurious wakeup, and it is allowed by the C++ standard to permit some flexibility in the implementation of `std::condition_variable`, though it is not clear how many implementations actually use this freedom². So you will always need to check the condition after the condition variable is notified, which is most easily done in a while loop.
So to sum this up, whenever you wish to use a condition variable you need the following three parts:
- A std::condition_variable
- A std::mutex
- A condition protected by the mutex²
In the case of the `SynchronisationQueue`, the condition we will be using is the queue being empty. While it is empty, we will wait until an element has been added to the queue by another thread. That thread will need to notify the waiting thread, signaling that it can continue and retrieve an item from the queue.
It is crucial that we do not alter the size of the queue without holding the lock, as this could lead to race conditions. Apart from the obvious issues with accessing a `std::queue` from multiple threads, there can also be an issue with the condition variable. If a thread were allowed to insert an item into the queue without holding the lock, an item could be inserted between the worker thread checking the condition (the queue being empty) and starting to wait on the condition variable. This could result in a state where the worker thread is waiting (potentially indefinitely) while there is in fact an item in the queue it could process. This is why you must always hold the lock while altering the condition, even when the condition could be changed atomically.
In the following example we have added the condition variable `empty_queue_cv_` to the private member data of the `SynchronisationQueue`. We can rewrite the `get` method to make use of the condition variable:
```cpp
T get()
{
    std::unique_lock<std::mutex> lck(mtx_);
    while (queue_.empty())
        empty_queue_cv_.wait(lck);
    T ret_val = std::move(queue_.front());
    queue_.pop();
    return ret_val;
}
```
The final step is to add a notification to the `put` method to awaken a waiting thread. Since `put` only adds a single item to the queue, a `notify_one()` will suffice. If we were to add a method that inserts multiple elements, like the `std::vector` range insert, we could use `notify_all()` to wake up all waiting threads.
```cpp
void put(const T& val)
{
    std::lock_guard<std::mutex> lck(mtx_);
    queue_.push(val);
    empty_queue_cv_.notify_one();
}
```
The full `SynchronisationQueue` implementation can be found here. I have included several utility functions, such as a `get` with and without a timeout.
As we've seen, using these basic building blocks it becomes possible to implement the higher-level synchronization primitives that other languages have built in, such as threadpools, semaphores, barriers and notifications. If you are looking for a challenge, you could implement your own threadpool using the building blocks discussed here; there are also several implementations available on GitHub you could have a look at.
Happy coding!
1. This is also the reason that we need to upgrade our `lock_guard` to a `unique_lock`; they are similar, but the latter allows you to release and reacquire the underlying mutex. ↩
2. See also: Spurious wakeup. The spurious wakeup is also the reason a condition variable always needs to be used together with a condition, even when this condition is just a boolean. ↩