DEV Community

Cover image for SObjectizer Tales - 23. Mutable messages
Marco Arena
Marco Arena

Posted on • Originally published at marcoarena.wordpress.com

SObjectizer Tales - 23. Mutable messages

Last time we promised Helen to show a feature of SObjectizer that can prevent the copying of execution_demand_t instances when they are transmitted from push functions of service_time_estimator_dispatcher and later received from the worker thread.

This is the implementation of the dispatcher:

class service_time_estimator_dispatcher : public so_5::event_queue_t, public so_5::disp_binder_t
{
public:
    service_time_estimator_dispatcher(so_5::environment_t& env, so_5::mbox_t output, unsigned messages_count)
     : m_event_queue(create_mchain(env)), m_start_finish_queue(create_mchain(env)), m_output(std::move(output)), m_messages_count(messages_count), m_messages_left(messages_count)
    {    
        m_worker = std::jthread{ [this] {
            const auto thread_id = so_5::query_current_thread_id();

            receive(from(m_event_queue_start_stop).handle_n(1), [thread_id, this](so_5::execution_demand_t d) {
                // ... as before
            });

            receive(from(m_event_queue).handle_all(), [thread_id, this](so_5::execution_demand_t d) {
                // ... as before
            });

            receive(from(m_event_queue_start_stop).handle_n(1), [thread_id, this](so_5::execution_demand_t d) {
                // ... as before
            });
        } };
    }

    void push(so_5::execution_demand_t demand) override 
    {
        so_5::send<so_5::execution_demand_t>(m_event_queue, std::move(demand));
    }

    void push_evt_start(so_5::execution_demand_t demand) override 
    {
        so_5::send<so_5::execution_demand_t>(m_start_finish_queue, std::move(demand));
    }

    void push_evt_finish(so_5::execution_demand_t demand) noexcept override 
    {
        so_5::send<so_5::execution_demand_t>(m_event_queue_start_stop, std::move(demand));
        close_retain_content(so_5::terminate_if_throws, m_event_queue);
    }

    // ... as before
private:
    // ... as before
};
Enter fullscreen mode Exit fullscreen mode

As we have learnt, in SObjectizer messages are exchanged as immutable, since this is a straightforward and secure approach for implementing interaction within a concurrent application:

  • any number of receivers can simultaneously receive a message instance;
  • messages can be redirected to any number of new receivers;
  • messages can be stored for later processing.

However, using immutable messages implies that modifying a message requires copying it and altering the local instance. Consequently, there are scenarios where this approach might be impractical, such as when the message contains non-copyable data, and inefficient, particularly when dealing with large message sizes.

A few workarounds are available, such as encapsulating data within shared_ptr or such wrappers – a strategy akin to the one inherited from OpenCV for avoiding image data copying – or utilizing the mutable type specifier (brrr…). However, both approaches either introduce safety concerns or incur unnecessary overhead.

The real solution consists in using another slick feature of SObjectizer: mutable messages.

Mutable messages

SObjectizer supports explicitly-typed mutable messages for 1:1 exchanges. Specifically, a mutable message can only be sent to Multiple-Producer Single-Consumer (MPSC) message boxes (aka: the direct message box of an agent) or chains. This restriction implies that there can be a maximum of one receiver for the message. Any attempt to send a mutable message to a Multiple-Producer Multiple-Consumer (MPMC) message box will result in a runtime exception. In other words, mutable messages can’t be used with named and anonymous message boxes created using environment_t::create_mbox().

A mutable message is sent by expressing mutability through the wrapper so_5::mutable_msg<Msg>:

send<mutable_msg<int>>(dst, 10);
Enter fullscreen mode Exit fullscreen mode

and it’s received using either so_5::mutable_mhood_t<Msg> or mhood_t<mutable_msg<M>>. The former serves as a shorthand for the latter, which can be convenient in generic code, such as:

template<typename M> // Can be Msg or mutable_msg<Msg>
class example : public so_5::agent_t {
   ...
   void on_message(mhood_t<M> cmd) {
      ...
   }
};
Enter fullscreen mode Exit fullscreen mode

Since, mutability introduces a new type, a mutable message of type M is a distinct type from an immutable message of type M. As a result, agents may have two different handlers for the same message type, each referring to the same messages but with a different mutability property.

