In the previous episode, we added a new functionality to the service that allows clients to subscribe for images. Unfortunately, we’ve identified a notable problem with the way it’s been implemented. Essentially, even after a client disconnects, we persist in attempting to transmit data to it, even though it’s no longer accessible. Also, we keep alive both the gRPC thread and the agent’s worker thread, resulting in a waste of resources.
It’s imperative that we actively determine when a client has disconnected, and when this occurs, we should swiftly reclaim the corresponding agent, along with all its related resources and subscriptions, and subsequently return from the rpc method in order to free the blocked gRPC thread.
In this article, we’ll delve into the process of deregistering an active agent, addressing this pivotal aspect of our service’s functionality. Typically, this functionality is crucial for any sophisticated utilization of SObjectizer that includes the dynamic creation of agents and cooperations, like task schedulers and coordinators.
To begin with, it’s essential to identify when a client has disconnected. In the context of gRPC, this corresponds to a situation where an invocation of ServerWriter::Write()
returns false
. In this case, there is no way to recover the stream with the client and we can consider that terminated. Thus, the change to the agent class should be quite straightforward:
class subscribe_client_agent : public so_5::agent_t
{
// ... as before
void so_define_agent() override
{
for (const auto& channel : m_channels)
{
so_subscribe(channel).event([chan_name = channel->query_name(), this](const cv::Mat& image) {
subscribe_response response;
response.set_channel_name(chan_name);
imencode(".jpg", image, m_raw_data, { cv::IMWRITE_JPEG_QUALITY, 95 });
response.mutable_image()->set_image_data(m_raw_data.data(), m_raw_data.size());
// has the client gone?
if (!m_writer.Write(response))
{
// handle this condition properly
}
});
}
}
// ... as before
};
The question that arises at this point is: how can an agent actively trigger its own termination? In general, agents are terminated when their cooperation ends. Without any additional intervention, this occurs when the SObjectizer environment is stopped, which, in our case, happens when we terminate the program.
When I encountered this problem for the first time, the use case was a bit different but the necessity was the very same. I came up with a solution on my own and opened an issue to seek the development team’s input on whether my approach was in line with SObjectizer best practices. Yauheni confirmed that I was on the right track, and our conversation continued, yielding even more valuable insights.
Cooperation deregistration
Back then, when searching for a function to “deregister” an agent, I couldn’t find any relevant results. This led me to ponder why such a function didn’t exist. After some consideration, I grasped the underlying rationale: it lies in the foundational principles of SObjectizer: a cooperation represents a collection of agents that are intended to function as a unified entity. Eliminating any of these agents would result in the loss of the cohesive nature of the cooperation, rendering it inconsistent.
On the other hand, SObjectizer does provide an agent function to deregister an entire cooperation:
so_deregister_agent_coop(int reason)
The library also provides a few “standard” reason values in so_5::dereg_reason
. You might remember this from a previous episode about testing where we installed a dereg notificator to catch possible cooperation failures.
Actually, there is also a shorthand for so_deregister_agent_coop(so_5::dereg_reason::normal)
that is so_deregister_agent_coop_normally().
Once I discovered that, my idea was to put agents that may require deregistration within their own dedicated cooperation and securely invoke the deregistration function at the appropriate time. In my original use case, I also had to establish a parent-child relationship to bring together all these “individual” cooperations under a common parent, for example, to facilitate their collective deregistration when required. However, this is not required in general.
Back to our scenario, we have that all subscribe_client_agent
instances are already introduced into a dedicated cooperation, then we can directly invoke so_deregister_agent_coop_normally()
:
if (!m_writer.Write(response))
{
so_deregister_agent_coop_normally();
}
End of the story?
Agent deactivation
When I tried this for the first time, I realized a potential drawback: when the failure condition is triggered, the agent could continue processing messages until its cooperation is fully deregistered, and the agent is subsequently terminated. As Yauheni explained to me in the original issue on GitHub:
“It’s intended behavior. It can be useful in the cases where a coop contains several agents: one of them is a leader that decides when so_deregister_agent_coop()
has to be called, all other agents just do their work. After the call of so_deregister_agent_coop()
by the leader, all other agents have a possibility to complete their work (e.g. they will process messages already in the event-queue, and those messages can be important for some other agents in the application).”
He concludes by highlighting a crucial point:
“Generally speaking, the deregistration can be seen as a statement I want to quit as soon as I complete all my current work, but not as a demand I want to be killed right now“.
Fundamentally, both registration and deregistration of cooperation operate asynchronously, and work like, let’s say, a cooperative cancellation mechanism. For further insight, SObjectizer’s documentation provides meticulous details.
In our scenario, this would be fine because calling both m_writer.Write(response)
and so_deregister_agent_coop()
multiple times, in practice, has no effect. Indeed, if the deregistration process has already started, it will conclude at some point. Nevertheless, the prerequisites can vary, potentially necessitating an immediate cessation of handling other messages.
What to do in this case?
As Yauheni suggested at that time, a common way to do this consists in introducing an extra state where the agent does nothing:
class demo : public so_5::agent_t {
state_t st_deactivated{this};
// ...
void on_some_event(mhood_t<some_message> cmd)
{
if(some_condtion)
st_deactivated.activate();
// ...
}
...
};
Actually, the initial problem I encountered inspired the team to implement the agent function so_deactivate_agent()
, which performs this exact action without necessitating the introduction of an additional “fake” state. This functionality became accessible starting from SObjectizer v.5.7.3.
It’s important to note that this function essentially serves as a shortcut to transition to a specialized state where all subscriptions have been dropped. Practically, this implies that some resources associated with the agent, such as any worker threads (for instance, when the agent is linked to the active_obj
dispatcher), remain allocated until the agent (the cooperation) is completely deregistered.
Thus, the “definitive” idiom for triggering agent shutdown consists in calling both so_deactivate_agent()
and so_deregister_agent_coop()
(as long as the agent lives in its own cooperation):
if (!m_writer.Write(response))
{
so_deactivate_agent();
so_deregister_agent_coop_normally();
}
This idiom guarantees that when the failure occurs, the agent promptly shifts to what we informally refer to as the “deactivated state,” essentially representing a state of “ignore incoming messages but keep me alive.” In the meantime, the cooperation initiates its deregistration sequence, ultimately leading to the termination of the agent at a later stage.
Takeaway
In this episode we have learned:
- a cooperation can be deregistered using
so_deregister_agent_coop(int reason)
; - for “normal” reasons, it’s preferable to use
so_deregister_agent_coop_normally()
; - cooperation’s registration and deregistration procedures are asynchronous and can potentially take some time;
- as a result, agents may continue to receive some messages until terminated;
- to “deactivate” an agent and make it ignore any message, use
so_deactivate_agent()
; - bear in mind that
so_deactivate_agent()
may not release some resources associated with the agent; - the combination of
so_deregister_agent_coop_normally()
andso_deactivate_agent()
forms a common idiom to deactivate an agent while its cooperation completes the termination process entirely.
As usual, calico
is updated and tagged.
What’s next?
The service has improved, but there’s still room for enhancement: What happens if the client disconnects while the stream isn’t in progress? In other words, how can we detect if the client has gone without trying to Write
data?
Stay tuned for the next article where we’ll explore a solution to this particular issue!
Thanks to Yauheni Akhotnikau for having reviewed this post.
Top comments (0)