DEV Community

Marco Arena

Originally published at marcoarena.wordpress.com

SObjectizer Tales - 1. Producing Images

In the introductory episode, we showed some incomplete code; now it's time to make it work properly.

Spoiler: in this first iteration, our program will be able to do something like this:

Since using real camera drivers is out of scope, we'll use OpenCV to open our default camera (e.g. the webcam) and grab frames one by one. If no camera is available on your machine, we'll provide a simple “virtual producer” that replays some images from a directory.

You may wonder why we add a dependency on OpenCV just for that. Well, OpenCV will help not only to manage and manipulate frames, but it will also be exploited a bit more in other posts. It's just what we need.

Before starting, a few words about the project behind this series: calico (Camera Lite Composer), a console application that we'll extend with new features in every post. It grabs frames from your webcam, sends them to a pipeline of agents for processing, and shuts down on SIGTERM/SIGINT (e.g. when receiving CTRL+C from the terminal).

The repository will be tagged on every new post, like a new “release”, so you can take a snapshot of the whole project at any point without searching for specific commits. On the project page you'll find additional instructions on building the project on your machine. The repository will incrementally contain more agents that you are encouraged to combine as you like. You should also definitely make your own agents, and if you feel like sharing them with others, open a pull request, drop a comment here, or open an issue. By the way, the project is clearly not intended for production.

Grabbing frames from the webcam

The first thing to do is to adapt image_producer to OpenCV:

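#include <opencv2/opencv.hpp>
#include <so_5/all.hpp>
#include <stdexcept>
#include <stop_token>
#include <utility>

class image_producer final : public so_5::agent_t
{
public:
    image_producer(so_5::agent_context_t ctx, so_5::mbox_t channel, std::stop_token st)
        : agent_t(std::move(ctx)), m_channel(std::move(channel)), m_stop(std::move(st))
    {
    }

    void so_evt_start() override
    {
        // 0 = default camera; CAP_DSHOW selects the DirectShow backend (Windows)
        cv::VideoCapture cap(0, cv::CAP_DSHOW);
        if (!cap.isOpened())
        {
            throw std::runtime_error("Can't connect to the webcam");
        }
        cv::Mat image;
        // this loop keeps the agent's worker thread busy until a stop is requested
        while (!m_stop.stop_requested())
        {
            cap >> image;
            if (image.empty())
            {
                continue; // no frame grabbed this time: try again
            }
            // image is left empty by the move, so the next grab allocates a new buffer
            so_5::send<cv::Mat>(m_channel, std::move(image));
        }
    }

private:
    so_5::mbox_t m_channel;
    std::stop_token m_stop;
};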

cv::CAP_DSHOW (the DirectShow backend, available on Windows) is not strictly required, but it should solve the issues described here, which we experienced on some machines.

This is a simple implementation of an agent that opens the default camera and sends the frames grabbed from it one by one, until stopped. If OpenCV can't open the camera, an exception is thrown. This raises a question: what happens if an exception leaves a message handler (remember that so_evt_start is like a message handler)? The rule of thumb is that agents should not throw exceptions. When they do, SObjectizer decides how to continue the program according to a reaction policy. By default, the reaction is simply to call std::abort (after logging a meaningful message). That policy can be set at various levels:

  • per agent
  • per cooperation
  • per environment

To learn more about reaction policies, check out the official documentation. Here is just an example:

sobjectizer.environment().introduce_coop([](so_5::coop_t& c) {
    // applies to the agents of this cooperation that don't set their own reaction
    c.set_exception_reaction(so_5::deregister_coop_on_exception);
    // ...
});

The snippet above sets the policy to deregister_coop_on_exception which, unsurprisingly, deregisters the cooperation when an exception escapes from one of its agents.
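Conceptually, a level that doesn't set its own reaction inherits one from the level above: the agent asks its cooperation, a cooperation asks its parent (if any), and the environment, which defaults to abort_on_exception, has the final word. That is our reading of the documented lookup; the plain C++ toy model below (the reaction enum and resolve_reaction are hypothetical, not SObjectizer's API) only makes the resolution order explicit:

```cpp
#include <cassert>
#include <vector>

// Toy model, not SObjectizer code: the values loosely mirror SObjectizer's
// reaction names, and `inherit` means "ask the next level up".
enum class reaction
{
    abort_on_exception,
    shutdown_sobjectizer_on_exception,
    deregister_coop_on_exception,
    ignore_exception,
    inherit
};

// coop_chain: the agent's own cooperation first, then its parents (if any).
reaction resolve_reaction(reaction agent_level,
                          const std::vector<reaction>& coop_chain,
                          reaction environment_level)
{
    if (agent_level != reaction::inherit)
    {
        return agent_level;
    }
    for (const reaction r : coop_chain)
    {
        if (r != reaction::inherit)
        {
            return r;
        }
    }
    return environment_level; // the environment always has a concrete reaction
}
```

For example, with the agent left to inherit and the cooperation set to deregister_coop_on_exception, as in the snippet above, resolve_reaction yields deregister_coop_on_exception instead of the environment's abort_on_exception.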

For now, we can just stay with the default reaction and abort the program in case of any unhandled exceptions.

Displaying images

The image_viewer agent from the previous post should work already. It just calls OpenCV’s imshow, a function that displays a frame into a window:

class image_viewer final : public so_5::agent_t
{
public:
    image_viewer(so_5::agent_context_t ctx, so_5::mbox_t channel)
        : agent_t(std::move(ctx)), m_channel(std::move(channel))
    {
    }

    void so_define_agent() override
    {
        so_subscribe(m_channel).event([](cv::Mat image) {
            imshow("Live!", image);
            cv::waitKey(25);
        });
    }

private:
    so_5::mbox_t m_channel;
};

Warning: OpenCV’s UI-related functions (imshow, waitKey, etc.) are only guaranteed to work when called from the main thread. As far as we know, this is not an issue on Windows, but it can be on other operating systems. We won’t address this problem now, but we’ll provide a solution in a future post.

So far, we have used both so_define_agent and so_evt_start. What’s the difference? These are a couple of customization points designed for two different purposes:

  • so_define_agent is for preparing an agent to start. It’s called during the registration of its cooperation (and, for this reason, on the thread where register_coop() is called). This is the right moment to add subscriptions because the agent has not started yet;
  • so_evt_start, instead, is the first action that an agent performs after starting (and, for this reason, as we have already seen, it is called on its worker thread, which depends on the dispatcher, remember?). Similarly, so_evt_finish is the last action that an agent performs before finishing (e.g. when its cooperation is deregistered).

In case you are wondering, an agent can add subscriptions anytime, even outside so_define_agent. However, when possible, we prefer adding them here since it’s more idiomatic and expressive.

Here it’s worth mentioning another important fact: the destructor of a constructed agent is always called, even if the agent wasn’t registered because of an error. So, if the destructor releases resources that are expected to be created in so_evt_start, unexpected errors can happen. On the other hand, so_evt_finish is only called if so_evt_start completed successfully.

Moreover, sometimes it’s important to call initialization and de-initialization functions from the same thread. so_evt_start and so_evt_finish give such a guarantee if the one_thread, active_group, or active_obj dispatchers are used.
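Going back to the destructor remark: a common way to keep an agent’s destructor safe is to hold whatever so_evt_start acquires in a nullable owner, so that “never started” and “already finished” are both harmless. Here is a plain C++ sketch of the idea, where device and camera_owner are hypothetical, and start/finish merely play the roles of so_evt_start/so_evt_finish:

```cpp
#include <cassert>
#include <memory>

// Hypothetical resource standing in for, e.g., an opened camera.
struct device
{
    static inline int alive = 0;
    device() { ++alive; }
    ~device() { --alive; }
};

class camera_owner
{
public:
    void start() { m_device = std::make_unique<device>(); } // role of so_evt_start
    void finish() { m_device.reset(); }                      // role of so_evt_finish
    bool has_device() const { return m_device != nullptr; }

    // No explicit cleanup needed: the unique_ptr is empty if start() never ran
    // (or finish() already did), so destruction is safe in every case.
    ~camera_owner() = default;

private:
    std::unique_ptr<device> m_device;
};
```

The same reasoning applies to an agent: if the destructor only relies on members that are valid in every state, the “registration failed” path needs no special care.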

To make a subscription, we can provide either a function object (i.e. a type providing a call operator, like the lambda above) or a member function. For example:

class image_viewer final : public so_5::agent_t
{
public:
    image_viewer(so_5::agent_context_t ctx, so_5::mbox_t channel)
        : agent_t(std::move(ctx)), m_channel(std::move(channel))
    {
    }

    void so_define_agent() override
    {
        so_subscribe(m_channel).event(&image_viewer::handle_image);
    }

private:
    void handle_image(cv::Mat image)
    {
        imshow("Live!", image);
        cv::waitKey(25);
    }

    so_5::mbox_t m_channel;
};

Subscribing multiple times to the same type (const T& and T are considered the same type) causes an exception at run-time. Note that an exception thrown from so_define_agent is not considered an exception escaping from a message handler. It breaks the registration of the cooperation and it’s not handled automatically according to the reaction policy we have seen before. On the other hand, if so_subscribe fails in a message handler then such an exception will be handled as we have learned before.
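To make the rule concrete, here is a toy registry (plain C++, not SObjectizer’s implementation) that keeps one subscription per message type for a single mailbox. It keys on std::decay_t<Msg>, so subscribing to T and then to const T& collides, just like the rule above; toy_subscriptions is a hypothetical name:

```cpp
#include <cassert>
#include <set>
#include <stdexcept>
#include <type_traits>
#include <typeindex>
#include <typeinfo>

// Toy model: one subscription per message type, for a single mailbox.
class toy_subscriptions
{
public:
    template <typename Msg>
    void subscribe()
    {
        // decay_t strips references and cv-qualifiers: T, const T& and T&
        // all map to the same key.
        const std::type_index key{ typeid(std::decay_t<Msg>) };
        if (!m_types.insert(key).second)
        {
            throw std::runtime_error("already subscribed to this message type");
        }
    }

private:
    std::set<std::type_index> m_types;
};
```

Subscribing to int and then to const int& throws, while subscribing to a different type such as double is fine.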

It’s time to write down our main function:

int main()
{
    const auto ctrl_c = get_ctrl_c_token();

    const so_5::wrapped_env_t sobjectizer;
    const auto main_channel = sobjectizer.environment().create_mbox("main");

    auto dispatcher = so_5::disp::active_obj::make_dispatcher(sobjectizer.environment());
    sobjectizer.environment().introduce_coop(dispatcher.binder(), [&](so_5::coop_t& c) {
        c.make_agent<image_producer>(main_channel, ctrl_c);
        c.make_agent<image_viewer>(main_channel);       
    });

    wait_for_stop(ctrl_c);
}

create_mbox("main") returns a new message box instance because, at this point, no other message box called "main" exists in the system. In general, the same function is also used to retrieve an existing named message box.

You might be wondering about the copy semantics of message boxes. Actually, so_5::mbox_t is similar to shared_ptr, which destroys the underlying object once all references to it are gone. Thus, storing copies of mbox_t in agents is idiomatic, and copying an mbox_t is cheap.
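The sharing behavior can be pictured with plain std::shared_ptr. This toy environment (hypothetical names, not SObjectizer’s classes) returns the same instance for the same name, as create_mbox does, and shows that copying a handle only bumps a reference count:

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>

struct toy_mbox
{
    std::string name;
};

// Plays the role of so_5::mbox_t: a cheap, reference-counted handle.
using toy_mbox_t = std::shared_ptr<toy_mbox>;

class toy_environment
{
public:
    // The first call with a name creates the box; later calls return the same one.
    toy_mbox_t create_mbox(const std::string& name)
    {
        auto [it, inserted] = m_named.try_emplace(name);
        if (inserted)
        {
            it->second = std::make_shared<toy_mbox>(toy_mbox{ name });
        }
        return it->second;
    }

private:
    std::map<std::string, toy_mbox_t> m_named;
};
```

Two calls with "main" yield handles that compare equal, and copying one of them just adds another owner of the same box.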

The only missing piece is the stop_token. Although it’s beyond the scope of the series, a decent working solution is worth discussing here. Since C++20, a possible solution combines std::signal with atomic wait/notify operations:

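#include <atomic>
#include <csignal>
#include <stop_token>
#include <thread>

class ctrlc_handler
{
public:
    ctrlc_handler()
     : worker{[this]{ stopped.wait(false); source.request_stop(); }}
    {
    }

    ~ctrlc_handler()
    {
        raise(); // wakes the worker so that the jthread can join even without CTRL+C
    }

    void raise()
    {
        stopped.test_and_set();
        // note: the standard only guarantees plain lock-free atomic operations
        // to be signal-safe, and notify_one is not among them
        stopped.notify_one();
    }

    std::stop_token get_token() const
    {
        return source.get_token();
    }
private:
    std::atomic_flag stopped;   // clear by default since C++20
    std::stop_source source;
    std::jthread worker;        // declared last: starts after the members it uses
};

static ctrlc_handler ctrlc;

// only flips the flag: the stop itself is requested from the worker thread
void ctrlc_signal_handler(int signum)
{
    if (signum == SIGINT || signum == SIGTERM)
    {
        ctrlc.raise();
    }
}

std::stop_token get_ctrl_c_token()
{
    // handlers are installed once, on the first call
    [[maybe_unused]] static const auto installed_sigint = std::signal(SIGINT, ctrlc_signal_handler);
    [[maybe_unused]] static const auto installed_sigterm = std::signal(SIGTERM, ctrlc_signal_handler);
    return ctrlc.get_token();
}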

Finally, wait_for_stop just waits until the token is stopped. It can be easily implemented with condition_variable_any:

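#include <condition_variable>
#include <mutex>
#include <stop_token>

void wait_for_stop(const std::stop_token& st)
{
    std::condition_variable_any cv;
    std::mutex m;
    std::unique_lock l{ m };
    // C++20 overload: returns as soon as a stop is requested on st, without
    // the lost-wakeup window of notifying from a stop_callback that doesn't lock m
    cv.wait(l, st, [&st] { return st.stop_requested(); });
}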

At this point, we should be able to run the program and stop it by pressing CTRL+C! The result is shown at the top of this article.

A virtual replayer

In case you don’t have a working camera on your machine, here is a simple virtual device that just replays some images from a directory:

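#include <chrono>
#include <filesystem>
#include <stdexcept>
#include <stop_token>
#include <thread>
#include <utility>
#include <opencv2/opencv.hpp>
#include <so_5/all.hpp>

class virtual_image_producer final : public so_5::agent_t
{
public:
    virtual_image_producer(so_5::agent_context_t ctx, so_5::mbox_t channel, std::stop_token st, std::filesystem::path path)
        : agent_t(std::move(ctx)), m_channel(std::move(channel)), m_stop(std::move(st)), m_path(std::move(path))
    {
    }

    void so_evt_start() override
    {
        if (!std::filesystem::is_directory(m_path))
        {
            throw std::runtime_error("Can't open virtual device directory");
        }
        while (!m_stop.stop_requested())
        {
            bool sent_any = false;
            // one pass over the directory, then start over from the beginning
            for (const auto& entry : std::filesystem::directory_iterator{ m_path })
            {
                if (m_stop.stop_requested())
                {
                    return;
                }
                if (!entry.is_regular_file())
                {
                    continue;
                }
                cv::Mat image = cv::imread(entry.path().string());
                if (image.empty())
                {
                    continue; // not a readable image
                }
                so_5::send<cv::Mat>(m_channel, std::move(image));
                sent_any = true;
                std::this_thread::sleep_for(std::chrono::milliseconds{ 20 }); // we assume 50 FPS
            }
            if (!sent_any)
            {
                // without images, this loop would otherwise spin forever
                throw std::runtime_error("No images found in virtual device directory");
            }
        }
    }
private:
    so_5::mbox_t m_channel;
    std::stop_token m_stop;
    std::filesystem::path m_path;
};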

Again, lots of missing details, error handling is poor, and there is room for improvement. However, this is enough for now. Here is how to use this producer (assuming we have some images to replay in c:\images):

int main()
{
    const auto ctrl_c = get_ctrl_c_token();

    const so_5::wrapped_env_t sobjectizer;
    const auto main_channel = sobjectizer.environment().create_mbox("main");

    auto dispatcher = so_5::disp::active_obj::make_dispatcher(sobjectizer.environment());
    sobjectizer.environment().introduce_coop(dispatcher.binder(), [&](so_5::coop_t& c) {
        c.make_agent<virtual_image_producer>(main_channel, ctrl_c, R"(c:\images)");
        c.make_agent<image_viewer>(main_channel);       
    });

    wait_for_stop(ctrl_c);
}
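One detail worth knowing: std::filesystem::directory_iterator visits entries in an unspecified order, so virtual_image_producer is not guaranteed to replay files in name order. If the order matters (e.g. numbered frames), one option, sketched here with a hypothetical sorted_frames helper, is to collect and sort the paths once and then cycle over the resulting vector:

```cpp
#include <algorithm>
#include <cassert>
#include <filesystem>
#include <fstream>
#include <vector>

// Returns the regular files in `dir`, sorted by path, so that frames named
// e.g. frame_001.png, frame_002.png, ... are replayed in that order
// (zero-padding matters: "frame_10" sorts before "frame_2").
std::vector<std::filesystem::path> sorted_frames(const std::filesystem::path& dir)
{
    std::vector<std::filesystem::path> frames;
    for (const auto& entry : std::filesystem::directory_iterator{ dir })
    {
        if (entry.is_regular_file())
        {
            frames.push_back(entry.path());
        }
    }
    std::sort(frames.begin(), frames.end()); // path provides a lexicographic operator<
    return frames;
}
```

Reading the directory once also avoids re-scanning it on every pass, at the cost of not noticing files added while the program runs.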

A little step forward

The program opens the webcam and sends frames, one by one, to the “main” channel. That channel has only one subscriber, which simply displays the stream of images in a window.

Then we might wonder how to add another subscriber to the party. Let’s say we want to print a line on the standard output for every image received. A sort of “image tracer”:

#include <iostream>
#include <syncstream>

class image_tracer final : public so_5::agent_t
{
public:
    image_tracer(so_5::agent_context_t ctx, so_5::mbox_t channel)
        : agent_t(std::move(ctx)), m_channel(std::move(channel))
    {
    }

    void so_define_agent() override
    {
        so_subscribe(m_channel).event([](cv::Mat image) {
            std::osyncstream(std::cout) << "Got image: " << image.size << "\n";
        });
    }
private:
    so_5::mbox_t m_channel;
};

Among other things, this is useful to troubleshoot the program when we don’t have a GUI.

Then we can add an agent of this type to the cooperation:

sobjectizer.environment().introduce_coop(dispatcher.binder(), [&](so_5::coop_t& c) {
    c.make_agent<image_producer>(main_channel, ctrl_c);
    c.make_agent<image_viewer>(main_channel);
    c.make_agent<image_tracer>(main_channel);
});

And we are done! Remember that this agent will be assigned to its own worker thread, since the cooperation still uses the active_obj dispatcher.

This is what the result looks like at this point:

As expected, we get a flow of frames. The actual frame rate depends on your device (mine is 30 fps).

Here is a picture of the current data pipeline:

One advantage of this approach is evident: flexibility. New components are self-contained and easy to integrate. They might even be developed by different teams. Also, they can be tested in isolation (we’ll return to testing later in the series).

Message copy semantics

You might wonder whether image_tracer gets a copy of the original message or not. In other words, whether image_tracer and image_viewer receive the very same object or two different copies, and whether SObjectizer provides other options. Semantically, they both receive the same data: anytime image_producer drops an image into main_channel, that image is, so to speak, propagated to both.

Since we are C++ programmers, we are obsessed with avoiding unnecessary copies of data. All the details boil down to a couple of scenarios:

When we so_5::send our own message instance (not already wrapped in one of SObjectizer’s smart pointers), SObjectizer creates a new instance of the message and stores it in an atomic reference-counted smart pointer. The new message is either copy- or move-constructed, depending on the value category of the source. By “new instance” we just mean new T{...}. For example:

send<Mat>(m_channel, move(image)); // aka: new Mat(move(image));
send<Mat>(m_channel, image); // aka: new Mat(image);

All the arguments passed to send will get forwarded to the constructor of the message type. This means it’s possible to construct messages in-place:

send<Mat>(m_channel, 1024, 768, CV_8UC3, Scalar{1});

Hence, dynamic allocation is mandatory to use SObjectizer. Whether this is an issue depends on the domain.
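The three send cases can be modeled in plain C++. In this sketch, toy_send (a hypothetical stand-in, not SObjectizer’s send) performs one heap allocation, builds T from the forwarded arguments, and returns a reference-counted pointer to const; std::shared_ptr plays the role of SObjectizer’s own smart pointer. The tracked type counts copies and moves:

```cpp
#include <cassert>
#include <memory>
#include <utility>

// Counts how instances are constructed.
struct tracked
{
    static inline int copies = 0;
    static inline int moves = 0;
    int value = 0;

    explicit tracked(int v) : value(v) {}
    tracked(const tracked& other) : value(other.value) { ++copies; }
    tracked(tracked&& other) noexcept : value(other.value) { ++moves; }
};

// One allocation per send; the arguments are forwarded to T's constructor.
template <typename T, typename... Args>
std::shared_ptr<const T> toy_send(Args&&... args)
{
    return std::make_shared<T>(std::forward<Args>(args)...);
}
```

Sending an lvalue costs one copy, sending an rvalue costs one move, and passing constructor arguments builds the message in place with neither; each call allocates exactly one object.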

The other scenario is when we send messages that are already wrapped in SObjectizer’s smart pointer type. In this case, since the message is preallocated, SObjectizer does not need to create a new instance and handles it efficiently.

How does it affect message reception?

Consider image_tracer’s handler:

so_subscribe(m_channel).event([](cv::Mat image) {
    std::osyncstream(std::cout) << "Got image: " << image.size << "\n";
});

The parameter of the lambda is a cv::Mat. Does it make an extra copy of the data? The answer should not surprise: yes. The usual C++ rules apply: the parameter is copy-constructed from the instance held by the smart pointer SObjectizer created when sending the message. So, end to end, we have one dynamic allocation to send the data and, in this case, an additional copy. That said, OpenCV’s Mat works like a shared_ptr, so that copy might be negligible (in practice, the image bytes are not copied). In a future post we’ll discuss some dodgy implications of this.

So, to avoid that additional copy, we can just receive a const cv::Mat&. In general, SObjectizer supports three ways of receiving message references:

(const cv::Mat& image)
(so_5::mhood_t<cv::Mat> image)
(const so_5::mhood_t<cv::Mat>& image)

You are already familiar with the first one. The other two use a special type provided by SObjectizer that is none other than a smart reference. This type has a few features that are mandatory in certain scenarios and will be discussed in other articles (for example, it allows the receiver to participate in the ownership of the underlying instance). In our case, using either const Mat& or mhood_t is equivalent, so we can just adapt both image_tracer and image_viewer to const Mat&.
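The receiving side follows the ordinary C++ rules, which a toy model makes visible. Here toy_deliver (hypothetical, plain C++) hands every subscriber a const reference to the same shared instance; only a handler that takes its parameter by value pays for a copy:

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <vector>

// Counts copies of the message payload.
struct frame
{
    static inline int copies = 0;
    frame() = default;
    frame(const frame&) { ++copies; }
};

using handler_t = std::function<void(const frame&)>;

// Every subscriber sees the same instance, owned by a shared pointer.
void toy_deliver(const std::shared_ptr<const frame>& msg, const std::vector<handler_t>& handlers)
{
    for (const auto& handler : handlers)
    {
        handler(*msg);
    }
}
```

With one by-value handler and one const& handler, exactly one copy is made, and the const& handler observes the very object held by the shared pointer.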

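Receiving a reference to a shared instance means that message immutability becomes part of the deal: since all subscribers see the same object, none of them should modify it. We won’t discuss this issue here, but it’s worth mentioning.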

Takeaway

In this episode we have learned:

  • agents should not let exceptions out of their handlers, but in case they do, SObjectizer will react in a configurable way;
  • message boxes are handled like smart pointers, thus it’s ok to copy around mbox_t instances;
  • if an agent subscribes to the same message type multiple times…boom! SObjectizer throws an exception during the registration of its cooperation;
  • subscriptions can be either function objects (e.g. lambdas) or member functions;
  • subscription signatures support both value and reference semantics, and mhood_t is a special reference type that provides reference semantics as well as some extra features we’ll delve into soon;
  • dynamic allocation is mandatory to use SObjectizer.

What’s next?

Although the journey has just started, a new requirement knocks at the door:

A camera vendor just released a new SDK that works with callbacks. Basically, the user sets a function that the SDK calls for every new available frame. How can we integrate this into a producer agent?

This will be turned into a task we’ll work on in the next episode!