As mentioned earlier, a mutable message can only have a single receiver. In essence, mhood_t<mutable_msg<M>> behaves similarly to unique_ptr: it cannot be copied, only moved. Moving a mutable message is equivalent to redirecting it to another destination. Consequently, a mutable message that has been moved from becomes empty, with its pointee being set to nullptr.

Therefore, mutable messages offer a safer alternative compared to any workaround involving immutable messages, as only one receiver can access a mutable message instance at any given time.

At this point, we leverage this understanding to modify the service_time_estimator_dispatcher to accommodate the usage of mutable messages:

class service_time_estimator_dispatcher : public so_5::event_queue_t, public so_5::disp_binder_t
{
public:
    service_time_estimator_dispatcher(so_5::environment_t& env, so_5::mbox_t output, unsigned messages_count)
     : m_event_queue(create_mchain(env)), m_start_finish_queue(create_mchain(env)), m_output(std::move(output)), m_messages_count(messages_count), m_messages_left(messages_count)
    {    
        m_worker = std::jthread{ [this] {
            const auto thread_id = so_5::query_current_thread_id();

            receive(from(m_event_queue_start_stop).handle_n(1), [thread_id, this](mutable_mhood_t<so_5::execution_demand_t> d) {
                d->call_handler(thread_id);
            });

            receive(from(m_event_queue).handle_all(), [thread_id, this](mutable_mhood_t<so_5::execution_demand_t> d) {
                const auto tic = std::chrono::steady_clock::now();
                d->call_handler(thread_id);
                const auto toc = std::chrono::steady_clock::now();
                const auto elapsed = std::chrono::duration<double>(toc - tic).count();
            });

            receive(from(m_event_queue_start_stop).handle_n(1), [thread_id, this](mutable_mhood_t<so_5::execution_demand_t> d) {
                // ... as before
            });
        } };
    }

    void push(so_5::execution_demand_t demand) override 
    {
        so_5::send<so_5::mutable_msg<so_5::execution_demand_t>>(m_event_queue, std::move(demand));
    }

    void push_evt_start(so_5::execution_demand_t demand) override 
    {
        so_5::send<so_5::mutable_msg<so_5::execution_demand_t>>(m_start_finish_queue, std::move(demand));
    }

    void push_evt_finish(so_5::execution_demand_t demand) noexcept override 
    {
        so_5::send<so_5::mutable_msg<so_5::execution_demand_t>>(m_event_queue_start_stop, std::move(demand));
        close_retain_content(so_5::terminate_if_throws, m_event_queue);
    }

    // ... as before
};
Enter fullscreen mode Exit fullscreen mode

As observed, the modifications are minimal and primarily affect the utilization of the message in using the arrow operator. This adjustment would have been seamlessly incorporated if we had employed mhood_t from the beginning.

A classic pattern

An interesting use case of mutable messages arises when a message requires processing through a pipeline either statically or dynamically-defined. In essence, we define a sequence of processing steps (handled by different agents) where each step receives the message, executes its functionality, and then sends the (modified) message to the subsequent step. This sequential execution enables a message to be processed by multiple services in order, without requiring a coordinating component.

When thinking about calico, we already offer this flexibility as agents can be combined as required through channels, whether named or not. However, this operates with immutable messages, and modifying the message at each step necessitates data copying and dynamic memory allocation for sending it, as seen, for example, in image_resizer and face_detector.

Considering that this approach may be suitable for numerous use cases, we might encounter “hot paths” in our applications where copying data is undesirable, and minimizing dynamic allocations is crucial. Here is where mutable messages come into play.

However, even when utilizing mutable messages, there’s another intriguing aspect to consider: the efficient pipeline’s outcome may need to be delivered to other agents that only necessitate read access, without any further modifications. Essentially, the mutable result should be turned into an immutable message and routed through other components. Is it possible to achieve this without requiring additional copies and dynamic allocations?

Yes, mutable messages can be turned into immutable messages efficiently (without allocating memory nor copying) through the free function to_immutable():

void last_pipeline_step(mutable_mhood_t<some_message> cmd) 
{    
    // ...
    // Now the mutable message will be resend as an immutable one.
    so_5::send(some_channel, so_5::to_immutable(std::move(cmd)));
    // cmd is a nullptr now and can't be used anymore.
}
Enter fullscreen mode Exit fullscreen mode

