When an actor requires performing certain actions at shutdown, we typically consider using so_evt_finish()
. However, if the operation involves message passing, so_evt_finish()
is not the right choice as it might be too late for receivers to get the message. Indeed, agents within deregistering cooperations are unable to receive new messages. This means that a message dispatched from so_evt_finish()
is not assured to be received by subscribers, as they could have already been deregistered or prevented from processing new messages (technically, they are unbound from the event queue).
Delivering messages during shutdown is a common problem in message passing applications, and frameworks often provide features to address it.
This issue crosses our path as we delve into the implementation of a new business need. During the recent planning session, Robin, our product owner, presented the team with a new opportunity: the company is embarking on a new project where calico
could be an excellent solution. However, for being used in this project, there’s one missing feature: the ability to cache and process images in size-limited batches. Collaborating with Robin, we gathered additional details and determined that we require a new agent capable of storing images up to a specified limit and then transmitting them all to an output channel.
The requirement is somewhat ambiguous: should we transmit the entire batch or individual images? We’ll proceed with the assumption that we can send them one by one. This approach ensures compatibility with existing agents, allowing them to function seamlessly. For instance, if the image_tracer
subscribes to the output of this “cache agent,” it will eventually receive and handle several images without requiring any changes to its implementation. In contrast, handling a “batch” would necessitate transmitting and subscribing to a new type of message.
The logic behind the “cache agent” is straightforward: it accumulates images up to a specified limit. Once this limit is reached, it sends all the accumulated images to an output channel, effectively flushing the cache to restart the process. Alternatively, accumulation can be triggered by a signal in other scenarios. (e.g. we might want to cache the entire “acquisition” that occurs between two commands) but we stick with the size limit approach that is enough for the purpose of this article.
The implementation of this agent is here below:
class image_cache final : public so_5::agent_t
{
public:
image_cache(so_5::agent_context_t ctx, so_5::mbox_t input, so_5::mbox_t output, unsigned size)
: agent_t(ctx), m_input(std::move(input)), m_output(std::move(output)), m_session_size(size), m_accumulated(0), m_cache(size)
{
}
void so_define_agent() override
{
so_subscribe(m_input).event([this](so_5::mhood_t<cv::Mat> img) {
if (m_session_size != m_accumulated)
{
m_cache[m_accumulated++] = img.make_holder();
}
else
{
flush();
}
});
}
private:
void flush()
{
for (auto& img : std::views::take(m_cache, m_accumulated))
{
so_5::send(m_output, std::move(img));
}
m_accumulated = 0;
}
so_5::mbox_t m_input;
so_5::mbox_t m_output;
unsigned m_session_size;
unsigned m_accumulated;
std::vector<so_5::message_holder_t<cv::Mat>> m_cache;
};
As we explored in a previous post, we use message_holder_t
to participate in the ownership of the image in order not to copy data.
At this point, we seek clarification from Robin on a crucial matter: what action should be taken at shutdown if the cache still holds some data? After consulting with the team from the new project, Robin returns with a conservative decision: they prefer that any remaining content be sent to the output channel. To elaborate, Robin specifies that the cache will output to a channel which will be subscribed from the network, exploiting our gRPC service_facade
agent.
However, if we send leftovers from so_evt_finish()
they are not guaranteed to be received. We need to explore another feature of SObjectizer designed to solve this problem.
Stop guards
The solution offered by SObjectizer is called shutdown prevention schema and it’s provided as the concept of stop guard.
The idea is pretty intuitive: we can install “stop guards” that are classes implementing the stop_guard_t
interface consisting of a single stop
function:
class my_stop_guard final : public so_5::stop_guard_t
{
public:
void stop() noexcept override
{
// ...some operation...
// I can so_5::send data here!
}
// ...
};
When the environment is requested to stop by calling environment_t::stop()
, SObjectizer does not simply initiate the shutdown procedure but it first calls stop_guard_t::stop()
function for every installed stop guard. This operation is synchronous and causes the thread calling environment_::stop()
to block until each stop()
returns. For instance, if we let an instance of wrapped_env_t
within the main
function to automatically trigger the shutdown, stop()
will be invoked from the main thread (this is our scenario in calico
).
Then SObjectizer waits until all the stop guards are removed. Hence, we must remove all installed stop guards at some point. Once they are all removed, the environment initiates the shutdown operation, as usual. It’s worth reiterating this point: SObjectizer does not impose a time limit for the shutdown operation. This means that if any stop guard remains installed, the shutdown process will not start at all.
In other words, when stop_guard_t::stop()
is invoked, the shutdown process has not started yet. This means, agents are still bound to message queues and can receive messages. Thus, so_5::send
ing a message at that point is guaranteed to be delivered. Now, it should clear why this is called “shutdown prevention”.
The pattern works this way:
- a stop guard is installed by using
so_environment::setup_stop_guard(guard)
; -
guard
is an instance ofshared_ptr<stop_guard_t>
(or anything convertible); - a stop guard is removed by using
so_environment::remove_stop_guard(guard)
.
Now, let’s explore how to address the shutdown requirement within the image_cache
agent.
First of all, we craft a stop guard that sends a signal to image_cache
to inform the shutdown is upcoming. The implementation might be included into image_cache
as it’s an internal detail:
class image_cache final : public so_5::agent_t
{
struct shutdown_requested final : so_5::signal_t {};
class cache_stop_guard final : public so_5::stop_guard_t
{
public:
cache_stop_guard(so_5::mbox_t channel)
: m_channel(std::move(channel))
{
}
void stop() noexcept override
{
so_5::send<shutdown_requested>(m_channel);
}
private:
so_5::mbox_t m_channel;
};
// ... as before
};
Then we must install the stop guard. This might be done in different places and we’ll get back to that later. For now, let’s do it in the agent’s constructor:
class image_cache final : public so_5::agent_t
{
// ... as before
public:
image_cache(so_5::agent_context_t ctx, so_5::mbox_t input, so_5::mbox_t output, unsigned size)
: agent_t(ctx), m_input(std::move(input)), m_output(std::move(output)), m_session_size(size),
m_accumulated(0), m_cache(size), m_shutdown_guard(std::make_shared<cache_stop_guard>(so_direct_mbox()))
{
so_environment().setup_stop_guard(m_shutdown_guard);
}
private:
// ... as before
so_5::stop_guard_shptr_t m_shutdown_guard;
};
stop_guard_shptr_t
is just an alias for shared_ptr<stop_guard_t>
.
Finally, we need to handle the signal shutdown_requested
by flushing the cache and then removing the stop guard from the environment:
void so_define_agent() override
{
// ... as before
so_subscribe_self().event([this](so_5::mhood_t<shutdown_requested>) {
flush();
so_environment().remove_stop_guard(m_shutdown_guard);
});
}
Let’s see this in action:
int main()
{
const auto ctrl_c = utils::get_ctrlc_token();
const so_5::wrapped_env_t sobjectizer;
const auto main_channel = sobjectizer.environment().create_mbox("main");
const auto commands_channel = sobjectizer.environment().create_mbox("commands");
const auto message_queue = create_mchain(sobjectizer.environment());
sobjectizer.environment().introduce_coop(so_5::disp::active_obj::make_dispatcher(sobjectizer.environment()).binder(), [&](so_5::coop_t& c) {
c.make_agent<image_producer_recursive>(main_channel, commands_channel);
c.make_agent<maint_gui::remote_control>(commands_channel, message_queue);
const auto cache_out = sobjectizer.environment().create_mbox();
c.make_agent<image_cache>(main_channel, cache_out, 50);
c.make_agent<image_tracer>(cache_out);
});
do_gui_message_loop(ctrl_c, message_queue, sobjectizer.environment().create_mbox(constants::waitkey_channel_name));
}
The sequence of events will be as follows: after starting the acquisition, the image_tracer
will receive data only when the image_cache
sends a burst. Finally, upon request for shutdown, any remaining cache data will be delivered before the actual shutdown.
Consider that image_cache
could potentially be extended to send two additional messages. These messages would be dispatched before the first image of each batch and after the last one. This approach enables receivers to identify the beginning and end of each batch. These are commonly referred to as batch_start
and batch_end
, or alternatively, burst_start
and burst_end
. It’s worth noting that starting messages could include details such as the number of images that will be included in the batch.
There are a few missing technical details about stop guards.
First of all, stop_guard_t::stop()
is noexcept
as it shouldn’t throw exceptions. This is because if an exception occurs, there is no way to rollback shutdown-specific actions that were performed prior to the exception. Consequently, if an exception is thrown, the shutdown preparation procedure could be disrupted without any opportunity for recovery.
Also, it is only possible to install stop guards before the first invocation of environment_t::stop()
. Once stop()
has been called, any subsequent attempts to call setup_stop_guard()
will fail. By default, the setup_stop_guard()
method will throw an exception if stop()
has already been called. However, if this behavior is not suitable, it can be modified by providing a second argument to setup_stop_guard()
:
const auto result = so_environment().setup_stop_guard(
my_guard,
stop_guard_t::what_if_stop_in_progress_t::return_negative_result );
if(stop_guard_t::setup_result_t::ok != result)
{
// ... handle this condition properly
}
Finally, it’s important to clarify that calling remove_stop_guard
multiple times with the same instance is permitted. SObjectizer simply ignores stop guards that are not currently installed.
Deadlock alert
You may have noticed that installing stop guards could conceal a potential “resource leak” (not a memory leak) that leads to a deadlock. This is because remove_stop_guard()
might not be invoked if certain issues arise (or by mistake) and we have no automatic cleanup in place. For example, this can occur in the case of an exception, such as one thrown from so_define_agent()
or any event handlers.
In such cases, setup_stop_guard()
is not followed by a corresponding call to remove_stop_guard()
, preventing the shutdown process to start and the program to terminate.
A solution to this problem consists in “guarding the guard” that is introducing a guard type that simply removes the stop guard when destroyed. Here is a minimal example (there exist other solutions):
struct guard_remover
{
guard_remover(so_5::environment_t& env, so_5::stop_guard_shptr_t guard)
: m_env(env), m_guard(std::move(guard))
{
m_env.setup_stop_guard(m_guard);
}
guard_remover(const guard_remover&) = delete;
guard_remover(guard_remover&&) = delete;
guard_remover& operator=(const guard_remover&) = delete;
guard_remover& operator=(guard_remover&&) = delete;
~guard_remover()
{
remove();
}
void remove() const
{
m_env.remove_stop_guard(m_guard);
}
so_5::environment_t& m_env;
so_5::stop_guard_shptr_t m_guard;
};
Then we can incorporate this wrapper into the agent, for example:
class image_cache final : public so_5::agent_t
{
// ... as before
public:
image_cache(so_5::agent_context_t ctx, so_5::mbox_t input, so_5::mbox_t output, unsigned size)
: agent_t(ctx), m_input(std::move(input)), m_output(std::move(output)),
m_session_size(size), m_accumulated(0), m_cache(size),
m_guard_remover(so_environment(), std::make_shared<cache_stop_guard>(so_direct_mbox()))
{
}
// ... as before
void so_define_agent() override
{
// ... as before
so_subscribe_self().event([this](so_5::mhood_t<shutdown_requested>) {
flush();
m_guard_remover.remove();
});
}
private:
// ... as before
guard_remover m_guard_remover;
};
We opted for encapsulating the stop guard setup into the “stop guard remover”, but this is not the only option (also, we don’t need to reference the stop guard from image_cache
anymore). By calling remove()
explicitly, we can anticipate the stop guard removal. However, since we are allowed to call remove_stop_guard()
multiple times with the same instance, we don’t need to check if the guard has been already removed.
This guard ensures the stop guard is removed even in case of issues leading the agent to be deregistered and destroyed before the normal shutdown.
It’s worth mentioning a brief discussion I had with Yauheni regarding this topic. Another approach to address this issue involves installing a stop guard in so_evt_start()
and removing it in so_evt_finish()
. In this scenario, when the agent is notified by the stop guard that shutdown is imminent, it must deregister itself (as we learned in a previous post). This action triggers so_evt_finish()
prematurely, leading to the removal of the stop guard.
Just to let you know that SObjectizer’s companion project, so5extra, provides the shutdowner class that implements the shutdown prevention schema in a different way.
Takeaway
In this episode we have learned:
- during shutdown, agents can’t receive new messages; this means, a message sent from
so_evt_finish()
is not guaranteed to be delivered; - to solve this problem, SObjectizer provides a shutdown prevention schema called stop guard;
- we can install stop guards whose
stop()
function will be called just before the shutdown starts; - a stop guard is installed using
setup_stop_guard()
; -
stop()
functions are called from the same thread requesting the shutdown; - SObjectizer then expects that all installed stop guards are removed, then it starts the shutdown procedure as usual;
- a stop guard is removed using
remove_stop_guard()
; - calling
remove_stop_guard()
multiple times with the same instance is permitted. SObjectizer simply ignores stop guards that are not currently installed; -
stop_guard::stop()
isnoexcept
and shouldn’t throw exceptions; - a stop guard is like an acquired resource: not removing it leads to preventing shutdown;
- to ensure that a guard is removed even in case of exceptions, RAII wrappers might be employed.
As usual, calico
is updated and tagged.
What’s next?
As the new feature has been implemented during this sprint, we are collaborating with the new project’s team to configure and test calico
for their specific requirements. In the meantime, we receive a message from Helen who has something particularly intriguing to share, following up our recent discussion about performance.
In the next post, we’ll delve into the standard telemetry metrics offered by SObjectizer right out of the box.
Thanks to Yauheni Akhotnikau for having reviewed this post.
Top comments (0)