Multithreading by example, the stuff they didn't tell you

Gerardo Puga (glpuga) ・27 min read

The problem

Concurrent code can be daunting. In the very moment when you trigger the creation of a second thread in your system you're opening the gates for a completely new type of bug to come crashing through the doors...

And it's not just bugs of a new kind, but bugs of the worst kind:

  • They happen infrequently.
  • They are strongly dependent on execution context (timing, load, etc.).
  • When they happen, they are hard to reproduce, and hence to test for.
  • Very often they are perfect examples of heisenbugs: they vanish or mutate as soon as you set your debug tools on them.

The idea for this article came to me while implementing a very simple multi-threaded event queue, where one or more classes can post events, and one or more independent subscribers can be notified of these events through a callback.

My initial requirements were actually very relaxed, since the use case I needed to satisfy was a really tame one.

And yet, as I created the implementation I realized that, simple as it was, the code was a very good example of a large number of implementation issues that I often get asked about.

So I decided I would write about it, because I'm lazy, and I'd like to avoid repeating myself in the future.

That's nice. Where's the code?

This is the interface for the class this article will talk about, minus the boilerplate.

class EventQueue {
 public:
  void postEvent();
  CallbackHandle subscribeToEvent(const CallbackFunction &callback);
};

You can find the rest of the code in this repo. I'll be cutting and pasting interesting bits from it below as they get talked about.

It's a publisher/subscriber architecture, where one or more clients subscribe to an event, and register a callback to be called when the event is posted.

One or more sources can post instances of the event. Each time the event is posted, all of the clients' callbacks get triggered. Events carry no information, other than the fact that they "happened". This is what makes this different from a message queue.

For the sake of simplicity, and because it suited my use case better, the code only guarantees that callbacks will get called at least once after each posted event, and not once for each posted event.

This means that if an event is posted multiple times before the event subscribers were notified of the first one, then events after the first will not be counted as a separate instance, and subscribers will be called only once.

This makes EventQueue kind of a misnomer, because it's not really a queue if you can "miss" events. However, from a design point of view, since events carry no information with them, nothing is lost because of this design choice unless you want to count events (which is a perfectly valid use case, just not mine).
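
The coalescing behaviour described above can be sketched with a single flag. This is only an illustration and not the repository's code: CoalescingEvent, post(), consume() and deliveriesAfterBurst() are hypothetical names, and everything runs on one thread to keep the point visible.

```cpp
#include <atomic>

// Hypothetical sketch: an event that only remembers that "something happened".
class CoalescingEvent {
 public:
  void post() { pending_ = true; }

  // Returns true if at least one post() happened since the last consume().
  bool consume() { return pending_.exchange(false); }

 private:
  std::atomic<bool> pending_{false};
};

// Posts `posts` times before the consumer gets a chance to run, then counts
// how many notifications the consumer would deliver.
int deliveriesAfterBurst(int posts) {
  CoalescingEvent event;
  for (int i = 0; i < posts; ++i) {
    event.post();
  }
  int deliveries = 0;
  while (event.consume()) {
    ++deliveries;
  }
  return deliveries;
}
```

A burst of three posts collapses into one delivery, which is fine as long as nobody needs to count events.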

The implementation of EventQueue has a postEvent() method for event sources, and a subscribeToEvent() method to allow clients to register a callback.

This is as simple as asynchronous message passing can get. And yet, there's still stuff to unpack here.

Unpacking the stuff

Event sources should be decoupled from subscribers

It would be trivial to write postEvent() so that it loops through a vector of client callbacks calling each, but that would mean that the caller is blocked by the clients while the callbacks execute.

That couples the execution flow of the event source to the event receivers, and in a multi-threaded system where modules on each end should be protecting their internal states with mutexes, this kind of coupling is likely to end up causing a deadlock.

We'll decouple the event source by making postEvent() non-blocking. Posting the event only triggers the event notification, but control will be returned to the sender right away, potentially even before any of the subscribers have been notified.

Notice that this goes beyond just making postEvent() not wait for the event to be delivered before returning. It's easy to forget that if postEvent() needs to take ownership of the same EventQueue mutex that gets taken by the parts of the code that deliver events to clients, then it might block trying to get access to that mutex while a previous event is getting delivered to event subscribers. In most applications this is almost as bad as blocking while the event gets delivered, and just as likely to cause a deadlock.

No mutex in postEvent(), got it. However, this method still needs to access the EventQueue object internal state in a thread-safe manner, because callbacks will be executing from a separate thread.

To do this, the code uses a condition variable to trigger the processing of the event in an event forwarder thread:

void EventQueue::postEvent() {
  pending_event_ = true;
  cv_.notify_all();
}

Notice that postEvent() never locks. It just sets a boolean flag, notifies some code with a condition variable, and moves on. There's no way to block it.

I'll go back to that flag later.

Subscribers can be decoupled from each other, but I won't

Subscribers could be decoupled from each other too, but that's not my use case. I will just call each subscriber callback in the list sequentially, one after the other, every time an event gets posted.

This is unlikely to cause a deadlock by itself, since callbacks will usually take ownership of their target's mutex on entry and release it on exit.

This can, however, cause additional processing delays if one of the callbacks calls something blocking or takes a long time to execute, but we're in a cooperative-scheduling scenario here, and we are all good gentlemen.

An alternative design would be to hold separate notification threads for each of the subscriber's callbacks, but in most cases the extra effort is not worth the cost if your subscriber's callbacks are guaranteed to be quick and non-blocking (and all callbacks should always be).

If your callbacks can be considered unsafe or prone to being manipulated by an adversary then you would do well to play it safe and walk this extra mile out of precaution. This is seldom the case in monolithic software, however.

Identify the execution threads at play

When writing concurrent code you should always be aware of what threads are at play around a given piece of code.

There will be threads external to your object, such as the ones executing postEvent(). These are potentially rooted in other objects in your system, and they are only temporarily executing code that belongs to your object (in this case EventQueue).

These external threads may be holding locks on mutexes elsewhere on the system (typically, the one protecting the object that created them), and may need to acquire additional mutexes in order to gain access to protected assets within an object they need to interact with.

Take for instance the case of a piece of code creating a copy of a pre-existing shared pointer. The execution thread (the one making the copy) is external to the shared pointer, and that thread needs to go through the synchronization built into the shared pointer (typically atomic operations on the control block rather than an actual mutex) in order to update the pointer's reference count. It does so while executing the copy-assignment operator or the copy-constructor of the smart pointer class.

External threads are enough for a large number of types of objects whose internal state is only updated by interactions through their public interface. The std::shared_ptr class is an example of that kind of object.
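
As a small illustration of external threads driving an object purely through its public interface, the sketch below has several threads copy the same std::shared_ptr at once. The function name useCountAfterConcurrentCopies() is made up for this example; the only thing it relies on is that the standard library keeps the reference count consistent when distinct copies are created and destroyed concurrently.

```cpp
#include <memory>
#include <thread>
#include <vector>

// Every thread repeatedly copies `shared` and drops the copy again. The
// reference count is updated by the copying threads themselves; shared_ptr
// has no thread of its own.
long useCountAfterConcurrentCopies(int thread_count, int copies_per_thread) {
  const auto shared = std::make_shared<int>(0);
  std::vector<std::thread> threads;
  for (int t = 0; t < thread_count; ++t) {
    threads.emplace_back([&shared, copies_per_thread] {
      for (int i = 0; i < copies_per_thread; ++i) {
        std::shared_ptr<int> copy = shared;  // increments the count
        (void)copy;                          // decremented at end of scope
      }
    });
  }
  for (auto &thread : threads) {
    thread.join();
  }
  // Only the original owner is left once all threads are done.
  return shared.use_count();
}
```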

Other objects, however, also need to be able to self-propel their own state forward, even in complete absence of external stimuli. These objects will usually have one or more internal threads (threads created by the object itself) that are tasked with driving the evolution of the internal state of the object through interactions with the exterior.

The EventQueue class is of this last kind: In order to decouple the event source from the subscribers the object needs to take upon itself the task of delivering the posted events to the subscribers. Executing the subscriber callbacks requires a thread, and therefore EventQueue creates an internal thread to do this, one that we'll call forwarderThread (since that's the name of the function that hosts its code).

This thread spends most of its time asleep. Each time an event is posted through postEvent() this internal thread gets woken up to take care of the delivery. The code of the thread looks like this:

void EventQueue::forwarderThread() {
  std::unique_lock<std::mutex> lock{mutex_};
  while (!halting_) {
    cv_.wait_for(
      lock,
      max_sleep_interval_,
      [this]() { return pending_event_ || halting_; }
    );
    if (pending_event_) {
      pending_event_ = false;
      triggerSubscribers();
    }
  }
}

We'll talk a lot about this function, but let's go one step at a time. For now let's just say that this code spends most of its time sleeping on the condition variable cv_ until something wakes it up using cv_.notify_all(), such as for instance, a call to postEvent().

It's also important to understand that the thread owns the mutex while it's awake, but the condition variable releases it while it sleeps in wait_for().

Whenever a cv_.notify_all() call wakes up the forwarder thread, it re-acquires the mutex and springs to action, delivering an event to clients by calling their registered callbacks. After doing that, it goes back to sleep.
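
The fact that the mutex is free while the thread sleeps can be checked with a small experiment. This is a sketch with made-up names (mutexIsFreeWhileWaiting(), entered, ready), not code from the repository: the main thread can only observe entered == true while holding the mutex if the sleeper gave the mutex up inside wait().

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

bool mutexIsFreeWhileWaiting() {
  std::mutex mutex;
  std::condition_variable cv;
  bool entered = false;  // set by the sleeper while it holds the mutex
  bool ready = false;    // the condition the sleeper waits for
  bool observed = false;

  std::thread sleeper{[&] {
    std::unique_lock<std::mutex> lock{mutex};
    entered = true;
    // wait() releases the mutex while blocked and re-acquires it on wake-up.
    cv.wait(lock, [&] { return ready; });
  }};

  while (!observed) {
    {
      std::lock_guard<std::mutex> lock{mutex};
      if (entered) {
        // We hold the mutex and the sleeper has not finished, so it must be
        // parked inside wait() with the mutex released.
        observed = true;
        ready = true;
      }
    }
    std::this_thread::yield();
  }
  cv.notify_all();
  sleeper.join();
  return observed;
}
```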

Only the destructor can terminate this loop, by setting the halting_ flag to indicate that the thread needs to exit. This is important, because threads need to be joined before destroying them. thread::join() is a blocking call that waits until a thread terminates execution; if a thread is destroyed without having been joined, it will kill your system as steadfastly as a segmentation fault, and with about the same amount of pomp. The halting_ flag tells the thread that it needs to exit, so that it can be joined. It's like whistling to bring back your dog.
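
Stripped of the condition variable, the "flag, then join" shutdown pattern might look like the sketch below. PollingWorker and workerExitsBeforeDestructorReturns() are hypothetical names, and the worker simply polls the flag every millisecond instead of sleeping on a condition variable, which is an assumption made to keep the example short.

```cpp
#include <atomic>
#include <chrono>
#include <thread>

class PollingWorker {
 public:
  explicit PollingWorker(std::atomic<bool> &exited)
      : exited_{exited}, thread_{[this] { run(); }} {}

  ~PollingWorker() {
    halting_ = true;  // whistle for the dog...
    thread_.join();   // ...and wait until it is back
  }

 private:
  void run() {
    while (!halting_) {
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
    exited_ = true;
  }

  std::atomic<bool> &exited_;
  std::atomic<bool> halting_{false};
  std::thread thread_;  // declared last so the flags exist before run() starts
};

bool workerExitsBeforeDestructorReturns() {
  std::atomic<bool> exited{false};
  {
    PollingWorker worker{exited};
  }  // destructor runs here and blocks in join()
  return exited;
}
```

Removing the join() from the destructor would make the std::thread destructor call std::terminate(), which is the abrupt ending described above.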

Before we move on: callbacks are evil

That's true. They are.

A callback is someone else's code that gets executed on your thread. Or it's your code, that executes on someone else's thread. Either way, you're not fully in the driving seat; you're literally sharing the seat.

Also, at its very best, a callback is just a raw pointer on one module pointing to another, and this raises lots of questions regarding the relative life scope of each object with respect to the other. Think of this scenario.

ObjectA owns ObjectX and ObjectY. During construction it sets a callback in ObjectY to trigger a method in ObjectX when a message M is received. ObjectA is destroyed, and during the termination sequence it first destroys ObjectX, then ObjectY. After the demise of ObjectX, but before the end of ObjectY, a message M arrives, and the callback gets triggered and calls the method on ObjectX, all of whose data members have already been destroyed.

I hear you say:

  • "That's bad design, you have horizontally rigged your system and your architecture does not enforce a functional dependency between X and Y". It's true, I say, but the callback making the system crash did not make this any better. Also, even if the callback was hosted in ObjectA instead of ObjectX, this sequence of events would still cause a crash if the callback code used ObjectX during its execution.
  • "That's a corner case, the system is winding down anyway so no harm is done". A corner case, true; harmless, false. You're potentially wrecking the destruction process. I've seen code like this try to acquire an already destroyed mutex, which is undefined behavior, and then block forever causing the software to linger until I had to SIGKILL it. If you're "lucky", your code will crash right away, but even in that case you may lose information that was supposed to be orderly stored by your termination sequence.

This only gets worse if you think of a case in which you have subscribers that come and go during the life scope of the callback-calling class. If the subscriber objects are temporary you need to make sure you unsubscribe from the callback before you destruct each of them. But there's no way to enforce this, because a callback is just a raw pointer, so it's error prone and it's easy to cause a software crash during execution.

Callbacks are a necessary evil, though. But we can make them a bit more RAII. More on that later.

volatile vs atomic

You can injure yourself if you don't know this. If more than one thread is at play, there are some guarantees that are lost.

The short version is, if you have two asynchronous execution threads (for instance two Linux process threads, but also a task's code and an interrupt service routine on a humble embedded RTOS), and you want them to signal each other through a variable such as

bool halting_{false};

then that's a recipe for failure.

The reason for that is that the compiler does not know about that hidden Read-After-Write (RAW) dependency between the writes on one flow of execution and the reads on the second one, and it's very likely to either partially or totally optimize the connection through memory away.

This can happen in a number of ways, but the most frequent one is because any compiler worth its salt will optimize your object code so that variables get cached within CPU registers for as long as possible to improve the speed of access, and no actual memory accesses will really be performed except for reading the initial value of the variable at the start of the use scope, and storing the final value at the very end of it. If your code is waiting for a variable value to change in a loop, caused by a source that the compiler does not know about, then your code will most likely wait forever.

Notice that if you risk it and decide to use a regular variable to synchronize independent execution flows, you may find that everything works "as expected", but the actual outcome will vary depending on the architecture of your code, your build configuration, the version of your compiler and what you had for breakfast last week. Your code may start failing a few minutes/days/months/years later when someone else makes an unrelated change to the code, updates the compiler version, or eats last night's pizza for breakfast instead of butter on bread.

So bare variables are a no-no. How about volatile?

volatile bool halting_{false};

That's a big no-no too, but it's a different kind of error. Lots of people think volatile can be used for blissful inter-thread communication through a shared variable, and that's correct for other programming languages, and it even works for C++ code running on simpler single-threaded processors with simple memory hierarchies.

It's not true in general, however, because volatile is not meant to be used for that, and you should not use it that way because the outcome in that case would be undefined. Quoting directly from cppreference.com on the topic:

Every access (read or write operation, member function call, etc.) made through a glvalue expression of volatile-qualified type is treated as a visible side-effect for the purposes of optimization (that is, within a single thread of execution, volatile accesses cannot be optimized out or reordered with another visible side effect that is sequenced-before or sequenced-after the volatile access. This makes volatile objects suitable for communication with a signal handler, but not with another thread of execution, see std::memory_order)

The prose here is admittedly a bit obscure, but the warning is clear. A much clearer discussion of the topic can be found in Effective Modern C++, which I encourage you to read. In there, Meyers states that the guideline is:

Use std::atomic for concurrency, volatile for special memory.

Special memory in the context of the quote means, for instance, a memory-mapped I/O register in a micro-controller.

So, what is std::atomic? It's a template that "decorates" a bare type in such a way as to ensure two things:

  • Updates to the variable of an atomic type are seen as atomic by readers. That means they will either see the value after or before the update, but never a mixture or partially updated state.
  • The variable of an atomic type performs inter-thread memory synchronization on updates, making sure that after a thread writes on it, any other thread that reads the same variable will read the updated value.

What's the downside of this? Why not default to atomic behavior? As you are very likely expecting, it's because it's not cheap: an atomic access is generally more expensive than an access to a regular variable:

  • There's no in-register caching of an atomic variable. That means no single-cycle read/write access even for atomics of simple integer-like types, such as bool. You'll always go through the memory hierarchy to access them, and pay the price for it.
  • Accesses to an atomic trigger a synchronization process between the caches of different cores in the CPU to ensure a consistent event-ordering for all threads on the system. This is a punch right on the performance on any modern multi-core, deeply-pipelined processor.
  • For smaller datatypes (bool, for instance), std::atomic<T> may use an implementation that uses atomic load/store instructions available on most modern processors, but for larger or user-defined datatypes, std::atomic will basically attach a mutex to the variable and acquire/release around each access. This can be even more expensive.

So, and here I'm setting the text in bold again: use std::atomic for unprotected variables you share between two threads, and use them sparingly.

That's why in EventQueue we use

std::atomic_bool pending_event_{false};
std::atomic<bool> halting_{false};

to transfer information between some external thread executing postEvent() or the EventQueue destructor, and the internal thread that drives event delivery to subscribers. We need that information to get across threads.

As a final note on this section, unprotected in the bold text means out of the protection scope of a mutex. See next section.
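
A minimal sketch of this kind of signalling, using invented names (valueSeenAfterFlag(), ready, payload): one thread publishes a value and then raises an atomic flag, and the other spins on the flag before reading the value. With the default sequentially consistent operations, the write to payload is guaranteed to be visible once the flag is seen as true.

```cpp
#include <atomic>
#include <thread>

int valueSeenAfterFlag() {
  std::atomic<bool> ready{false};
  int payload = 0;  // plain variable, published through the atomic flag

  std::thread producer{[&] {
    payload = 42;       // written before the flag is raised
    ready.store(true);  // the store makes the write above visible to readers
  }};

  while (!ready.load()) {
    std::this_thread::yield();  // a real load on every iteration
  }
  const int seen = payload;  // safe: happens after the producer's store
  producer.join();
  return seen;
}
```

Replacing std::atomic<bool> with a plain bool here would be a data race, and the spinning loop would be allowed to never see the update.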

What if I have lots of shared variables between threads?

This is frequently the case. Most often your object's internal state is made of a multitude of internal data members, and updates to that state can be triggered by code executed by either internal or external threads.

In that case you don't make everything std::atomic<T>. That would only make access to your individual variables atomic, but the internal state of your class would still not be thread-safe, because different threads' execution flows could still intermingle in undefined-and-most-likely-not-safe ways.

Take for instance,

EventQueue::CallbackHandle EventQueue::subscribeToEvent(const CallbackFunction &callback) {
  std::lock_guard<std::mutex> lock{mutex_};
  auto handler = std::make_shared<CallbackFunction>(callback);
  subscribers_.emplace_back(std::weak_ptr<CallbackFunction>(handler));
  return handler;
}

void EventQueue::forwarderThread() {
  std::unique_lock<std::mutex> lock{mutex_};
  while (!halting_) {
    cv_.wait_for(lock, max_sleep_interval_, [this]() { return pending_event_ || halting_; });
    if (pending_event_) {
      pending_event_ = false;
      triggerSubscribers();
    }
  }
}

These can execute concurrently; the first one is driven by an external thread, from some other object on the system making a call on EventQueue, while the second one is our internal thread's main body.

Both are somewhat complex functions, especially the second one. You might be tempted to say that the first one only makes an update to the subscribers_ vector, and that that operation can be made atomic with std::atomic<T>, and that would be true, but as you'll see later, triggerSubscribers() iterates on that vector, and iterating is not an atomic operation, it is a sequence of operations.

That's why we need to prevent these two methods from executing at the same time, and to do that we force them to only make changes to the state of the object (that is, to any member of the object) while holding a lock on a shared std::mutex variable.

The mutex can only be owned by a single thread at a time. If we enforce that all changes to the state will only be done while owning the mutex, then that forces all updates to the state to be made in an ordered fashion, and each individual change can be thought of as atomic.

Notice that enforcing this is up to the developer. Nothing prevents me from updating the state without holding the mutex, but that's usually unsafe unless you have some other way to make sure of making changes to the object state in an atomic fashion. postEvent() does not use a mutex, but it only makes a very bounded type of change by setting an std::atomic_bool flag, so we're safe (to the extent anything in concurrency is safe).
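
To make the "every change happens while holding the mutex" rule concrete, here is a small sketch in which several threads append to a plain std::vector. The function concurrentAppends() is hypothetical; without the lock the push_back calls would race and the result would be undefined, with it the final size is always the expected one.

```cpp
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

std::size_t concurrentAppends(int thread_count, int appends_per_thread) {
  std::mutex mutex;
  std::vector<int> values;  // regular, non-atomic state protected by `mutex`
  std::vector<std::thread> threads;

  for (int t = 0; t < thread_count; ++t) {
    threads.emplace_back([&mutex, &values, t, appends_per_thread] {
      for (int i = 0; i < appends_per_thread; ++i) {
        std::lock_guard<std::mutex> lock{mutex};  // one writer at a time
        values.push_back(t);
      }
    });
  }
  for (auto &thread : threads) {
    thread.join();
  }
  return values.size();
}
```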

There are many ways to lock a mutex, but two are the most common and both are on display in the code above:

  • std::lock_guard
  • std::unique_lock

std::lock_guard is the simplest one. In fact, it only has two methods: the constructor and the destructor. An instance of this class locks the mutex when it gets constructed, and releases it when it's destroyed. This is effectively a use of the RAII idiom, where the resource being owned is the lock on the mutex.

std::unique_lock behaves in mostly the same way, but also has a few more features. The defining one is that it provides lock() and unlock() methods to release and re-acquire the lock manually. std::condition_variable::wait_for() uses these to unlock the mutex while it waits for the condition variable notification, which is why std::unique_lock needs to be used there.

std::unique_lock provides a superset of the features of std::lock_guard, but you should always prefer the latter unless you really need the extra features of the former.
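
The extra flexibility of std::unique_lock is easy to see in isolation. The sketch below (uniqueLockCanBeReleasedAndRetaken() is a made-up name) releases and re-acquires the lock by hand, which std::lock_guard has no way of doing.

```cpp
#include <mutex>

bool uniqueLockCanBeReleasedAndRetaken() {
  std::mutex mutex;
  std::unique_lock<std::mutex> lock{mutex};  // locked on construction
  const bool owned_at_start = lock.owns_lock();

  lock.unlock();  // temporarily give the mutex up
  const bool owned_while_released = lock.owns_lock();

  lock.lock();  // take it back; the destructor will release it again
  const bool owned_at_end = lock.owns_lock();

  return owned_at_start && !owned_while_released && owned_at_end;
}
```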

The creation of the lock (either lock_guard or unique_lock) should be the first statement in the scope you want to protect, usually a public method of your object; anything before that would not be protected by the mutex. Everything after that all the way to the end of the scope will be.

Ok, locking a mutex gives us mutual exclusion, and therefore atomicity. What about inter-thread synchronization to guarantee side-effect ordering consistency on memory accesses and stuff? std::atomic<T> was big on this too.

Well, each time you lock/unlock a mutex memory access synchronization is guaranteed so that any variable you update within the scope of the lock will be visible with the updated value to any other thread that wants to read it later.

So, if you protect all accesses to a variable within the scope of a locked mutex, the variable can be just a regular variable, because the mutex will ensure synchronization on lock/unlock. That's the case for subscribers_, but not for pending_event_, which is also updated from postEvent().

As I said earlier, postEvent() does not lock the mutex to avoid having to potentially wait for the mutex to be released while the callbacks are getting executed. To make sure the updates to pending_event_ are seen by the forwarder thread we need to make that variable an std::atomic_bool.

This design choice will impact something else, one that can be the source of a very subtle bug if you're not careful.

Callbacks, yes, but RAII-ish

Callbacks are a necessary evil, I said. I need callbacks on this code.

However, instead of going for raw callbacks that would have to be both registered and unregistered manually by event subscribers, this code tries to at least ensure that callbacks will get automatically unregistered at the very moment when a handler returned by the subscribeToEvent() method goes out of scope.

The idea is that a subscriber will register a callback to receive notifications from the event by doing something like this.

auto handler = event_queue.subscribeToEvent(
  []() { doSomething(); }
); 

The callback (in this case a lambda) will remain registered to the event represented by the event_queue instance all the way until the point where handler goes out of scope or is intentionally destroyed.

Once the handler goes out of scope, the event_queue instance will automatically remove the callback from the subscribers_ list with no need of any further interaction from the subscriber.

To implement this the code uses std::weak_ptr, which is a weak version of shared_ptr that does not hold ownership over the pointed-to-data, but that allows us to:

  • Check if an associated shared_ptr still exists.
  • Get a copy of the associated shared_ptr if at least one other copy still exists.
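
Both operations can be seen in a few lines. This is a stand-alone sketch; WeakObservation and observeWeakPointer() are names invented for the example.

```cpp
#include <memory>

struct WeakObservation {
  bool alive_while_owned;
  int value_while_owned;
  bool expired_after_release;
  bool lock_is_null_after_release;
};

WeakObservation observeWeakPointer() {
  WeakObservation result{};
  std::weak_ptr<int> weak;
  {
    auto strong = std::make_shared<int>(7);
    weak = strong;  // observes the object without owning it
    result.alive_while_owned = !weak.expired();
    if (auto locked = weak.lock()) {  // temporary shared_ptr copy
      result.value_while_owned = *locked;
    }
  }  // last shared_ptr destroyed here, the int is gone
  result.expired_after_release = weak.expired();
  result.lock_is_null_after_release = (weak.lock() == nullptr);
  return result;
}
```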

So, keeping in mind that EventQueue defines the following aliases in class scope,

  using CallbackFunction = std::function<void()>;
  using CallbackHandle = std::shared_ptr<CallbackFunction>;

the implementation of subscribeToEvent() becomes very simple:

  auto handler = std::make_shared<CallbackFunction>(callback);
  subscribers_.emplace_back(std::weak_ptr<CallbackFunction>(handler));
  return handler;

All this does is:

  • It creates the original shared_ptr<CallbackFunction>. This shared pointer is the CallbackHandle that subscribeToEvent() will return to the caller.
  • It creates a weak_ptr<CallbackFunction> associated to the shared_ptr created above, and stores that weak pointer in the internal list of subscribers. This weak pointer will be used by EventQueue to track which subscribers have been destroyed before calling on their callbacks.

The triggerSubscribers() method, which gets called by forwarderThread() and is therefore within the protection scope of the latter's mutex lock, basically only does two things: it removes any weak_ptr whose associated shared_ptr has gone out of scope, and then it iterates on the remaining subscribers calling the callback on each.

void EventQueue::triggerSubscribers() {
  subscribers_.remove_if([](const EventQueue::WeakCallbackHandler &handler) { return handler.expired(); });
  ...
  std::for_each(subscribers_.begin(), subscribers_.end(), caller);
}

Notice that the caller() lambda needs to check again whether the weak_ptr is still valid, because the shared_ptr might have been destroyed after the call to subscribers_.remove_if() but before std::for_each() gets to it.

auto caller = [](const EventQueue::WeakCallbackHandler &weak_handler) {
  auto handler = weak_handler.lock();
  if (handler) {
    (*handler)();
  }
};
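
Putting the pieces together, a single-threaded sketch of the handle-based registration could look like the following. MiniRegistry, subscribe(), trigger() and callsAfterHandleReset() are hypothetical names, the use of std::list is an assumption based on the remove_if() call above, and the mutex and forwarder thread are left out on purpose.

```cpp
#include <functional>
#include <list>
#include <memory>

class MiniRegistry {
 public:
  using CallbackFunction = std::function<void()>;
  using CallbackHandle = std::shared_ptr<CallbackFunction>;

  CallbackHandle subscribe(const CallbackFunction &callback) {
    auto handle = std::make_shared<CallbackFunction>(callback);
    subscribers_.emplace_back(handle);  // stored as a non-owning weak_ptr
    return handle;                      // the caller holds the only owner
  }

  void trigger() {
    // Drop subscribers whose handle no longer exists, then call the rest.
    subscribers_.remove_if(
        [](const std::weak_ptr<CallbackFunction> &weak) { return weak.expired(); });
    for (const auto &weak : subscribers_) {
      if (auto handle = weak.lock()) {
        (*handle)();
      }
    }
  }

 private:
  std::list<std::weak_ptr<CallbackFunction>> subscribers_;
};

int callsAfterHandleReset() {
  MiniRegistry registry;
  int calls = 0;
  auto handle = registry.subscribe([&calls] { ++calls; });
  registry.trigger();  // handle alive: the callback runs
  handle.reset();      // subscriber goes away
  registry.trigger();  // entry is pruned, nothing is called
  return calls;
}
```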

Notice that while this is an improvement over manually registering/unregistering callbacks, there's still one issue with this code. We'll go back to it in the next section.

The devil hides in the details

There's more than one devil hidden here.

The "lost event" pitfall

The first hidden devil, which is easy to fall for, is also relatively straightforward to detect and understand.

In forwarderThread(), if we change

if (pending_event_) {
  pending_event_ = false;
  triggerSubscribers();
}

for

if (pending_event_) {
  triggerSubscribers();
  pending_event_ = false;
}

then we are introducing a bug that could very easily be missed during testing unless we are looking for it.

Setting the flag to false after running the callbacks means that if the event is posted again after checking the value at if (pending_event_) but before pending_event_ = false;, the event might be missed by some of the subscribers.

Instead, setting it false before running the callbacks means that regardless of where a second event falls, the guarantee that all callbacks will be called at least once after each posted event still holds for all subscribers.
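
The difference between the two orderings can be replayed deterministically on a single thread. In this sketch (deliveryRounds() and its locals are invented for the example) a second event is "posted" from inside the first round of callbacks, which stands in for an ill-timed postEvent() call from another thread.

```cpp
#include <atomic>

// Simulates a few wake-ups of the forwarder loop and returns how many times
// the subscribers were triggered.
int deliveryRounds(bool clear_flag_before_callbacks) {
  std::atomic<bool> pending{false};
  int rounds = 0;
  bool reposted = false;

  auto triggerSubscribers = [&] {
    ++rounds;
    if (!reposted) {
      reposted = true;
      pending = true;  // a second event arrives while callbacks are running
    }
  };

  pending = true;  // first event
  for (int wakeup = 0; wakeup < 3; ++wakeup) {
    if (pending) {
      if (clear_flag_before_callbacks) {
        pending = false;
        triggerSubscribers();
      } else {
        triggerSubscribers();
        pending = false;  // wipes out the event posted during the callbacks
      }
    }
  }
  return rounds;
}
```

Clearing the flag first yields two rounds, one per event; clearing it afterwards yields a single round, and the second event is silently lost.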

That one was easy.

The "deaf condition variable" pitfall (part I)

You may have noticed that the wait_for() call in forwarderThread() has a timeout parameter:

cv_.wait_for(
    lock,
    max_sleep_interval_,
    [this]() { return pending_event_ || halting_; }
);

It's perfectly reasonable to question this: Why would a timeout be needed here? This is code that should only wake up whenever there's an event to deliver, there's not really a reason to wake it up just because a timeout has expired.

Even more so, since the timeout causes the condition variable to be able to wake up for reasons other than receiving an event, it needs to check if an event was posted when it wakes up, and if not it'll just go back to sleep!

The first part is almost true, and we'll go back to that later. The second one less so, because the destructor may also wake the condition variable, and we don't want to trigger the callbacks spuriously because EventQueue is getting destroyed, so we need to check if there's an event to deliver to clients regardless of the timeout.

Let's take a look at how destruction takes place.

EventQueue::~EventQueue() {
  halting_ = true;
  cv_.notify_all();
  forwarder_thread_.join();
}

This is a typical way to terminate an internal thread in objects that have one.

Notice that we don't lock the mutex in this method. We can't.

If we did, cv_.wait_for() in forwarderThread() would be unable to wake up because it would not be possible for it to re-acquire the mutex, and the destructor would block in forwarder_thread_.join() waiting for the thread to terminate execution. Since both threads are waiting for the other to move forward to move themselves, no progress can be made. This is a classic deadlock scenario.

To avoid this the destructor does not try to lock the mutex and instead it signals the intention to kill the thread through an std::atomic_bool variable.

Notice that, because the destructor code as a whole is not atomic, the order of the instructions matters:

halting_ = true;
cv_.notify_all();
forwarder_thread_.join();

If the order of these lines is changed, the forwarder thread might not wake up, or might wake up, miss the halting flag, and go back to sleep.

We're good then. We won't miss the flag because we are setting stuff in the right order.

Not so.

The "deaf condition variable" pitfall (part II, the revenge)

There's more to say about the condition variable. Let's talk about the lambda in the third argument to wait_for.

Let's start by stating that, in theory, this should be enough for the code to work:

while (!halting_) {
  cv_.wait(lock);
  if (pending_event_) {
    ...
  }
 }

Both the destructor notifications and new events that are posted would be able to wake this loop and process the input according to the source. However, this has at least two issues.

The first one is that a condition variable will not be aware of any notify_all()/notify_one() that is called while it's awake.

So, if the destructor gets called in the unlucky interval in which forwarderThread() goes from evaluating while (!halting_) to setting the thread to sleep in cv_.wait(lock);, then you're dead. Unresponsive. The destructor will block waiting for the thread to terminate, but the thread will be sleeping and won't wake up because it missed the condition variable notification and the flag.

Notice that a very similar scenario would miss (or at least delay) a newly posted event if the notification arrives while forwarderThread() is still awake delivering a previous event.

In both cases the system is not really totally blocked, because the lucky arrival of a new postEvent() call would bring the loop back to the land of the living by waking up the forwarderThread(), but you should not count on this.

It also turns out that condition variables have the annoying habit of sometimes waking up just because they feel like it. If you don't want to take my word for it, you can read the very small print on cppreference.com:

It may also be unblocked spuriously.

Before you wonder, it's not a bug, it's a feature. It's just too hard to create condition variable implementations that don't do that, so we need to live and code with this in mind.

To avoid spurious awakenings, a second form of the wait() and wait_for() calls provides a predicate function parameter. This is usually just a lambda that should return false if the condition for awakening is not satisfied. When a predicate is provided, the condition variable will check the predicate while waking up and if the value is false then it will assume that it woke up spuriously and go back to sleep.
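
Two properties of the predicate overloads are worth seeing in isolation; the sketch below uses invented function names and a plain bool protected by the mutex. First, the predicate is checked before sleeping, so a wait on an already-true condition returns at once without any notification. Second, wait_for() with a predicate returns the predicate's value, so a timeout with the condition still false is reported as false.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

bool waitReturnsAtOnceWhenPredicateHolds() {
  std::mutex mutex;
  std::condition_variable cv;
  bool pending = true;  // condition already satisfied
  std::unique_lock<std::mutex> lock{mutex};
  cv.wait(lock, [&pending] { return pending; });  // never goes to sleep
  return lock.owns_lock();  // the lock is held again on return
}

bool waitForResultWhenNothingHappens() {
  std::mutex mutex;
  std::condition_variable cv;
  bool pending = false;  // nobody will ever set this
  std::unique_lock<std::mutex> lock{mutex};
  // Spurious wake-ups are absorbed by the predicate; after the timeout the
  // call returns the current value of the predicate.
  return cv.wait_for(lock, std::chrono::milliseconds(20),
                     [&pending] { return pending; });
}
```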

Agreed, the predicate sounds like a good idea. The code would look like this now:

while (!halting_) {
  // No timeout here, so this is wait(), not wait_for()
  cv_.wait(
      lock,
      [this]() { return pending_event_ || halting_; }
  );
  if (pending_event_) {
    ...
  }
}

Notice that the predicate does not only solve the issue of spurious awakenings. It also fixes missed notifications! If postEvent() got called while the code was running the callbacks, then when the loop gets back to the wait call the predicate will already be true, so the thread will not even go to sleep and a new processing round will start.

Similarly, it also fixes the unfortunate case where the destructor gets called in that narrow stretch between the while and the wait call.
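To see why the predicate rescues a notification that arrived too early, here is a minimal, self-contained sketch. It is not code from the repository: waitForFlag() and notifyThenWait() are hypothetical names, and the flag is set while owning the mutex to keep the example free of the other race discussed in this article.

```cpp
#include <condition_variable>
#include <mutex>

// Hypothetical stand-in for the wait step of the forwarder loop.
// Returns the flag value observed once the wait is over.
bool waitForFlag(std::mutex &mutex, std::condition_variable &cv, const bool &flag) {
  std::unique_lock<std::mutex> lock{mutex};
  // Behaves like: while (!flag) { cv.wait(lock); }
  // The predicate is checked BEFORE sleeping, so a flag that is already
  // set makes this call return without blocking.
  cv.wait(lock, [&flag]() { return flag; });
  return flag;
}

// Sets the flag and notifies before anybody is waiting, then waits.
bool notifyThenWait() {
  std::mutex mutex;
  std::condition_variable cv;
  bool flag = false;
  {
    std::lock_guard<std::mutex> lock{mutex};
    flag = true;
  }
  cv.notify_all();  // nobody is waiting yet, so this notification is lost
  return waitForFlag(mutex, cv, flag);  // still returns, thanks to the predicate
}
```

With a plain cv.wait(lock) and no predicate, the same sequence would block forever, because the only notification was sent before the wait started.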

That's it, right? We're done! The timeout was useless.

Not so. Concurrency is such a fragile thing.

It may not look like it from our abstraction level, but

[this]() { return pending_event_ || halting_; }

actually takes some time to get executed. A short time, but still. There's a non-zero chance that the two lines

 halting_ = true;
 cv_.notify_all();

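will get executed after halting_ was evaluated by the lambda, but before the wait call actually puts the thread to sleep. This is possible because the destructor sets the flag and notifies without owning the mutex.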

The end result is the same as before: the destructor blocks, waiting for the thread to terminate, but the forwarder thread misses the notification and goes to sleep with no-one to wake it up because the destructor won't try twice.

Worse, a similarly ill-timed call to postEvent() can suffer the same fate.

So, after adding the predicate our worst possible case is still the same as before: blocking for an indeterminate amount of time. We just narrowed the time window during which that may happen.

Feeling depressed? I know the feeling. But hey! What are the chances, right? You need to be really unlucky to hit this issue!

Not so unlucky, it turns out. The repository provides a few unit tests along with the EventQueue code. This is the first one:

TEST_F(QueueEventTest, ConstructionTest) {
  auto uut = std::make_unique<EventQueue>();
  uut = nullptr;
}

This unit test only creates an EventQueue and then destroys it right away. It's a perfectly valid test that checks the least amount of functionality that can be tested: construction and destruction don't blow up.

Turns out that this test hits the sweet spot for the missed-notification failure described above frequently enough that, if it runs on CI, your coworkers will be upset by its random failures very soon after it gets added.

There are two solutions to this:

Lock the mutex around any code that may wake the condition variable

If we take a lock on the mutex around the code that sets the variables and signals the condition variable to wake up, this would fix the problem: since the forwarder thread owns the mutex whenever it's awake, mutual exclusion would ensure that flag updates and wake-up notifications only get sent while it is asleep!

Great!

No! As I said before, we can't lock the mutex in the destructor. Not for the whole length of it, at least. We could, potentially, do this:

EventQueue::~EventQueue() {
  {
    std::lock_guard<std::mutex> lock{mutex_};
    halting_ = true;
  }
  cv_.notify_all();
  forwarder_thread_.join();
}

It's not pretty, but it would get the job done, and in this case halting_ could be just a regular bool instead of an std::atomic_bool.
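As a rough illustration of that pattern in isolation, here is a self-contained sketch. HaltableWorker and createAndDestroy() are hypothetical names, not part of the EventQueue code, and the worker does nothing except wait to be told to stop.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical minimal worker: its thread sleeps until the destructor halts it.
class HaltableWorker {
 public:
  HaltableWorker() : thread_{[this]() { run(); }} {}

  ~HaltableWorker() {
    {
      // The flag only changes while the mutex is owned, so the update cannot
      // slip in between the predicate check and the thread going to sleep.
      std::lock_guard<std::mutex> lock{mutex_};
      halting_ = true;
    }
    cv_.notify_all();
    thread_.join();
  }

 private:
  void run() {
    std::unique_lock<std::mutex> lock{mutex_};
    cv_.wait(lock, [this]() { return halting_; });
  }

  std::mutex mutex_;
  std::condition_variable cv_;
  bool halting_{false};  // a plain bool is enough, it is always accessed under mutex_
  std::thread thread_;   // declared last so the members above exist before run() starts
};

// Mirrors the construction/destruction unit test, repeated a number of times.
bool createAndDestroy(int rounds) {
  for (int i = 0; i < rounds; ++i) {
    HaltableWorker worker;  // constructed and destroyed right away
  }
  return true;
}
```

Calling createAndDestroy(200) should return every time, with no timeout involved. Note that the worker here never runs callbacks while owning the mutex, which is precisely the complication discussed next.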

However, the problem with not allowing anyone to do anything unless they own the mutex is that, well, no-one can do anything unless they own the mutex. That includes postEvent() which would have to wait until the mutex is released before being able to post a new event. This might cause the postEvent() caller to block while a callback is waiting for a different mutex, opening the opportunity for an indirect deadlock.

Alternative designs can avoid this, at the cost of some additional complexity, but more complexity also means more potential for other subtle bugs like this one creeping into the system.

The cheap solution

The chances of this failure scenario happening are very low. If we keep damage low too, total risk is low too. If risk is low, the cost/benefit is on the side of a cheap-but-safe fix, rather than a complex fix that may open the door to equally complex bugs.

Turns out a very cheap solution is to use a timeout.

cv_.wait_for(
    lock,
    max_sleep_interval_,
    [this]() { return pending_event_ || halting_; }
);

The timeout puts an upper bound on the time the code can be blocked if any of the unfortunate scenarios I described above play out: if something like that happens, your system won't block for an indeterminate amount of time; in the worst case it will only block for the duration of the timeout.

The side effect is that the loop in forwarderThread() is going to wake up periodically even when nothing is happening, so most of the time the thread will wake up only to go back to sleep again.

However, if your timeout is reasonable, the cost of periodically waking up is negligible, and a fair price to pay to keep bugs at bay.

I won't go so far as to claim that this is the best solution. It's not a compromise I'm particularly happy to make, but it's one that usually gets the work done. I'll be happy to hear ideas for alternative solutions!
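The behavior of the timed wait can be checked in isolation with a small sketch like the one below. timedWait() is a hypothetical helper written for this example, not something from the repository; it relies on the fact that the predicate form of wait_for() returns the value of the predicate when the wait ends.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical helper: waits on a condition variable nobody ever notifies.
// Returns what wait_for() returns, i.e. the predicate value when the wait ends.
bool timedWait(bool flag_value, std::chrono::milliseconds max_sleep_interval) {
  std::mutex mutex;
  std::condition_variable cv;
  bool flag = flag_value;
  std::unique_lock<std::mutex> lock{mutex};
  // If flag is already true this returns at once. Otherwise it gives up after
  // max_sleep_interval, which is the upper bound on how long we can be stuck.
  return cv.wait_for(lock, max_sleep_interval, [&flag]() { return flag; });
}
```

For instance, timedWait(false, std::chrono::milliseconds{10}) comes back with false after roughly ten milliseconds instead of blocking forever, and the calling loop then gets a chance to re-check its flags.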

One more, for the road

Finally, let's go back to the caller() lambda in triggerSubscribers(). As I said before, this only partially solves some of the issues I enumerated about callbacks.

auto caller = [](const EventQueue::WeakCallbackHandler &weak_handler) {
  auto handler = weak_handler.lock();
  if (handler) {
    (*handler)();
  }
};

There's a kind of worrying problem here, and you may have already noticed it.

While weak_handler.lock() will atomically return a shared pointer to the callback, or a nullptr if no more copies of the callback exist, there's no way to guarantee, with this implementation, that the client will not drop its own copy of the handler after

auto handler = weak_handler.lock(); 

but before the handler callback gets called in

(*handler)();

Crap! We might be calling a callback right after the client decided it did not want any more calls! This is a completely plausible scenario, in which, for a short moment, the only remaining copy of the callback handler is the one in the caller() lambda, which is for sure not the intention of the code here.

I won't go as far as fixing this here. The article is already long as it stands now, so I'll let the reader figure this one out.

A general idea would be that we need to make getting a copy of the shared pointer, testing and running the handler an atomic block with respect to callback handler destruction. That is, either all of this gets executed before destruction of the handler ends on the client side, or none of it does.
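One possible direction, and only a sketch, is to give each handler its own mutex and an explicit way to cancel it. Everything below is hypothetical and not taken from the repository: GuardedCallback, cancel(), invoke() and countCallsAroundCancel() are names made up for this example.

```cpp
#include <functional>
#include <memory>
#include <mutex>
#include <utility>

// Hypothetical wrapper around a callback that can be cancelled by the client.
class GuardedCallback {
 public:
  explicit GuardedCallback(std::function<void()> callback)
      : callback_{std::move(callback)} {}

  // Called by the client when it wants no more calls.
  // Blocks while a call is in progress; afterwards no new call can start.
  void cancel() {
    std::lock_guard<std::mutex> lock{mutex_};
    callback_ = nullptr;
  }

  // Called by the forwarder. Testing and running happen under the same lock,
  // so cancel() either comes entirely before or entirely after the call.
  bool invoke() {
    std::lock_guard<std::mutex> lock{mutex_};
    if (!callback_) {
      return false;
    }
    callback_();
    return true;
  }

 private:
  std::mutex mutex_;
  std::function<void()> callback_;
};

// Replays the problematic sequence on a single thread and counts the calls.
int countCallsAroundCancel() {
  int calls = 0;
  auto handler = std::make_shared<GuardedCallback>([&calls]() { ++calls; });
  std::weak_ptr<GuardedCallback> weak_handler = handler;

  if (auto locked = weak_handler.lock()) {
    locked->invoke();  // normal delivery, the callback runs
  }
  auto late_copy = weak_handler.lock();  // the forwarder grabs a copy...
  handler->cancel();                     // ...and then the client cancels
  handler.reset();
  late_copy->invoke();  // the late call is now a no-op
  return calls;
}
```

This sketch has costs of its own: a callback that calls cancel() on its own handler would deadlock on the non-recursive mutex, and running client code while owning a mutex invites the same kind of indirect deadlock described earlier for postEvent().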

Overcoming concurrency's hardships

The code for EventQueue is almost as simple as any multi-threaded object can be in C++1x, and yet at no point were we short of potential problems.

Concurrency is hard. Even well-weathered professionals with years of coding multi-threaded software find themselves amused by new ways in which a system can throw itself into a pit, for example by managing to deadlock through a loop formed by three different mutexes in different places of the system when the input message timing is just unlucky enough.

Fortunately there's a solution for those endless hours of debugging multi-threaded code.

The solution: write bug-free concurrent code

When creating concurrent code, testing will just not cut it, so you need to write flawless code.

That's it. Solved. You're welcome.

That's wishful thinking. What else have you got?

Ok, the next best thing then. The next best thing right after writing flawless code is writing safe code.

  • Be conservative. Assume the worst possible timing for anything that might happen, worry that your algorithm can be unscheduled and replaced by a competing thread at any moment, keep in mind that the system is full of latencies, and that things will never execute in any particular order unless you force it through explicit synchronization.
  • Keep it simple. In concurrency complexity arises naturally from the huge number of ways in which different execution flows can be shuffled like a pack of cards. The more complex your code is, the larger the number of execution outcomes that can result from those combinations. The fewer possible outcomes, the easier it is to plan for all of them.
  • Correctness over performance. Concurrency is very hard to debug. If your code is fast but only a bit buggy, you'll spend more time debugging than your code will ever save compared to a simpler but less error-prone solution.
  • When given the choice between competing design solutions use the most conservative one that still makes sense for your system.
  • Your code will still fail, so when possible fail gracefully.

This is not a procedure. It's a mindset. And it's a kind-of-obvious one, but most people don't code like this, citing some pretext that usually alludes to efficiency, performance, execution time or some other imaginary quality of their software that they have usually only measured in their minds.

This mindset is especially important for concurrent software, because most bugs, if present, will get through testing and into production code unseen and unheard, like ninjas.

That's it, the end

That was much longer than I thought when I started. Thanks for reading this far. Please let me know if you liked it!