It’s worth noting that, from SObjectizer’s standpoint, converting a mutable message into an immutable one essentially entails removing a sort of “mutability flag”. Consequently, the memory block of the message itself remains unchanged, ensuring efficient handling.

After removing mutability, the old message wrapper contains nullptr. In case you are wondering, an immutable message can’t be converted into a mutable one without copying the message itself into a newly-allocated one. Indeed, from a design point of view, it would be impossible to transform a shared reference into a singular reference (just like it’s not possible to convert a shared_ptr into a unique_ptr).

In the following example, we show a statically-defined pipeline that leverages mutable messages to avoid intermediate copies. This might be seen as an application of the Pipes and Filters pattern since the pipeline is statically defined. This means that each agent has a fixed “next” step (assembled during the initialization) that can’t be changed at runtime:

class step_1 : public so_5::agent_t
{
public:
    step_1(so_5::agent_context_t ctx, so_5::mbox_t step_2_dst)
        : agent_t(ctx), m_step_2_dst(std::move(step_2_dst))
    {

    }

    void so_define_agent() override
    {
        so_subscribe(so_environment().create_mbox("main")).event([this](const cv::Mat& img) {
            auto local_image = img.clone();
            resize(local_image, local_image, {}, 0.5, 0.5);
            so_5::send<so_5::mutable_msg<cv::Mat>>(m_step_2_dst, std::move(local_image));
        });
    }
private:
    so_5::mbox_t m_step_2_dst;
};

class step_2 : public so_5::agent_t
{
public:
    step_2(so_5::agent_context_t ctx, so_5::mbox_t step_3_dst)
        : agent_t(ctx), m_step_3_dst(std::move(step_3_dst))
    {

    }

    void so_define_agent() override
    {
        so_subscribe_self().event([this](so_5::mutable_mhood_t<cv::Mat> img) {
            line(*img, { img->cols/2, 0 }, { img->cols/2, img->rows }, { 0, 0, 0 }, 3);
            line(*img, { 0, img->rows/2 }, { img->cols, img->rows/2 }, { 0, 0, 0 }, 3);
            so_5::send(m_step_3_dst, std::move(img));
        });
    }
private:
    so_5::mbox_t m_step_3_dst;
};

class step_3 : public so_5::agent_t
{
public:
    step_3(so_5::agent_context_t ctx)
        : agent_t(ctx)
    {

    }

    void so_define_agent() override
    {
        so_subscribe_self().event([this](so_5::mutable_mhood_t<cv::Mat> img) {
            cvtColor(*img, *img, cv::COLOR_BGR2GRAY);
            so_5::send(so_environment().create_mbox("output"), to_immutable(std::move(img)));
        });
    }
};

int main()
{
    const auto ctrl_c = utils::get_ctrlc_token();

    const so_5::wrapped_env_t sobjectizer;
    const auto main_channel = sobjectizer.environment().create_mbox("main");
    const auto commands_channel = sobjectizer.environment().create_mbox("commands");
    const auto message_queue = create_mchain(sobjectizer.environment());

    // here is the setup of the pipeline
    sobjectizer.environment().introduce_coop(so_5::disp::active_group::make_dispatcher(sobjectizer.environment()).binder("pipeline"), [&](so_5::coop_t& c) {
        auto step_3_dst = c.make_agent<step_3>()->so_direct_mbox();  // this sends data to "output" channel
        auto step_2_dst = c.make_agent<step_2>(step_3_dst)->so_direct_mbox();
        c.make_agent<step_1>(step_2_dst); // this gets data from "main" channel
    });

    sobjectizer.environment().introduce_coop(so_5::disp::active_obj::make_dispatcher(sobjectizer.environment()).binder(), [&](so_5::coop_t& c) {
        c.make_agent<producers::image_producer_recursive>(main_channel, commands_channel); // this sends data to "main" channel
        c.make_agent<agents::maint_gui::remote_control>(commands_channel, message_queue);
        c.make_agent<agents::maint_gui::image_viewer>(sobjectizer.environment().create_mbox("output"), message_queue);
    });

    do_gui_message_loop(ctrl_c, message_queue, sobjectizer.environment().create_mbox(constants::waitkey_channel_name));     
}
Enter fullscreen mode Exit fullscreen mode

