DEV Community

Marco Arena

Originally published at marcoarena.wordpress.com

SObjectizer Tales - 7. Agent composition

We received a request from our boss, who will be showcasing the program at a significant customer meeting. She strongly believes that the key to selling this product lies in its flexibility. Consequently, she requires additional agents to allow customers to interact with the system.

In this segment, we will introduce some new agents and delve deeper into the concept of composition.

Spoiler alert: read through the conclusion to see the demo…

What is composition?

In basic terms, an agent can receive data from one or more input channels and, if needed, send data to one or more output channels. Composition is the ability to build new functionality by combining agents through channels.

Consider this example:

The channel “raw images” ties the “producer agent” with the “visualizer agent”.

Suppose that we want to display smaller images by resizing them first. We then introduce a new agent that takes an image from its input channel, resizes it, and finally outputs the resulting image to another channel. The new agent might be added as follows:

Assuming the same image type is involved (e.g. cv::Mat), accommodating this change required modifying neither the producer nor the visualizer: we only had to rearrange the agents. Indeed, agents are loosely coupled: they depend only on the types of messages, not on senders and receivers. The “pipeline” can even be generated at run time, from a configuration file or any other source. The advantage is evident.
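To make type-based coupling concrete without depending on SObjectizer or OpenCV, here is a minimal, stdlib-only model. Everything in it (frame, channel, connect_resizer, run_pipeline) is hypothetical and far simpler than a real message box; it only shows that the resizing stage knows its input and output channels and nothing about who sends to or receives from them:

```cpp
#include <functional>
#include <utility>
#include <vector>

// Hypothetical stand-in for an image: only its size matters here.
struct frame {
    int width;
    int height;
};

// Hypothetical toy channel: a list of handlers invoked for every frame sent.
// It only models type-based coupling; it is not SObjectizer's mbox.
class channel {
public:
    void subscribe(std::function<void(const frame&)> handler) {
        m_handlers.push_back(std::move(handler));
    }
    void send(const frame& f) const {
        for (const auto& handler : m_handlers) {
            handler(f);
        }
    }
private:
    std::vector<std::function<void(const frame&)>> m_handlers;
};

// Resizer stage: depends only on its input and output channels.
void connect_resizer(channel& input, channel& output, double factor) {
    input.subscribe([&output, factor](const frame& f) {
        output.send({static_cast<int>(f.width * factor),
                     static_cast<int>(f.height * factor)});
    });
}

// Wires producer -> resizer -> viewer and returns what the "viewer" received.
std::vector<frame> run_pipeline(double factor) {
    channel raw_images;
    channel resized_images;
    std::vector<frame> shown;
    connect_resizer(raw_images, resized_images, factor);
    resized_images.subscribe([&shown](const frame& f) { shown.push_back(f); });
    raw_images.send({640, 480}); // the "producer" only knows raw_images
    return shown;
}
```

Rewiring the pipeline (for example, subscribing the viewer directly to raw_images) only touches run_pipeline, never the stages themselves.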

The resizer agent is a good candidate for the purpose of this article:

class image_resizer final : public so_5::agent_t
{
public:
    image_resizer(so_5::agent_context_t ctx, so_5::mbox_t input, so_5::mbox_t output, double factor)
        : agent_t(std::move(ctx)), m_input(std::move(input)), m_output(std::move(output)), m_factor(factor)
    {
    }

    void so_define_agent() override
    {
        so_subscribe(m_input).event([this](const cv::Mat& image) {
            cv::Mat resized;
            resize(image, resized, {}, m_factor, m_factor);
            so_5::send<cv::Mat>(m_output, std::move(resized));
        });
    }

private:
    so_5::mbox_t m_input;
    so_5::mbox_t m_output;
    double m_factor;
};

OpenCV’s resize comes to the rescue here:

  • resized contains the resulting image;
  • passing size = {} (an empty size) is a trick that lets OpenCV compute the output size by applying the scale factor (passed at construction) to both dimensions.

You might be tempted to make the code shorter by rewriting the message handler as follows:

so_subscribe(m_input).event([this](cv::Mat image) {
    resize(image, image, {}, m_factor, m_factor);
    so_5::send<cv::Mat>(m_output, std::move(image));
});

However, you would fall into a hidden trap. Under the hood, cv::Mat is reference counted, and copying it does not perform a deep copy of the pixel data. This means that changing a “local” instance of cv::Mat in place would actually tread on someone else’s toes: the buffer is shared with the sender and with all the other subscribers to the main channel, which would be affected in an unpredictable way, resulting in undefined behavior. We won’t discuss mitigations here, but one approach consists in wrapping cv::Mat into a “smarter” type that makes image modifications more explicit and acknowledged.
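The trap can be reproduced in a few lines of standard C++. The sketch below uses a hypothetical shared_image type that only mimics the relevant aspect of cv::Mat: copying it copies a handle to a shared buffer, while clone() performs a deep copy. It is a model of the problem, not OpenCV code:

```cpp
#include <cstddef>
#include <cstdint>
#include <memory>
#include <vector>

// Hypothetical image type mimicking cv::Mat's copy semantics:
// copying the object copies a handle, not the pixels.
struct shared_image {
    std::shared_ptr<std::vector<std::uint8_t>> pixels;

    static shared_image make(std::size_t n, std::uint8_t value) {
        return {std::make_shared<std::vector<std::uint8_t>>(n, value)};
    }
    // Deep copy, analogous in spirit to cv::Mat::clone().
    shared_image clone() const {
        return {std::make_shared<std::vector<std::uint8_t>>(*pixels)};
    }
};

// Tempting "by value" handler: the parameter shares the caller's buffer,
// so writing to it modifies the original image too.
void darken_in_place(shared_image img) {
    for (auto& p : *img.pixels) {
        p = 0;
    }
}

// Safe handler: works on a private deep copy and returns it.
shared_image darken_copy(const shared_image& img) {
    auto result = img.clone();
    for (auto& p : *result.pixels) {
        p = 0;
    }
    return result;
}
```

This is the same reason the original handler writes into a separate resized image instead of reusing the one it received.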

At this point, image_resizer can be added to the cooperation:

int main()
{
    auto ctrl_c = get_ctrl_c_token();

    const so_5::wrapped_env_t sobjectizer;
    auto main_channel = sobjectizer.environment().create_mbox("main");
    auto commands_channel = sobjectizer.environment().create_mbox("commands");
    auto resized_images = sobjectizer.environment().create_mbox("resized");

    auto dispatcher = so_5::disp::active_obj::make_dispatcher(sobjectizer.environment());
    sobjectizer.environment().introduce_coop(dispatcher.binder(), [&](so_5::coop_t& c) {
        c.make_agent<image_producer>(main_channel, commands_channel);
        c.make_agent<image_resizer>(main_channel, resized_images, 0.5);
        c.make_agent<image_viewer_live>(resized_images);
        c.make_agent<remote_control>(commands_channel, ctrl_c);
    });

    wait_for_stop(ctrl_c);
}

resized_images is a named message box, globally available and, consequently, any agent in the program can subscribe to it (technically speaking, any code that can access the environment owned by that wrapped_env_t). This is mostly useful when several agents need to subscribe to that channel, or when we simply want to keep the door open for new subscriptions. On the other hand, if only one subscriber is interested in that output (and always will be), choosing between a named and an anonymous message box is just a matter of taste or style.
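The consequence of naming can be modeled with a toy registry (toy_environment and toy_mbox are hypothetical, not SObjectizer types) that mimics how named message boxes behave: requesting the same name twice yields the same box, while every anonymous box is distinct. In this model, two resizers that both ask for a box named “resized” would end up sharing a single output channel:

```cpp
#include <map>
#include <memory>
#include <string>

// Hypothetical message box: only its identity matters here.
struct toy_mbox {
    std::string name;
};

// Hypothetical registry modeling named vs anonymous message boxes.
class toy_environment {
public:
    // Named: the first request creates the box, later requests return it.
    std::shared_ptr<toy_mbox> create_mbox(const std::string& name) {
        auto& slot = m_named[name];
        if (!slot) {
            slot = std::make_shared<toy_mbox>(toy_mbox{name});
        }
        return slot;
    }
    // Anonymous: always a fresh box, not registered anywhere.
    std::shared_ptr<toy_mbox> create_mbox() {
        return std::make_shared<toy_mbox>();
    }
private:
    std::map<std::string, std::shared_ptr<toy_mbox>> m_named;
};
```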

However, if we use several image_resizers (e.g. one resizing for mobile and another for desktop), each of them needs its own output channel. This deserves attention when named message boxes are involved, because it requires finding reasonable (and unique) names.

Alternatively, “output agents” might be equipped with another constructor that automatically creates an anonymous output message box, plus a member function output() that returns it.

Thus, image_resizer might be changed as follows:

class image_resizer final : public so_5::agent_t
{
public:
    image_resizer(so_5::agent_context_t ctx, so_5::mbox_t input, so_5::mbox_t output, double factor)
        : agent_t(std::move(ctx)), m_input(std::move(input)), m_output(std::move(output)), m_factor(factor)
    {
    }

    image_resizer(so_5::agent_context_t ctx, so_5::mbox_t input, double factor)
        : image_resizer(ctx, std::move(input), ctx.environment().create_mbox(), factor)
    {
    }

    void so_define_agent() override
    {
        so_subscribe(m_input).event([this](const cv::Mat& image) {
            cv::Mat resized;
            resize(image, resized, {}, m_factor, m_factor);
            so_5::send<cv::Mat>(m_output, std::move(resized));
        });
    }

    so_5::mbox_t output() const
    {
        return m_output;
    }

private:
    so_5::mbox_t m_input;
    so_5::mbox_t m_output;
    double m_factor;
};

This allows creating the cooperation more succinctly:

auto resized_images = c.make_agent<image_resizer>(main_channel, 0.5)->output();
c.make_agent<image_viewer_live>(resized_images);

Beware: we can’t just return image_resizer’s own direct message box (so_direct_mbox()), because no agent other than image_resizer itself can subscribe to it. For this reason, we create an anonymous message box from the environment (ctx.environment().create_mbox() in the delegating constructor), which any agent can subscribe to.
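The “output agent” idiom itself is plain C++. The stdlib-only sketch below models it with a hypothetical toy_channel and a hypothetical doubler stage standing in for image_resizer (no SObjectizer types involved): the two-argument constructor receives its output channel, while the delegating constructor creates a fresh, private one and exposes it through output():

```cpp
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical toy channel, not SObjectizer's mbox.
struct toy_channel {
    std::vector<std::function<void(int)>> handlers;
    void send(int value) const {
        for (const auto& h : handlers) {
            h(value);
        }
    }
};
using channel_ptr = std::shared_ptr<toy_channel>;

// Hypothetical stage modeling the "output agent" idiom.
class doubler {
public:
    doubler(channel_ptr input, channel_ptr output)
        : m_input(std::move(input)), m_output(std::move(output)) {
        // Capture the output channel itself (not this), so the handler
        // does not depend on the stage object's address.
        m_input->handlers.push_back([out = m_output](int v) { out->send(v * 2); });
    }
    // Delegating constructor: creates a private output channel.
    explicit doubler(channel_ptr input)
        : doubler(std::move(input), std::make_shared<toy_channel>()) {}

    channel_ptr output() const { return m_output; }

private:
    channel_ptr m_input;
    channel_ptr m_output;
};
```

A downstream consumer then subscribes to whatever output() returns, without anyone having to pick a name.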

Forwarding data

Another interesting pattern comes into play when we just need to forward some data from one channel to another. Consider the stream_detector agent we introduced in the last episode, and suppose we need its output to be sent to an additional channel as well. If we can’t change stream_detector, the classic approach is to introduce a proxy agent that subscribes to the output of stream_detector and redirects data to the new channel. Writing such proxies is not only a tedious and error-prone task, but also a bit inefficient, because proxies need to run in the context of some dispatcher. However, until SObjectizer 5.8, this was the only way to deal with the problem.

Recently, SObjectizer introduced a new abstraction that solves this problem elegantly and opens the door to new forms of message distribution: message sinks.

A message sink represents the destination of a message. Behind it there can be entities like agents, message boxes or even custom types. The details are not important at this point (and can be a bit convoluted), but we can exploit message sinks to create redirections. For example, semantically speaking, message sinks enable a message box to subscribe to another message box.

Consider this situation:

auto main_channel = sobjectizer.environment().create_mbox("main");
auto stream_notifications = sobjectizer.environment().create_mbox("stream");

auto dispatcher = so_5::disp::active_obj::make_dispatcher(sobjectizer.environment());
sobjectizer.environment().introduce_coop(dispatcher.binder(), [&](so_5::coop_t& c) {
    c.make_agent<stream_detector>(main_channel, stream_notifications);
    //...
});

Suppose we need stream_detector::stream_up to be forwarded from stream_notifications to main_channel.

SObjectizer provides a couple of helpers to forward messages from a message box to an arbitrary message sink. Here, such a forwarding is called a binding. A binding can be seen as a tuple (message type, message source, message destination).

SObjectizer provides single_sink_binding_t that can host only one binding and multi_sink_binding_t that can host any number of bindings.

Both provide the function bind to add a binding. For example:

so_5::single_sink_binding_t binding;
binding.bind<stream_detector::stream_up>(stream_notifications, some_destination);

This means: forward to some_destination any message of type stream_detector::stream_up (in this case, a signal) delivered to stream_notifications.

You might wonder if some_destination can be main_channel. Well, there is a technical detail here: a message box is not a message sink, but it can be turned into one by using a core function: so_5::wrap_to_msink(message_box). Thus, the example can be completed as follows:

so_5::single_sink_binding_t binding;
binding.bind<stream_detector::stream_up>(stream_notifications, so_5::wrap_to_msink(main_channel));

At this point, any stream_up sent to stream_notifications is automatically propagated to main_channel as well. Semantically, we have attained a message box (main_channel) subscribing to another message box (stream_notifications). That was not possible before SObjectizer 5.8.

Note that, since single_sink_binding_t hosts only one binding, calling bind again replaces the existing binding:

so_5::single_sink_binding_t binding;
// 1
binding.bind<stream_detector::stream_up>(stream_notifications, so_5::wrap_to_msink(main_channel));
// this replaces "1"
binding.bind<stream_detector::stream_down>(stream_notifications, so_5::wrap_to_msink(another_channel));

On the other hand, multi_sink_binding_t is a bit more sophisticated (even from an implementation point of view) as it supports multiple bindings:

so_5::multi_sink_binding_t binding;
binding.bind<stream_detector::stream_up>(stream_notifications, so_5::wrap_to_msink(main_channel));
binding.bind<stream_detector::stream_down>(stream_notifications, so_5::wrap_to_msink(main_channel));
binding.bind<stream_detector::stream_down>(stream_notifications, so_5::wrap_to_msink(another_channel));

Here, any stream_down sent to stream_notifications is automatically propagated to both main_channel and another_channel, whereas stream_up is propagated only to main_channel.
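The different behavior of the two binding types can be modeled in plain C++. In this sketch, toy_single_binding, toy_multi_binding, the stand-in signal types and the string destinations are all hypothetical (this is not SObjectizer's implementation); a binding is reduced to a (message type, destination) pair, with the source left implicit:

```cpp
#include <optional>
#include <string>
#include <typeindex>
#include <typeinfo>
#include <utility>
#include <vector>

// Hypothetical signal types standing in for stream_detector::stream_up/stream_down.
struct stream_up {};
struct stream_down {};

// A binding reduced to (message type, destination); the source is implicit.
using binding_key = std::pair<std::type_index, std::string>;

// Models single_sink_binding_t: holds at most one binding.
class toy_single_binding {
public:
    template <typename Msg>
    void bind(const std::string& destination) {
        // emplace() destroys any previous binding before storing the new one.
        m_binding.emplace(std::type_index(typeid(Msg)), destination);
    }
    std::vector<std::string> destinations_for(std::type_index type) const {
        if (m_binding && m_binding->first == type) {
            return {m_binding->second};
        }
        return {};
    }
private:
    std::optional<binding_key> m_binding;
};

// Models multi_sink_binding_t: can host several bindings at once.
class toy_multi_binding {
public:
    template <typename Msg>
    void bind(const std::string& destination) {
        m_bindings.emplace_back(std::type_index(typeid(Msg)), destination);
    }
    std::vector<std::string> destinations_for(std::type_index type) const {
        std::vector<std::string> result;
        for (const auto& [bound_type, destination] : m_bindings) {
            if (bound_type == type) {
                result.push_back(destination);
            }
        }
        return result;
    }
private:
    std::vector<binding_key> m_bindings;
};
```

Binding stream_up and then stream_down on the single version leaves only the stream_down binding, while the multi version keeps all three bindings from the example above.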

The final important bit about bindings concerns lifetime. Both single_sink_binding_t and multi_sink_binding_t drop all their bindings in the destructor. Thus, it’s crucial to keep binding instances alive as long as we need them.
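The lifetime rule is plain RAII. The following sketch models it with hypothetical toy_source and toy_binding types (not SObjectizer classes): the binding registers a forwarding in its constructor and removes it in its destructor, so messages sent after the binding goes out of scope are no longer forwarded:

```cpp
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical message source: forwards each message to its registered sinks.
class toy_source {
public:
    int add_sink(std::function<void(const std::string&)> sink) {
        m_sinks.emplace(m_next_id, std::move(sink));
        return m_next_id++;
    }
    void remove_sink(int id) { m_sinks.erase(id); }
    void send(const std::string& msg) const {
        for (const auto& [id, sink] : m_sinks) {
            sink(msg);
        }
    }
private:
    std::map<int, std::function<void(const std::string&)>> m_sinks;
    int m_next_id = 0;
};

// Hypothetical RAII binding: forwarding lives exactly as long as this object.
class toy_binding {
public:
    toy_binding(toy_source& source, std::vector<std::string>& destination)
        : m_source(source),
          m_id(source.add_sink([&destination](const std::string& m) {
              destination.push_back(m);
          })) {}
    ~toy_binding() { m_source.remove_sink(m_id); }

    toy_binding(const toy_binding&) = delete;
    toy_binding& operator=(const toy_binding&) = delete;

private:
    toy_source& m_source;
    int m_id;
};
```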

Besides the usual ways, such as making the binding object a member of an agent, another approach takes advantage of a tiny feature of cooperations:

sobjectizer.environment().introduce_coop(dispatcher.binder(), [&](so_5::coop_t& c) {
    c.make_agent<stream_detector>(main_channel, stream_notifications);
    //...
    auto binding = c.take_under_control(std::make_unique<so_5::multi_sink_binding_t>());
    binding->bind<stream_detector::stream_up>(stream_notifications, so_5::wrap_to_msink(main_channel));
    binding->bind<stream_detector::stream_down>(stream_notifications, so_5::wrap_to_msink(main_channel));
});

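take_under_control transfers to the cooperation the ownership of any user resource (not necessarily part of SObjectizer) passed as unique_ptr<T>, and returns a pointer to it (binding above). The resource then lives as long as the cooperation does.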
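Conceptually, such a facility only needs ownership plus type erasure. Here is a minimal sketch of how one could be written, where toy_coop is hypothetical and not SObjectizer’s implementation: each unique_ptr<T> is converted into a shared_ptr<void>, which remembers how to delete a T, and the resources are released (in reverse order of acquisition) when the owner is destroyed:

```cpp
#include <memory>
#include <utility>
#include <vector>

// Hypothetical owner of arbitrary user resources.
class toy_coop {
public:
    template <typename T>
    T* take_under_control(std::unique_ptr<T> resource) {
        T* raw = resource.get();
        // shared_ptr<void> keeps unique_ptr's deleter, so T is destroyed correctly.
        m_resources.emplace_back(std::move(resource));
        return raw;
    }
    ~toy_coop() {
        // Release resources in reverse order of acquisition.
        while (!m_resources.empty()) {
            m_resources.pop_back();
        }
    }
private:
    std::vector<std::shared_ptr<void>> m_resources;
};
```

The returned raw pointer is only a non-owning handle; it stays valid while the owner is alive.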

In a future post we’ll discuss an interesting use case where message sinks are essential.

An extra agent

To experiment further with composition, in addition to image_resizer we incorporate a fancier agent: a basic face detector using OpenCV’s Cascade Classifier, which employs a machine learning approach to identify objects within an image. Specifically, we’ll utilize a classifier file (haarcascade_frontalface_default.xml) trained to recognize human faces. For convenience, this file has been added to the repository and is copied into the build output directory, so that it is available at run time:

class face_detector final : public so_5::agent_t
{
public:
    face_detector(so_5::agent_context_t ctx, so_5::mbox_t input, so_5::mbox_t output)
        : agent_t(std::move(ctx)), m_input(std::move(input)), m_output(std::move(output))
    {
    }

    void so_define_agent() override
    {
        so_subscribe(m_input).event([this](const cv::Mat& src) {
            cv::Mat gray;
            cvtColor(src, gray, cv::COLOR_BGR2GRAY);
            std::vector<cv::Rect> faces;
            m_classifier.detectMultiScale(gray, faces, 1.1, 4);
            auto cloned = src.clone();
            for (const auto& [x, y, w, h] : faces)
            {
                rectangle(cloned, { x, y }, { x + w, y + h }, { 255, 0, 0 }, 2);
            }
            so_5::send<cv::Mat>(m_output, std::move(cloned));
        });
    }

    void so_evt_start() override
    {
        if (!m_classifier.load("haarcascade_frontalface_default.xml"))
        {
            throw std::runtime_error("Can't load face detector classifier 'haarcascade_frontalface_default.xml'");
        }
    }

private:
    so_5::mbox_t m_input;
    so_5::mbox_t m_output;
    cv::CascadeClassifier m_classifier;
};

This new agent overlays a rectangle onto each face identified within the frames. Note that the classifier file is loaded in so_evt_start(): loading might be a time-consuming operation, and this way it is executed on the agent’s worker thread. If loading fails, an exception is thrown and handled according to the cooperation’s exception reaction policies discussed in a previous episode.

When employing machine learning (and deep learning) models, using smaller images can bring advantages such as faster computation and reduced resource usage. Although it might not be essential in this simple scenario, let’s combine our image_resizer with the face_detector to emulate that case:

That is:

int main()
{
    auto ctrl_c = get_ctrl_c_token();

    const so_5::wrapped_env_t sobjectizer;
    auto main_channel = sobjectizer.environment().create_mbox("main");
    auto commands_channel = sobjectizer.environment().create_mbox("commands");
    auto decorated_with_faces = sobjectizer.environment().create_mbox("faces");
    auto resized_images = sobjectizer.environment().create_mbox("resized");

    auto dispatcher = so_5::disp::active_obj::make_dispatcher(sobjectizer.environment());
    sobjectizer.environment().introduce_coop(dispatcher.binder(), [&](so_5::coop_t& c) {
        c.make_agent<image_producer>(main_channel, commands_channel);
        c.make_agent<image_resizer>(main_channel, resized_images, 0.5);
        c.make_agent<face_detector>(resized_images, decorated_with_faces);
        c.make_agent<image_viewer_live>(decorated_with_faces);
        c.make_agent<remote_control>(commands_channel, ctrl_c);
    });

    wait_for_stop(ctrl_c);
}

As we did for image_resizer, we might equip face_detector with another constructor that automatically creates an anonymous output message box, and with a member function output(). This version is used as an example in the repository.

Finally, the live demo. As expected, the live viewer appears smaller than usual, because the input image is resized beforehand:

What about your own agents?!

By now, you’ve likely discovered that incorporating your own agent is quite straightforward! We encourage you to experiment and to combine existing and new agents as you like.

Would you be interested in sharing some of these outcomes with the community? Please feel free to open a pull request or create an issue on the repository!

Fixing the image viewer’s duplicated title

At this point, it can make sense to use multiple image viewers to experiment with the system. However, since the window title is statically defined, using many instances of the same image viewer class is problematic: OpenCV identifies windows by their title, so all the instances would end up drawing into the same window. For this reason, we make a small change to generate unique window titles:

class image_viewer final : public so_5::agent_t
{
    struct call_waitkey final : so_5::signal_t {};
    so_5::state_t st_handling_images{ this };
    so_5::state_t st_stream_down{ this };   
public:
    image_viewer(so_5::agent_context_t ctx, so_5::mbox_t channel)
        : agent_t(std::move(ctx)), m_channel(std::move(channel))
    {
    }

    void so_define_agent() override
    {
        st_handling_images
            .event(m_channel, [this](so_5::mhood_t<cv::Mat> image) {
                imshow(m_title, *image);
                cv::waitKey(25);
                st_handling_images.time_limit(500ms, st_stream_down);
            });

        st_stream_down
            .on_enter([this] {
                so_5::send<call_waitkey>(*this);
            })
            .event([this](so_5::mhood_t<call_waitkey>) {
                cv::waitKey(25);
                so_5::send<call_waitkey>(*this);
            }).transfer_to_state<cv::Mat>(m_channel, st_handling_images);

        st_handling_images.activate();
    }
private:
    static inline int global_id = 0;
    so_5::mbox_t m_channel;
    std::string m_title = std::format("image viewer {}", global_id++);
};
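In this program the viewers are created inside the cooperation-building lambda, on a single thread, so a plain static int counter is enough. If viewers could ever be constructed concurrently, an atomic counter is a cheap safeguard. A stdlib-only sketch, where titled_window is a hypothetical stand-in that keeps only the title logic (std::to_string is used instead of std::format so that it builds with C++17):

```cpp
#include <atomic>
#include <string>

// Hypothetical stand-in for image_viewer: only the title generation is kept.
class titled_window {
public:
    titled_window() : m_title("image viewer " + std::to_string(next_id++)) {}
    const std::string& title() const { return m_title; }

private:
    // Atomic fetch-and-increment: concurrent constructions get distinct ids.
    static inline std::atomic<int> next_id{0};
    std::string m_title;
};
```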

Takeaway

In this episode we have learned:

  • both named and anonymous message boxes enable agent composition;
  • message sinks are a new feature of SObjectizer that lets us redirect data succinctly from a message source to one or more destinations (e.g. from a message box to another message box);
  • cooperations provide a generic mechanism to take ownership of user data in the form of unique_ptr<T>.

What’s next?

Our manager made a positive impression, sparking the customer’s interest in testing the alpha version of the program. Nevertheless, she is concerned about potential webcam errors and insists that we address them before releasing the program.

Prepare for our upcoming post, where we’ll take on errors directly!


Thanks to Yauheni Akhotnikau for reviewing this post.
