DEV Community

Cover image for SObjectizer Tales - 5. Sending commands
Marco Arena
Marco Arena

Posted on • Originally published at marcoarena.wordpress.com

SObjectizer Tales - 5. Sending commands

In our previous session, we delved into the process of managing commands to oversee the acquisition. This journey was notably extensive, given the necessity of updating all our producers along the way. The positive takeaway from this endeavor is that the process of sending commands is now significantly simplified.

To illustrate this improvement, we manually initiated start and stop commands from the main function, which, admittedly, lacks excitement. However, in this upcoming post, we will explore various instances of generating commands from different segments within the program.

Our producers are now primed to accept commands via the designated “commands” message box. The responsibility lies with various components of the program to transmit signals through this channel. By sending the start_acquisition_command and stop_acquisition_command, any of our producers will respond as required. This opens up a wide array of potential possibilities. We could develop a keyboard listener that translates keystrokes into signals, or create a REST API capable of receiving commands from the network, among other options.

For example, here is a minimal keyboard listener based on OpenCV that shows a tiny frame where you can press Enter or Escape to start and stop the acquisition:

class remote_control : public so_5::agent_t
{
    struct keep_on final : so_5::signal_t {};
public:
    remote_control(so_5::agent_context_t ctx, so_5::mbox_t commands)
        : agent_t(std::move(ctx)), m_channel(std::move(commands))
    {
    }

    void so_evt_start() override
    {
        cv::Mat frame = cv::Mat::ones(100, 200, CV_8UC3);
        putText(frame, "Start (Enter)", { 2, 20 }, cv::FONT_HERSHEY_COMPLEX_SMALL, 1, { 240, 200, 1 });
        putText(frame, "Stop (Escape)", { 2, 50 }, cv::FONT_HERSHEY_COMPLEX_SMALL, 1, { 240, 200, 1 });
        imshow("Remote Control", frame);        
        so_5::send<keep_on>(*this);
    }