We grouped the pipeline’s agents into a dedicated cooperation using an active_group dispatcher (aka: they all share the same worker thread for processing).

The situation becomes a bit more complex when the pipeline needs to be dynamically assembled, such as in the routing slip pattern. In essence, the processing steps are not predetermined but can change dynamically. In this scenario, a potential implementation involves integrating the “routing” as part of the message itself. This means, each processing step performs its work and then passes the modified message to the “next” step that is obtained from the message. In this scenario, utilizing mutable messages serves two purposes: not only does it prevent the need to copy the processed instance every time (as before), but it also facilitates “stepping” the same routing object along the pipeline (to obtain the next destination).

However, combining the payload instance (e.g. the image) with the routing object doesn’t eliminate the need for a final dynamic allocation when the payload needs to be converted into an immutable message and passed through the rest of the application. While this might not be a significant concern, it’s worth noting. The reason is simple:

struct slip_message
{
    cv::Mat image;
    routing routing;
};

void last_step(mutable_mhood_t<slip_message> msg)
{
    send<cv::Mat>(dest, std::move(msg->image));
}
Enter fullscreen mode Exit fullscreen mode

Even though copying the data can be avoided by moving, we can’t eliminate the dynamic allocation that occurs when using the send function in this context.

Anyway, let’s see a possible implementation of this pattern. First of all, we define the message type, as discussed before:

class route_slip
{
public: 
    route_slip(std::vector<so_5::mbox_t> routes)
        : m_routes(std::move(routes))
    {
    }

    const so_5::mbox_t& next()
    {
        return m_routes[m_current++];
    }
private:
    std::vector<so_5::mbox_t> m_routes;
    size_t m_current = 0;
};

template<typename T>
struct route_slip_message
{
    const so_5::mbox_t& next()
    {
        return slip.next();
    }

    route_slip slip;
    T payload;
};

template<typename T>
void send_to_next_step(so_5::mutable_mhood_t<route_slip_message<T>> msg)
{
    const auto& next_step = msg->next();
    so_5::send(next_step, std::move(msg));
}
Enter fullscreen mode Exit fullscreen mode

In essence:

  • route_slip_message is the type that will circulate through the pipeline, consisting of the payload (e.g. the image) and the routing object;
  • route_slip is that “routing object” implemented as a vector of channels equipped with a simple next function that is used to obtain the next channel of the pipeline. In other words, this defines the sequence of steps;
  • send_to_next_step is a function that simply sends the message to the next channel.

Certainly, there could be other implementations, but the objective here is to keep things simple enough to grasp the concept.

Then, we introduce some “processing steps”. Every step is an agent that handles a mutable route_slip_message by processing it and then sending it to the next step:

class resize_step final : public so_5::agent_t
{
public:
    resize_step(so_5::agent_context_t ctx)
        : agent_t(ctx)
    {
    }

    void so_define_agent() override
    {
        so_subscribe_self().event([](so_5::mutable_mhood_t<route_slip_message<cv::Mat>> msg) {
            resize(msg->payload, msg->payload, {}, 0.5, 0.5);
            send_to_next_step(std::move(msg));
        });
    }
};

class add_crosshairs_step final : public so_5::agent_t
{
public:
    add_crosshairs_step(so_5::agent_context_t ctx)
        : agent_t(ctx)
    {
    }

    void so_define_agent() override
    {
        so_subscribe_self().event([](so_5::mutable_mhood_t<route_slip_message<cv::Mat>> msg) {
            auto& img = msg->payload;
            line(img, { img.cols / 2, 0 }, { img.cols / 2, img.rows }, { 0, 0, 0 }, 3);
            line(img, { 0, img.rows / 2 }, { img.cols, img.rows / 2 }, { 0, 0, 0 }, 3);
            send_to_next_step(std::move(msg));
        });
    }
};

class to_grayscale_step final : public so_5::agent_t
{
public:
    to_grayscale_step(so_5::agent_context_t ctx)
        : agent_t(ctx)
    {
    }

    void so_define_agent() override
    {
        so_subscribe_self().event([](so_5::mutable_mhood_t<route_slip_message<cv::Mat>> msg) {
            cvtColor(msg->payload, msg->payload, cv::COLOR_BGR2GRAY);
            send_to_next_step(std::move(msg));
        });
    }
};
Enter fullscreen mode Exit fullscreen mode

