DEV Community

Cover image for SObjectizer Tales - 19. Thread-safe handlers
Marco Arena
Marco Arena

Posted on • Originally published at marcoarena.wordpress.com

SObjectizer Tales - 19. Thread-safe handlers

Having devoted several episodes to enhancing the robustness of our agents, this time we delve into another topic. An intriguing feature that simplifies the design of agents under specific circumstances.

In the last installment, we left with an open question: is it possible to distribute work among multiple workers spawning only a single agent?

Recalling a prior episode, we learned that a straightforward job distribution structure can be established by utilizing multiple worker agents that receive messages from a shared message chain. Under this setup, we introduced a sort of “coordinator” agent responsible for creating the workers, initializing and eventually closing the message chain, and establishing the message binding to redirect messages from the input message box to the shared message chain. In other words, the components involved were:

  • a coordinator agent
  • N worker agents
  • a message chain
  • a message binding

At this point, we wonder whether all these elements can be replaced with a streamlined design involving just a single agent. In this article, we’ll provide an answer to this question, applicable exclusively when the task remains “stateless,” meaning it doesn’t alter the agent’s state. The catalyst for this approach is a powerful feature of SObjectizer that permits the designation of message handlers as thread-safe.

Thread-safe message handlers

As we have learned so far, SObjectizer executes agent’s message handlers serially, one after the other. Consider, for instance, our image_resizer:

class image_resizer final : public so_5::agent_t
{
public:
    image_resizer(so_5::agent_context_t ctx, so_5::mbox_t input, so_5::mbox_t output, double factor)
        : agent_t(std::move(ctx)), m_input(std::move(input)), m_output(std::move(output)), m_factor(factor)
    {
    }

    void so_define_agent() override
    {
        so_subscribe(m_input).event([this](const cv::Mat& image) {
            cv::Mat resized;
            resize(image, resized, {}, m_factor, m_factor);
            so_5::send<cv::Mat>(m_output, std::move(resized));
        });
    }

private:
    so_5::mbox_t m_input;
    so_5::mbox_t m_output;
    double m_factor;
};
Enter fullscreen mode Exit fullscreen mode

When images are sent to the input channel, the execution of the handler will occur sequentially, one image after the other. This not only ensures the frame processing in sequence but also guarantees the output of the results in the same order.

However, from a semantic point of view, each resizing operation stands independently of the others, and it does not change the agent’s state. Suppose we aim to parallelize multiple resizing operations across multiple workers, similar to what we did with the image_saver. As we learned, we could follow the “recipe” discussed before by introducing a sort of “coordinator,” a worker type, a message chain, and a binding. Perhaps there’s potential for some refactoring to create a couple of more generalized agents. However, we are on the verge of exploring an alternative and notably simpler solution: thread-safe handlers. This feature is briefly documented here and here.

In essence, SObjectizer provides a feature that allows users to mark message handlers as thread-safe, meaning that they can be executed in parallel without changing the agent’s state (or doing it in a thread-safe manner). Thread safety becomes pertinent only for dispatchers employing thread pools. Such dispatchers have the capability to recognize handlers marked as thread-safe and to migrate an agent from one working thread to another in the pool.

To mark a message handler as thread-safe, we simply add a special parameter so_5::thread_safe to the subscription. For example:

void so_define_agent() override
{
    so_subscribe(m_input).event([this](const cv::Mat& image) {
        cv::Mat resized;
        resize(image, resized, {}, m_factor, m_factor);
        so_5::send<cv::Mat>(m_output, std::move(resized));
    }, so_5::thread_safe);
}
Enter fullscreen mode Exit fullscreen mode

As you might imagine, by default this parameter is set so_5::not_thread_safe, as SObjectizer presumes that all handlers within an agent are to be executed in a serialized manner. Conversely, if we mark an handler with so_5::thread_safe, it is our responsibility to ensure that executing the handler does not change the agent’s state or, it does without causing race conditions. In this trivial example, image_resizer‘s job is stateless.

How to execute this on multiple threads?

