Marco Arena

Originally published at marcoarena.wordpress.com

SObjectizer Tales – 6. Is the stream still in progress?

In the previous episode, we covered the process of sending commands and utilizing timers. Nevertheless, we encountered an issue that impacts our image viewer. Specifically, when the acquisition process is halted, the image window becomes unresponsive. However, once the acquisition resumes, it regains responsiveness and functions smoothly.

Can you reproduce the issue? What’s going on?

The behavior depends on how we handle OpenCV’s window. After displaying an image with imshow, it is our responsibility to call waitKey so that GUI-related tasks, like responding to mouse and keyboard interactions, can be processed. image_viewer performs this step for each frame. However, when the image stream pauses, the opportunity to call waitKey also vanishes. Consequently, the window, although still present, becomes unresponsive.
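To make the contract concrete, here is a minimal sketch of the display step in plain OpenCV (just an illustration, outside SObjectizer): the window stays responsive only while waitKey keeps pumping GUI events.

#include <opencv2/highgui.hpp>

// called for every received frame
void show_frame(const cv::Mat& frame)
{
    cv::imshow("image viewer", frame); // draws the frame into the named window
    cv::waitKey(25);                   // processes pending GUI events for up to ~25 ms
}

// if show_frame stops being called (e.g. the stream pauses), no GUI events
// are processed anymore and the window appears frozen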

When the agent no longer receives frames, it would be more suitable to either close the window or continue invoking waitKey. In this post, we will explore a more advanced utilization of agent states to implement both of these strategies.

Observing when the stream goes idle

The central challenge revolves around detecting when the producer comes to a halt. One potential solution could involve having the image_viewer subscribe to the stop command. However, this approach has its drawbacks because the producer may take some time to process the stop command and send the final frames. Consequently, the image_viewer could potentially miss the last set of images.

Furthermore, in a more general scenario, the stop command might not even be available, or the producer might cease its operation for various other reasons, including temporary issues, hardware errors, and the like.

An alternative, middle-ground solution involves enhancing the communicativeness of producers. They could transmit notifications about the status of the stream and the camera, indicating when the acquisition commences, when it concludes, or if any problems arise. However, this approach is effective only when the image_viewer directly subscribes to the producer’s output. On the flip side, if the image_viewer is employed to visualize a different channel, such as the output of another agent, implementing this approach would necessitate additional logic, as notifications might not be part of the output of such an agent.

Hence, it is imperative to ensure that the image_viewer can function seamlessly with any image channel.

One universal approach to determine the stream’s current status is a time-based criterion: if a specific duration elapses without any images being transmitted through the channel, we can infer that the stream has either stopped for good or been paused.

We could derive the threshold from the camera’s frame rate, but there are situations where the device is externally triggered, frame by frame, by a physical entity like an encoder, which makes the output rate vary dynamically. In such cases, more advanced solutions may involve continuously estimating and adjusting this threshold over time. However, it’s important to note that the image_viewer is primarily intended as a troubleshooting tool and won’t be frequently used in production. Therefore, using a static value may suffice for the time being.
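For instance, a static threshold could be derived once from a nominal frame rate, as in this rough sketch (the nominal_fps parameter and the safety factor of 3 are assumptions for illustration):

#include <chrono>

// tolerate a few missing frames before declaring the stream idle
std::chrono::milliseconds idle_threshold(double nominal_fps)
{
    const auto frame_period_ms = 1000.0 / nominal_fps;
    return std::chrono::milliseconds{ static_cast<long long>(3 * frame_period_ms) };
}

// e.g. idle_threshold(30.0) == 100ms, idle_threshold(5.0) == 600ms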

Approach #1: just closing the window

The initial approach involves closing the window when the stream ends.

At present, the image_viewer operates exclusively in its default state. Upon receiving the first frame, it creates a window that is subsequently closed when the program concludes. Subsequent frames are then displayed within this window. As previously mentioned, one approach to identify when the stream becomes inactive involves monitoring the absence of incoming frames for a specified period.

SObjectizer’s agent states are quite sophisticated and provide some utilities that might be useful for developing a working solution. First of all, image_viewer can be modeled as a two-state agent:

  • handling images: as before, just display images as they arrive;
  • stream down: do nothing.

The main point is how to express transitions between the two:

  • when in handling images, if no frames arrive for a certain time limit, the agent transitions to stream down;
  • when exiting from handling images, the agent closes the window;
  • if an image arrives when in stream down, the agent transitions to handling images.

Here is the code:

class image_viewer final : public so_5::agent_t
{
    so_5::state_t st_handling_images{ this };
    so_5::state_t st_stream_down{ this };
public:
    image_viewer(so_5::agent_context_t ctx, so_5::mbox_t channel)
        : agent_t(std::move(ctx)), m_channel(std::move(channel))
    {
    }

    void so_define_agent() override
    {
        st_handling_images
            .event(m_channel, [this](const cv::Mat& image) {
                imshow(m_title, image);
                cv::waitKey(25);
                st_handling_images.time_limit(500ms, st_stream_down);
            })
            .on_exit([this] { cv::destroyWindow(m_title); });

        st_stream_down
            .transfer_to_state<cv::Mat>(m_channel, st_handling_images);

        st_stream_down.activate();
    }
private:

    so_5::mbox_t m_channel;
    std::string m_title = "image viewer";
};

As you can see, states provide functions for expressing additional properties and transitions.

Some details:

  • on_exit takes a function that is executed when the agent leaves the state. It’s the opportunity to call destroyWindow;
  • transfer_to_state allows the agent to switch from the current state to another when a certain message arrives. In other words, while being in st_stream_down, if cv::Mat arrives, the agent transitions to st_handling_images and cv::Mat is handled there;
  • time_limit allows setting the maximum amount of time the agent may persist in a certain state.

It’s crucial to understand why time_limit is called inside the message handler. One might think that doing this would be equivalent:

st_handling_images
    .event(m_channel, [this](const cv::Mat& image) {
        imshow(m_title, image);
        cv::waitKey(25);
    })
    .time_limit(500ms, st_stream_down)
    .on_exit([this] { cv::destroyWindow(m_title); });

However, in this case the timer is not reset when a new image arrives. The effect is that the window blinks while the stream is active, because it gets destroyed and recreated every 500ms! On the other hand, when time_limit is called inside the handler, its timer is reset after every image (that is the intended behavior, because we want the time limit to be measured from the last received frame).

As you can imagine, there is also on_enter, which executes a function when transitioning into a state. Just for your information, neither on_enter nor on_exit handlers may throw exceptions.
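For completeness, if there were any doubt that the cleanup code might throw, a defensive on_exit handler could swallow exceptions explicitly, along these lines (just a sketch; destroyWindow is unlikely to be a problem here):

st_handling_images
    .on_exit([this]() noexcept {
        try { cv::destroyWindow(m_title); }
        catch (...) { /* exceptions must not escape on_exit */ }
    });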

Note that this solution is not bulletproof: cv::waitKey(25) suffers from the very same problem as estimating how long to wait between two frames. However, as said, image_viewer is just for troubleshooting and this can be enough for now. Feel free to change this number at will.

Here is a demo:

Approach #2: keeping the window responsive

Sometimes it can make sense to display the last acquired frame and keep the window responsive.

Basically, when the stream goes down, we should keep on calling waitKey until a new frame arrives. This should remind you of the post where we discussed how to model an infinite loop with messages. Indeed, the pattern fills the bill.

Also, since we have just given a taste of agent state utilities, we can use our new tools to express transitions succinctly.

We can still model the image viewer as a two-state machine, adding a signal to the party. When images arrive – that is, in st_handling_images – the agent behaves exactly as before. When it enters st_stream_down, this time the agent sends call_waitkey to itself to keep calling waitKey until a cv::Mat arrives, at which point the state turns into st_handling_images again. Don’t forget to remove on_exit from st_handling_images since we don’t need to destroy the window anymore:

class image_viewer_live final : public so_5::agent_t
{
    struct call_waitkey final : so_5::signal_t {};
    so_5::state_t st_handling_images{ this };
    so_5::state_t st_stream_down{ this };   
public:
    image_viewer_live(so_5::agent_context_t ctx, so_5::mbox_t channel)
        : agent_t(std::move(ctx)), m_channel(std::move(channel))
    {
    }

    void so_define_agent() override
    {
        st_handling_images
            .event(m_channel, [this](const cv::Mat& image) {
                imshow(m_title, image);
                cv::waitKey(25);
                st_handling_images.time_limit(500ms, st_stream_down);
            });

        st_stream_down
            .on_enter([this] {
                so_5::send<call_waitkey>(*this);
            })
            .event([this](so_5::mhood_t<call_waitkey>) {
                cv::waitKey(25);
                so_5::send<call_waitkey>(*this);
            }).transfer_to_state<cv::Mat>(m_channel, st_handling_images);

        st_handling_images.activate();
    }
private:
    so_5::mbox_t m_channel;
    std::string m_title = "image viewer live";
};

Note that we activate st_handling_images first, otherwise call_waitkey would be sent without any existing window (OpenCV would not complain, though).

Here is a demo:

Stream heartbeat logger

A variation of this pattern can be adopted for crafting another utility agent that logs the uptime of the stream, let’s say every 5 seconds. When the stream is active, we expect logs like these:

[Heartbeat] Uptime: 00:00:05
[Heartbeat] Uptime: 00:00:10
[Heartbeat] Uptime: 00:00:15
[Heartbeat] Uptime: 00:00:20
...

When the stream goes down, the agent shuts up. And if the stream goes up again, so does the agent (restarting from 0).

This time, we combine state transitions with periodic messages. As in image_viewer_live, the agent sends a signal to itself, but not at breakneck speed; instead, every 5 seconds:

class stream_heartbeat final : public so_5::agent_t
{
    struct log_heartbeat final : so_5::signal_t {};
    so_5::state_t st_handling_images{ this };
    so_5::state_t st_stream_down{ this };
public:
    stream_heartbeat(so_5::agent_context_t ctx, so_5::mbox_t channel)
        : agent_t(std::move(ctx)), m_channel(std::move(channel))
    {
    }

    void so_define_agent() override
    {
        st_handling_images
            .on_enter([this] {
                m_start_time = std::chrono::steady_clock::now();
                m_timer = so_5::send_periodic<log_heartbeat>(so_direct_mbox(), 5s, 5s);
            })
            .event(m_channel, [this](const cv::Mat&) {
                st_handling_images.time_limit(500ms, st_stream_down);
            }).event([this](so_5::mhood_t<log_heartbeat>) {
                std::osyncstream(std::cout) << std::format("[Heartbeat] Uptime: {:%H:%M:%S}\n", std::chrono::floor<std::chrono::seconds>(std::chrono::steady_clock::now() - m_start_time));
            }).on_exit([this] {
                m_timer.release();
            });

        st_stream_down
            .transfer_to_state<cv::Mat>(m_channel, st_handling_images);

        st_stream_down.activate();
    }
private:
    so_5::mbox_t m_channel;
    std::chrono::time_point<std::chrono::steady_clock> m_start_time;
    so_5::timer_id_t m_timer;
};

A few details:

  • log_heartbeat indicates when it’s time to log the message on the console;
  • m_timer holds the periodic message and it’s released (aka: cancelled) when the stream goes down;
  • m_timer is (re)created when the stream goes up, along with m_start_time which represents the streaming start time;
  • the uptime is calculated as the number of whole seconds elapsed since the start time (truncated with std::chrono::floor).

As usual, you can play with these new agents by adding them to the cooperation. For example:

int main()
{
    auto ctrl_c = get_ctrl_c_token();

    const so_5::wrapped_env_t sobjectizer;
    auto main_channel = sobjectizer.environment().create_mbox("main");
    auto commands_channel = sobjectizer.environment().create_mbox("commands");

    auto dispatcher = so_5::disp::active_obj::make_dispatcher(sobjectizer.environment());
    sobjectizer.environment().introduce_coop(dispatcher.binder(), [&](so_5::coop_t& c) {
        c.make_agent<image_producer_recursive>(main_channel, commands_channel);
        c.make_agent<image_viewer_live>(main_channel);
        c.make_agent<remote_control>(commands_channel);
        c.make_agent<stream_heartbeat>(main_channel);
    });

    wait_for_stop(ctrl_c);
}

Here is a demo:

Just a design digression

Since stream_heartbeat, image_viewer and image_viewer_live share the very same inner logic, we can think of moving it into a dedicated agent that sends stream notifications:

class stream_detector final : public so_5::agent_t
{
    so_5::state_t st_handling_images{ this };
    so_5::state_t st_stream_down{ this };

public:
    struct stream_up final : so_5::signal_t{};
    struct stream_down final : so_5::signal_t{};

    stream_detector(so_5::agent_context_t ctx, so_5::mbox_t channel, so_5::mbox_t output)
        : agent_t(std::move(ctx)), m_channel(std::move(channel)), m_out_channel(std::move(output))
    {
    }

    void so_define_agent() override
    {
        st_handling_images
            .on_enter([this] {
                so_5::send<stream_up>(m_out_channel);
            })
            .event(m_channel, [this](const cv::Mat&) {
                st_handling_images.time_limit(500ms, st_stream_down);
            }).on_exit([this] {
                so_5::send<stream_down>(m_out_channel);
            });

        st_stream_down
            .transfer_to_state<cv::Mat>(m_channel, st_handling_images);

        st_stream_down.activate();
    }
private:
    so_5::mbox_t m_channel;
    so_5::mbox_t m_out_channel;
};

At this point, stream_heartbeat boils down to:

class stream_heartbeat_with_detector final : public so_5::agent_t
{
    struct log_heartbeat final : so_5::signal_t {};
public:
    stream_heartbeat_with_detector(so_5::agent_context_t ctx, so_5::mbox_t detector_channel)
        : agent_t(std::move(ctx)), m_channel(std::move(detector_channel))
    {
    }

    void so_define_agent() override
    {
        so_subscribe(m_channel).event([this](so_5::mhood_t<stream_detector::stream_up>) {
            m_start_time = std::chrono::steady_clock::now();
            m_timer = so_5::send_periodic<log_heartbeat>(so_direct_mbox(), 5s, 5s);
        }).event([this](so_5::mhood_t<stream_detector::stream_down>) {          
            m_timer.release();
        });

        so_subscribe_self().event([this](so_5::mhood_t<log_heartbeat>) {
            std::osyncstream(std::cout) << std::format("[Heartbeat] Uptime: {:%H:%M:%S}\n", std::chrono::floor<std::chrono::seconds>(std::chrono::steady_clock::now() - m_start_time));
        });
    }
private:
    so_5::mbox_t m_channel;
    std::chrono::time_point<std::chrono::steady_clock> m_start_time;
    so_5::timer_id_t m_timer;
};

Here is an example of usage:

int main()
{
    auto ctrl_c = get_ctrl_c_token();

    const so_5::wrapped_env_t sobjectizer;
    auto main_channel = sobjectizer.environment().create_mbox("main");
    auto commands_channel = sobjectizer.environment().create_mbox("commands");
    auto stream_notifications = sobjectizer.environment().create_mbox("stream");

    auto dispatcher = so_5::disp::active_obj::make_dispatcher(sobjectizer.environment());
    sobjectizer.environment().introduce_coop(dispatcher.binder(), [&](so_5::coop_t& c) {
        c.make_agent<image_producer_recursive>(main_channel, commands_channel);
        c.make_agent<stream_detector>(main_channel, stream_notifications);
        c.make_agent<stream_heartbeat_with_detector>(stream_notifications);
        c.make_agent<image_viewer_live>(main_channel);
        c.make_agent<remote_control>(commands_channel);
    });

    wait_for_stop(ctrl_c);
}

Clearly, rather than adding a new channel, main_channel can host output notifications as well:

c.make_agent<stream_detector>(main_channel, main_channel); // output to main_channel
c.make_agent<stream_heartbeat_with_detector>(main_channel);

Adding an extra channel might not be just a matter of style or taste. We tend to keep some channels separate, especially to avoid mixing “raw” data with “derived” data. In the next post we’ll get back to this in a bit more detail.

Another design consideration concerns stream_heartbeat_with_detector.

It’s evident that stream_heartbeat_with_detector is less generic than stream_heartbeat because it can’t simply work with any “image channel”, but only with those carrying stream_up and stream_down. This creates an implicit dependency on stream_detector, and we can further solidify this relationship by placing both stream_detector and stream_heartbeat_with_detector into the same cooperation.

On the other hand, stream_heartbeat_with_detector has gained two interesting features: firstly, it can now function seamlessly with any image type, eliminating its previous dependency on cv::Mat. For example, if we introduce a new SDKImage and integrate it with stream_detector, stream_heartbeat_with_detector will continue to work without modification. Secondly, it no longer handles the “500 milliseconds” threshold, which was previously used to determine if a stream becomes idle. This responsibility is now solely within the domain of stream_detector (about this threshold, however, the central point discussed earlier in this article still stands: the most effective approach to determine when the stream has started and concluded is to have the producer send appropriate signals).

It’s important to highlight that the first feature has wider applicability. In the scenario where we have numerous agents designed to work exclusively with cv::Mat (such as image_viewer and image_tracer), if, at some point, we introduce support for SDKImage into the program, we can simplify the transition by introducing an adapter agent that converts SDKImage into cv::Mat (assuming it’s feasible). This adaptation would enable the current agents to seamlessly continue their operations without requiring any extra modifications.
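As a rough sketch of this idea (SDKImage and its to_mat conversion are hypothetical placeholders for a vendor SDK type), such an adapter could look like this:

class sdk_image_adapter final : public so_5::agent_t
{
public:
    sdk_image_adapter(so_5::agent_context_t ctx, so_5::mbox_t input, so_5::mbox_t output)
        : agent_t(std::move(ctx)), m_input(std::move(input)), m_output(std::move(output))
    {
    }

    void so_define_agent() override
    {
        // for every SDKImage received, convert and republish as cv::Mat,
        // so that cv::Mat-only agents keep working unchanged
        so_subscribe(m_input).event([this](const SDKImage& image) {
            so_5::send<cv::Mat>(m_output, to_mat(image));
        });
    }
private:
    so_5::mbox_t m_input;
    so_5::mbox_t m_output;
};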

The good news is that we do have a choice. Want to put all the logic into a few agents, minimizing message exchange and type proliferation? Deal. Or do you prefer fine-grained agents and more modularity? Deal.

Takeaway

In this episode we have learned:

  • agent states provide functions to express transitions and particular conditions that must be guaranteed;
  • on_enter executes a function when the agent transitions to a certain state;
  • on_exit executes a function when the agent leaves a certain state;
  • on_enter and on_exit handlers mustn’t throw exceptions;
  • time_limit sets the maximum time the agent can be in a certain state (and can be reset from a message handler);
  • combining agent states with periodic messages can be powerful.

As usual, calico is updated and tagged.

What’s next?

Our manager is highly impressed with the project’s advancements and is eager to put it to use. However, with an important customer demonstration on the horizon, she has a new request:

“The customer will be captivated by the system’s flexibility. Could you add more agents and provide examples showcasing the power of composition?”

In the forthcoming post, we will introduce a few more agents, highlighting the dynamic potential of composition in action!


Thanks to Yauheni Akhotnikau for having reviewed this post.