Also, we add the “last step” that is slightly different: it will just extract and send the cv::Mat to a separate destination channel. While it could be part of the same route_slip instance, for clarity, we prefer having a separate field:

template<typename T>
class slip_last_step : public so_5::agent_t
{
public:
    slip_last_step(so_5::agent_context_t ctx, so_5::mbox_t destination)
        : agent_t(ctx), m_destination(std::move(destination))
    {

    }

    void so_define_agent() override
    {
        so_subscribe_self().event([this](so_5::mutable_mhood_t<route_slip_message<T>> msg) {
            so_5::send<T>(m_destination, std::move(msg->payload));
        });
    }
private:
    so_5::mbox_t m_destination;
};
Enter fullscreen mode Exit fullscreen mode

As said before, the dynamic allocation here can’t be avoided.

Lastly, we need the “router” which dynamically assembles the pipeline. To maintain simplicity, we also assign it the responsibility of creating the cooperation of agents. We’ll store their direct message boxes in a map, allowing us to retrieve them by name, simulating a configuration from a file or similar source:

class slip_router : public so_5::agent_t
{
public:
    slip_router(so_5::agent_context_t ctx, so_5::mbox_t source, so_5::mbox_t last)
        : agent_t(ctx), m_source(std::move(source)), m_last(std::move(last))
    {

    }

    void so_evt_start() override
    {
        so_environment().introduce_coop(so_5::disp::active_group::make_dispatcher(so_environment()).binder("slip"), [this](so_5::coop_t& c) {
            m_available_steps["resize"] = c.make_agent<resize_step>()->so_direct_mbox();
            m_available_steps["add_crosshairs"] = c.make_agent<add_crosshairs_step>()->so_direct_mbox();
            m_available_steps["to_grayscale"] = c.make_agent<to_grayscale_step>()->so_direct_mbox();
            m_available_steps["last"] = c.make_agent<slip_last_step<cv::Mat>>(m_last)->so_direct_mbox();    
        });     
    }

    void so_define_agent() override
    {
        so_subscribe(m_source).event([this](const cv::Mat& img) {
            auto local_image = img.clone();         

            // imagine this is created dynamically and only when something changes...
            route_slip_message slip{ {{
                    m_available_steps.at("add_crosshairs"), 
                    m_available_steps.at("to_grayscale"), 
                    m_available_steps.at("last")}}, // we mustn't forget this one!
                std::move(local_image) };

            const auto first_step_channel = slip.next();
            so_5::send<so_5::mutable_msg<route_slip_message<cv::Mat>>>(first_step_channel, std::move(slip));
        });     
    }
private:
    so_5::mbox_t m_source;
    so_5::mbox_t m_last;
    std::map<std::string, so_5::mbox_t> m_available_steps;      
};
Enter fullscreen mode Exit fullscreen mode

In this example, the pipeline is generated for each image. However, we could potentially trigger the creation of a new pipeline based on a specific “configuration” change or a command. Things can become more sophisticated as needed, but this is the general pattern: the routing is embedded within the message, eliminating the necessity for central coordination.

Here is an example of usage:

int main()
{
    const auto ctrl_c = utils::get_ctrlc_token();

    const so_5::wrapped_env_t sobjectizer;
    const auto main_channel = sobjectizer.environment().create_mbox("main");
    const auto commands_channel = sobjectizer.environment().create_mbox("commands");
    const auto message_queue = create_mchain(sobjectizer.environment());

    sobjectizer.environment().introduce_coop(so_5::disp::active_obj::make_dispatcher(sobjectizer.environment()).binder(), [&](so_5::coop_t& c) {
        c.make_agent<image_producer_recursive>(main_channel, commands_channel); // this sends data to "main" channel
        c.make_agent<remote_control>(commands_channel, message_queue);
        c.make_agent<image_viewer>(sobjectizer.environment().create_mbox("output"), message_queue);
        c.make_agent<slip_router>(main_channel, sobjectizer.environment().create_mbox("output"));
    });

    do_gui_message_loop(ctrl_c, message_queue, sobjectizer.environment().create_mbox(constants::waitkey_channel_name));     
}
Enter fullscreen mode Exit fullscreen mode

As you see, the user does not see the pipeline agents that are, instead, created by the slip_router.