Currently, SObjectizer provides only one dispatcher able to recognize and use handlers’ thread-safety: [adv_thread_pool](https://github.com/Stiffstream/sobjectizer/wiki/SO-5.8-InDepth-Dispatchers#adv_thread_pool-dispatcher). Simply speaking, this dispatcher is similar to thread_pool (we met in a previous post) but its efficacy lies in the capability to effectively schedule tasks, especially when an agent contains both thread_safe and not_thread_safe handlers. In this case, the dispatcher ensures that all thread-safe event handlers complete their tasks before commencing the execution of any non-thread-safe event handler and vice versa.

This implies that only one non-thread-safe event handler can be invoked at a time, with the subsequent handler being called only after the completion of the previous one. Multiple thread-safe handlers have the capability to run simultaneously.

The adv_thread_pool supports two types of FIFO similar to those found in the thread_pool dispatcher: cooperation and individual. In addition to the behavior discussed for thread_pool, in adv_thread_pool this parameter impacts how handlers thread safety is managed.

Let’s suppose we have agents A and B within a single cooperation that are bound to an adv_thread_pool with 3 threads and cooperation FIFO ordering. Additionally, let’s assume that A’s m1 and B’s m2 message handlers are thread-safe, while A’s m3 and B’s m4 are not. If the messages arrive in this order:

{m1, m2, m2, m3, m2, m1, m4, m3}
Enter fullscreen mode Exit fullscreen mode

then the sequence of message handler invocations might occur as follows:

  1. A’s m1 will be called on thread T1;
  2. B’s m2 will be called on thread T2;
  3. B’s m2 will be called on thread T3;
  4. only after the completion of A’s m1 and both B’s m2, the handler A’s m3 will be called on T1;
  5. only after the completion of A’s m3, B’s m2 and A’s m1 will be called in parallel on T1 and T2;
  6. only after the completion of B’s m2 and A’s m1 then B’s m4 will be called on T1;
  7. only after the completion of B’s m4 then A’s m3 will be called on T1.

In simpler terms, when employing cooperation FIFO, thread-safe handlers from any agents can be executed concurrently on different threads. However, non-thread-safe handlers won’t be invoked in parallel, even if they belong to different agents. In the example above, the execution of m3 and m4 always occurs individually.

On the other hand, if the same agents use an individual FIFO setup, there will be two separate and independent message queues. Let’s consider the arrival of messages in these two queues with the following order:

{m1, m3, m1} and

{m2, m2}

then the sequence of message handler invocations might occur as follows:

  1. A’s m1 on T1, B’s m2 on T2, and B’s m2 on T3;
  2. after A’s m1 completion then A’s m3 on T1;
  3. after A’s m3 completion then A’s m1 on T1.

In other words, when using individual FIFO, handlers can be invoked in parallel if they belong to different agents (as for thread_pool) or if they are in the same agent and marked thread_safe. Clearly, non-thread-safe handlers within the same agent won’t be called in parallel, but they can run concurrently with handlers of other agents, as in the example above.

Semantically speaking, it’s intriguing that within the same agent, utilizing an individual FIFO and a thread-safe handler enables the creation of “contention” on a message box, much like what can be achieved with a message chain as discussed in a previous article. This contention is implicit, as the dispatcher can effectively schedule the execution of the same handler with subsequent messages on different threads.

Bear in mind that changing the agent’s state means also managing subscriptions. In other words, from thread-safe handlers we are not allowed to create nor delete subscriptions. Indeed, SObjectizer performs certain checks in methods like so_change_state() and so_subscribe(), and it throws an exception if they are called within a thread-safe handler.

Splitting work – take #2

This brings us to the heart of this article: is it possible to distribute work among multiple workers by spawning only a single agent bound to adv_thread_pool? Yes but this approach is viable as long as concurrent work doesn’t modify the agent’s state, that work is encapsulated in one or more message handlers, and an individual FIFO is employed.

For example, after modifying the image_resizer, we might add it to the cooperation by using an adv_thread_pool‘s dispatcher:

int main()
{
    const auto ctrl_c = get_ctrlc_token();

    const so_5::wrapped_env_t sobjectizer;
    const auto main_channel = sobjectizer.environment().create_mbox("main");
    const auto commands_channel = sobjectizer.environment().create_mbox("commands");
    const auto message_queue = create_mchain(sobjectizer.environment());

    sobjectizer.environment().introduce_coop(so_5::disp::active_obj::make_dispatcher(sobjectizer.environment()).binder(), [&](so_5::coop_t& c) {
        c.make_agent<image_producer_recursive>(main_channel, commands_channel);
        c.make_agent<maint_gui::remote_control>(commands_channel, message_queue);

        const auto pool = disp::adv_thread_pool::make_dispatcher(env.environment(), 2).binder(disp::adv_thread_pool::bind_params_t{}.fifo(disp::adv_thread_pool::fifo_t::individual))       
        auto resized = c.make_agent_with_binder<image_resizer>(pool, main_channel, 0.5)->output();

        c.make_agent<image_tracer>(resized);
    });

    do_gui_message_loop(ctrl_c, message_queue, sobjectizer.environment().create_mbox(constants::waitkey_channel_name));
}
Enter fullscreen mode Exit fullscreen mode

To incorporate the agent into the cooperation, we employed make_agent_with_binder, which operates akin to make_agent but with also the possibility to specify an alternative binder instead of using the one inherited from the cooperation. As for thread_pool, the FIFO strategy is specified on the binder. This allows every agent to have its own configuration even if belonging to the same cooperation.

Possibly, splitting image_resizer‘s work doesn’t offer significant advantages. Alternatively, we could consider restructuring image_saver. The primary issue lies in each save worker maintaining a continuous frame counter, which forms part of the agent’s state. Updating this counter makes it thread unsafe. Usually, when capturing images from real-world cameras, the frame counter (e.g. FrameId) is supplied by the device or virtually generated by the producer. It’s not an ideal practice for the image_saver to generate this type of metadata. Yet, extending our “image type” to encompass something more semantically meaningful than cv::Mat might be a topic for a future post. For now, we’ll employ a couple of static counters just to illustrate the concept. Here is an updated image_saver:

class image_saver_one_agent final : public agent_t
{
public:
    image_saver_one_agent(agent_context_t ctx, mbox_t input, std::filesystem::path root_folder)
        : agent_t(std::move(ctx)), m_input(std::move(input)), m_root_folder(std::move(root_folder))
    {

    }

    void so_evt_start() override
    {
        std::error_code ec;
        if (create_directories(m_root_folder, ec); ec)
        {
            throw std::runtime_error(std::format("image_saver can't create root folder: {}", ec.message()));
        }       
    }

    void so_define_agent() override
    {
        so_subscribe(m_input).event([this](const cv::Mat& image) {
            static atomic global_worker_id = 0;
            thread_local const int worker_id = global_worker_id++;
            thread_local int frame_id = 0;
            imwrite((m_root_folder / std::format("image_{}_{}.jpg", worker_id, frame_id++)).string(), image);
        }, thread_safe);
    }

private:    
    mbox_t m_input;
    std::filesystem::path m_root_folder;
};
Enter fullscreen mode Exit fullscreen mode

And here is a possible usage:

int main()
{
    const auto ctrl_c = get_ctrlc_token();

    const so_5::wrapped_env_t sobjectizer;
    const auto main_channel = sobjectizer.environment().create_mbox("main");
    const auto commands_channel = sobjectizer.environment().create_mbox("commands");
    const auto message_queue = create_mchain(sobjectizer.environment());

    sobjectizer.environment().introduce_coop(so_5::disp::active_obj::make_dispatcher(sobjectizer.environment()).binder(), [&](so_5::coop_t& c) {
        c.make_agent<image_producer_recursive>(main_channel, commands_channel);
        c.make_agent<maint_gui::remote_control>(commands_channel, message_queue);

        const auto pool = disp::adv_thread_pool::make_dispatcher(sobjectizer.environment(), 2).binder(disp::adv_thread_pool::bind_params_t{}.fifo(disp::adv_thread_pool::fifo_t::individual));                  
        c.make_agent_with_binder<image_saver_one_agent>(pool, main_channel, "/images/");
    });

    do_gui_message_loop(ctrl_c, message_queue, sobjectizer.environment().create_mbox(constants::waitkey_channel_name));
}
Enter fullscreen mode Exit fullscreen mode

However, compared to image_saver, image_saver_one_agent has forfeited the capability to smoothly interrupt pending tasks at the shutdown, similar to other agents that buffer tasks within their message queues, as explored in a dedicated article.

In this case, we encapsulated all the work into a single handler but, in general, agents might have several thread-safe and non-thread-safe handlers.

Takeaway

In this episode we have learned:

  • message handlers can be marked as thread-safe by adding thread_safe to the subscription definition;
  • thread-safe handlers should not modify agent’s state or should do it without race conditions;
  • SObjectizer offers only one dispatcher capable of effectively identifying and leveraging thread-safe handlers: adv_thread_pool;
  • using cooperation FIFO on adv_thread_pool brings parallelism only among different agent’s thread-safe handlers;
  • using individual FIFO on adv_thread_pool opens doors to executing handlers belonging to the same agent on different threads;
  • consequently, an alternative approach to splitting work across multiple workers when the operation is stateless consists in using thread-safe handlers on adv_thread_pool configured with an individual FIFO;
  • make_agent_with_binder is utilized to add an agent to a cooperation, overriding the dispatcher binder instead of inheriting the cooperation’s one.

As usual, calico is updated and tagged.

What’s next?

Whether by coordinating multiple agents or by marking handlers as thread safe, tasks that often benefit from being distributed across multiple threads are manifold. For example, those that can be split into smaller pieces executed simultaneously, distributing the load across the available cores. Or systems that need to handle multiple network requests, such as a gRPC service.

However, what consequences arise from distributing work? And in a broader sense, how can we evaluate the performance of our actor-based application?

Following up our discussion initiated over lunch with Helen, the upcoming three posts will explore the realm of performance.


Thanks to Yauheni Akhotnikau for having reviewed this post.

Top comments (0)