    void so_define_agent() override
    {
        so_subscribe_self()
            .event([this[(so_5::mhood_t<keep_on>) {
            switch (cv::waitKey(1))
            {
            case 13: // Enter
                so_5::send<start_acquisition_command>(m_channel);
                break;
            case 27: // Escape
                so_5::send<stop_acquisition_command>(m_channel);
                break;          
            default:
                break;
            }
            so_5::send_delayed<keep_on>(*this, 100ms);
        });
    }
private:
    so_5::mbox_t m_channel;
};
Enter fullscreen mode Exit fullscreen mode

Technical detail: the frame must be kept responsive by iteratively calling either waitKey or pollKey, for this reason we use that keep_on signal. As said in a previous discussion, the command channel might be retrieved by name but here we prefer to inject it.

As usual, we add that agent to the party as follows:

int main()
{
    auto ctrl_c = get_ctrl_c_token();

    const so_5::wrapped_env_t sobjectizer;
    auto main_channel = sobjectizer.environment().create_mbox("main");
    auto commands_channel = sobjectizer.environment().create_mbox("commands");

    auto dispatcher = so_5::disp::active_obj::make_dispatcher(sobjectizer.environment());
    sobjectizer.environment().introduce_coop(dispatcher.binder(), [&[(so_5::coop_t& c) {
        c.make_agent<image_producer_recursive>(main_channel, commands_channel);
        c.make_agent<image_viewer>(main_channel);
        c.make_agent<remote_control>(commands_channel);
    }

    wait_for_stop(ctrl_c);
}
Enter fullscreen mode Exit fullscreen mode

Here is a live demo:

Now, let’s consider adding some excitement to the situation. Imagine we need to capture frames for a specific duration, such as 10 seconds. How can we achieve this? One approach is to initiate the process with a “start” command, wait for 10 seconds, and then issue a “stop” command:

// somewhere
send<start_acquisition_command>(commands);
this_thread::sleep_for(10s);
send<stop_acquisition_command>(commands);
Enter fullscreen mode Exit fullscreen mode

This method is functional, and as you may be aware, the “stop” command will not be executed until at least 10 seconds have passed (potentially even longer due to scheduling and other operating system intricacies). However, this approach can be somewhat inconvenient because it might require relocating the operation to a different context, like a separate thread or handling it within a coroutine, to prevent it from blocking the current thread.

An alternative way is using SObjectizer’s timers.

Introducing timed send functions

Typical applications developed with SObjectizer use timers quite extensively and for this reason SObjectizer uses a dedicated timer thread that can efficiently process a big amount of timers (tens and hundreds of million, even billions of timers).

Timers come in the form of two additional ways of sending messages:

  • delayed messages
  • periodic messages

The former is a manner of postponing the delivery of a message. For example:

so_5::send_delayed<stop_acquisition_command>(commands, 10s);
Enter fullscreen mode Exit fullscreen mode

sends stop_acquisition_command after 10 seconds.

Messages sent by send_delayed can’t be cancelled. On the other hand, messages sent by send_periodic can. We’ll see how in a moment.

As one would anticipate, periodic messages provide a means of transmitting the same message multiple times at specific intervals. In practice, the message instance is not deallocated but, instead, it is delivered again and again, until cancellation happens (either automatic or explicit).

For instance, consider a scenario in which our program is employed for video surveillance, and we require capturing images for a duration of 5 minutes every half an hour. In such a case, we can dispatch a pair of periodic messages:

auto receipt_start = so_5::send_periodic<start_acquisition_command>(commands,
      30m, // first delivery delay
      30m); // repetition period

auto receipt_stop = so_5::send_periodic<stop_acquisition_command>(commands,
      35m, // first delivery delay
      30m); // repetition period
Enter fullscreen mode Exit fullscreen mode

receipt_start and receipt_stop are instances of so_5::timer_id_t, a class that identifies a timed message. Often such instances are simply referred to as timers.

Storing receipt_start and receipt_stop is crucial, indeed timer_id_t works like a shared pointer and destroying the last instance will cause the periodic message to be cancelled. This means, the message won’t be sent again according to the period. In other words, this code:

so_5::send_periodic<stop_acquisition_command>(commands, 35m, 30m);
Enter fullscreen mode Exit fullscreen mode

causes the message to be immediately cancelled!

Also, send_periodic supports delayed messages in case the third parameter (period) is 0. In this special case, the message will be delivered only once, after the specified delay:

auto delayed = so_5::send_periodic<start_acquisition_command>(commands,
      30m, // delivery delay
      0m); // Repetition period is zero -> it's a delayed message
Enter fullscreen mode Exit fullscreen mode

Clearly, in this case destroying the receipt causes the delayed message to be cancelled.

Cancelling a message

As said, periodic messages can be cancelled, whether they are created with a period greater than zero or not. Cancellation happens automatically when the last instance of timer_id_t returned from send_periodic is destroyed. However, we can even cancel a timer explicitly by means of release:

auto receipt = so_5::send_periodic<start_acquisition_command>();
...
receipt.release(); // cancellation
Enter fullscreen mode Exit fullscreen mode

This method cancels the message regardless of the count of remaining timer_id_t objects pointed to that instance.

Often timers are stored into agents as member variables. For example:

class some_agent : public so_5::agent_t 
{
    so_5::timer_id_t m_timer;
    //...
    void some_function() 
    {
        m_timer = so_5::send_periodic<some_message>(...); 
        // ... 
    }
};
Enter fullscreen mode Exit fullscreen mode

In this case, when the agent is destroyed, so is m_timer.

Have you ever tried to cancel a timer on Alexa…while it was about to ring?! In SObjectizer, message cancellation is not strictly guaranteed. Indeed, a delayed message won’t be revoked if it left the timer thread already. This happens if you try cancelling the message too late, in that case there are good chances it will be delivered anyway. For example, if you send a message with delay 100ms and you try cancelling after 100ms then it might have left the timer thread already. There are ways to prevent this problem but won’t be discussed here. If you are interested, have a look at this.

What about virtual_image_producer?

You make a good observation! In the previous post, we inadvertently overlooked upgrading virtual_image_producer to react to commands properly. This occurred because we needed timed messages. Let’s do it now.

The state machine closely resembles that of image_producer_recursive with two key distinctions:

  • the images are retrieved from the file system;
  • grab_image signal is dispatched as a delayed message to replicate the delay between two successive frames.

Here is the full implementation:

class virtual_image_producer final : public so_5::agent_t
{
    struct grab_image final : so_5::signal_t {};
public:
    virtual_image_producer(so_5::agent_context_t ctx, so_5::mbox_t channel, so_5::mbox_t commands, std::filesystem::path path)
        : agent_t(std::move(ctx)), m_channel(std::move(channel)), m_commands(std::move(commands)), m_path(std::move(path))
    {
    }

    void so_define_agent() override
    {
        st_started.event([this[(so_5::mhood_t<grab_image>) {
            cv::Mat mat;
            m_current_it = std::find_if(m_current_it, {}, [[(const auto& e) {
                return e.is_regular_file();
            });
            so_5::send<cv::Mat>(m_channel, cv::imread(m_current_it->path().string()));
            so_5::send_delayed<grab_image>(*this, std::chrono::milliseconds(20));
            if (++m_current_it == std::filesystem::directory_iterator{})
            {
                m_current_it = std::filesystem::directory_iterator{ m_path };
            }
        }).event(m_commands, [this[(so_5::mhood_t<stop_acquisition_command>) {
            st_stopped.activate();
        });

        st_stopped.event(m_commands, [this[(so_5::mhood_t<start_acquisition_command>) {
            st_started.activate();
            so_5::send<grab_image>(*this);
        });

        st_stopped.activate();
    }

    void so_evt_start() override
    {
        if (!is_directory(m_path))
        {
            throw std::runtime_error("Can't open virtual device directory");
        }

        m_current_it = std::filesystem::directory_iterator{ m_path };
    }
private:
    so_5::state_t st_stopped{ this };
    so_5::state_t st_started{ this };

    so_5::mbox_t m_channel;
    so_5::mbox_t m_commands;
    std::filesystem::path m_path;
    std::filesystem::directory_iterator m_current_it;
};
Enter fullscreen mode Exit fullscreen mode

To provide a more accurate adjustment, we should modify the pause duration by subtracting the milliseconds that were “consumed” while processing the last frame, such as the time required to read a frame from the disk:

// ... as before

void so_define_agent() override
{
    st_started.event([this[(so_5::mhood_t<grab_image>) {
        const auto tic = std::chrono::steady_clock::now();
        cv::Mat mat;
        m_current_it = std::find_if(m_current_it, {}, [[(const auto& e) {
            return e.is_regular_file();
        });
        so_5::send<cv::Mat>(m_channel, cv::imread(m_current_it->path().string()));
        const auto elapsed = std::min(20ms, std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - tic));
        so_5::send_delayed<grab_image>(*this, 20ms - elapsed);
        if (++m_current_it == std::filesystem::directory_iterator{})
        {
            m_current_it = std::filesystem::directory_iterator{ m_path };
        }
    }). //... as before
Enter fullscreen mode Exit fullscreen mode

While there are more sophisticated strategies available, we can stick with this simplified implementation for the time being. After all, this is merely a mock device!

Takeaway

In this episode we have learned:

  • SObjectizer supports timers through timed send functions;
  • send_delayed delivers a message after a delay, and such a message can’t be revoked;
  • send_periodic delivers a message again and again, until cancelled;
  • cancellation of periodic messages can happen implicitly or explicitly;
  • send_periodic with 0-period is like delayed message but can be cancelled;
  • cancelling a message immediately after the deadline might prevent the revocation.

As usual, calico is updated and tagged.

What’s next?

We can unleash our imagination and craft any command sources. We can also create functions and components to send commands automatically on the basis of time.

However, playing a bit with start and stop we notice a subtle problem in the behavior of image_viewer: it seems that after stopping the acquisition, the window hangs and stays unresponsive. When we start the acquisition again then it reawakens…what’s happening?

In the next post we’ll deal with that issue and provide a solution.


Thanks to Yauheni Akhotnikau for having reviewed this post.

Top comments (0)