Consider that our agents might be easily generalized into a customizable unified agent, for example:

template<typename Action>
class slip_generic_step : public so_5::agent_t, public Action
{
public:
    slip_generic_step(so_5::agent_context_t ctx, Action action)
        : agent_t(ctx), Action(std::move(action))
    {
    }

    void so_define_agent() override
    {
        so_subscribe_self().event([this](so_5::mutable_mhood_t<route_slip_message<cv::Mat>> msg) {
            this->operator()(msg);
            send_to_next_step(std::move(msg));
        });
    }
};

auto make_generic_step(so_5::coop_t& c, auto lambda)
{
    return c.make_agent<slip_generic_step<decltype(lambda)>>(std::move(lambda))->so_direct_mbox();
}

class slip_router : public so_5::agent_t
{
public:
    // ... as before

    void so_evt_start() override
    {
        so_environment().introduce_coop(so_5::disp::active_group::make_dispatcher(so_environment()).binder("slip"), [this](so_5::coop_t& c) {           
            m_available_steps["resize"] = make_generic_step<cv::Mat>(c, [](auto& msg) {
                resize(msg->payload, msg->payload, {}, 0.5, 0.5);
            });
            m_available_steps["add_crosshairs"] = make_generic_step<cv::Mat>(c, [](auto& msg) {
                auto& img = msg->payload;
                line(img, { img.cols / 2, 0 }, { img.cols / 2, img.rows }, { 0, 0, 0 }, 3);
                line(img, { 0, img.rows / 2 }, { img.cols, img.rows / 2 }, { 0, 0, 0 }, 3);
            });
            m_available_steps["to_grayscale"] = make_generic_step<cv::Mat>(c, [](auto& msg) {
                cvtColor(msg->payload, msg->payload, cv::COLOR_BGR2GRAY);
            });

            m_available_steps["last"] = c.make_agent<slip_last_step>(m_last)->so_direct_mbox();
        });     
    }

    // ... as before
};
Enter fullscreen mode Exit fullscreen mode

As mentioned above, such patterns are useful not only in scenarios where flexibility is required, but also in others where copying data is not desirable.

Mutable messages and timed send functions

The last thing to know about mutable messages regards timed send functions: since mutable messages cannot be copied but only moved, they cannot be sent periodically using the send_periodic function if the period is nonzero; otherwise, an exception will be thrown. However, they can be sent using the send_delayed function:

// ok:
auto timer = so_5::send_periodic<so_5::mutable_msg<some_message>>(
    dest_mbox,
    200ms, // Delay before message appearance.
    0ms, // Period is zero.
    ...);

// ko: an exception will be thrown
auto timer = so_5::send_periodic<so_5::mutable_msg<some_message>>(
    dest_mbox,
    200ms, // Delay before message appearance.
    150ms, // Period is not zero.
    ...);

// ok:
so_5::send_delayed<so_5::mutable_msg<some_message>>(
    dest_mbox,
    200ms, // Delay before message appearance.
    ... // some_message's constructor args.
    );
Enter fullscreen mode Exit fullscreen mode

Additionally, it’s just worth mentioning that signals can’t be mutable. It wouldn’t make any sense.

Takeaway

In this episode we have learned:

  • SObjectizer supports explicitly-typed mutable messages for 1:1 exchanges, meaning that an agent receiving a mutable message can safely modify it;
  • mutable messages are sent by declaring the type as mutable_msg<Msg>;
  • mutable messages are received using either mutable_mhood_t<Msg> or mhood_t<mutable_msg<M>>;
  • mutable messages can’t be copied, only moved;
  • mutable messages have only one receiver, this means that mutable messages can’t be used with named and anonymous message boxes created using environment_t::create_mbox() functions, because they are MPMC (doing that would result in an exception);
  • to_immutable(msg) is used to turn a mutable message into an immutable instance, avoiding creating a new message instance or copying data;
  • mutable messages can’t be sent periodically.

As usual, calico is updated and tagged.

What’s next?

We have just met some colleagues gossiping about the sprint planning next week. It seems our product manager will unveil some exciting news about a potential opportunity in a next-generation project at the company.

However, they anticipated we’ll work on an issue related to delivering messages at shutdown…


Thanks to Yauheni Akhotnikau for having reviewed this post.

Top comments (0)