SObjectizer features a basic mechanism for gathering and distributing runtime telemetry data regarding its internals, such as the number of registered cooperations, the quantity of delayed or periodic messages, the length of event queues, the time spent processing events for each handler, and so on.
This feature is essential for getting insights into the performance and health of our applications at runtime, avoiding the necessity to develop our own monitoring system.
Following up our recent discussion about performance metrics, Helen is back with her discoveries about this feature of SObjectizer that is called Runtime monitoring. In this article we’ll learn how to use this tool effectively and we’ll apply it to calico
.
Runtime monitoring in a nutshell
Runtime monitoring is seamlessly integrated into SObjectizer through conventional mechanisms: message passing and subscriptions.
Essentially, when the feature is enabled, SObjectizer automatically generates certain data sources that observe specific internal aspects of SObjectizer and generate corresponding metrics. For example, there is a data source for counting the number of cooperations registered within the environment. These data sources transmit their information to a dedicated component of the telemetry system known as the stats controller. The stats controller manages the activation and deactivation of telemetry data collection and distribution. When the telemetry is enabled, the stats controller runs in a separate thread. To distribute data, the stats controller grants access to a specialized message box that operates in the standard manner via subscriptions. It’s worth noting that telemetry samples are not continuously distributed; instead, the distribution period can be configured, as we’ll see in a moment.
Practically speaking, to collect telemetry data, we must explicitly turn on the stats controller and, optionally, set a distribution period – otherwise the default value is 2 seconds.
The stats controller is accessed through the environment:
// turn on telemetry
so_environment().stats_controller().turn_on();
// send telemetry data every second
so_environment().stats_controller().set_distribution_period(1s);
Since these options can be changed anytime, we introduce an agent that enables or disables telemetry collection depending on the current state, when a specific command is received (e.g. pressing t
):
struct enable_telemetry_command final : so_5::signal_t {};
class telemetry_agent final : public so_5::agent_t
{
public:
telemetry_agent(so_5::agent_context_t ctx)
: agent_t(ctx), m_commands(so_environment().create_mbox("commands"))
{
}
void so_define_agent() override
{
st_turned_off.event(m_commands, [this](so_5::mhood_t<enable_telemetry_command>) {
so_environment().stats_controller().turn_on();
so_environment().stats_controller().set_distribution_period(1s);
st_turned_on.activate();
});
st_turned_on.event(m_commands, [this](so_5::mhood_t<enable_telemetry_command>) {
so_environment().stats_controller().turn_off();
st_turned_off.activate();
});
st_turned_off.activate();
}
private:
state_t st_turned_on{ this }, st_turned_off{ this };
so_5::mbox_t m_commands;
};
At this point, we are ready to subscribe for telemetry data.
How to subscribe for telemetry data
Runtime telemetry is delivered as messages defined into the namespace stats::messages
. In particular, this namespace defines a message type representing a generic quantity of data:
stats::messages::quantity<T>
This message type is intended to convey information regarding quantities of a certain telemetry entity, such as the number of working threads for a dispatcher or the quantity of registered cooperations. In essence, when subscribing for telemetry data, we will receive a quantity<T>
.
You might expect that T
represents the type of telemetry information we are interested in. After all, one of the key advantages of SObjectizer is “message pattern matching”. However, T
solely denotes the “unit type” associated with such a quantity, such as size_t
. For instance, to obtain information about the number of working threads for a dispatcher, we receive a quantity<size_t>
. This type is essentially hardcoded, indicating the type of data being transmitted, and should be retrieved by the documentation. At the time of writing, the only unit type available is size_t
.
At this point, you might wonder how to differentiate between various telemetry types, like the count of registered cooperations and the number of working threads for a dispatcher, if they both arrive as quantity<size_t>
. The answer lies in the quantity type itself: it includes not only the numerical “value” of the quantity but also carries information about the data source that generated it.
In particular, data sources have unique names, each consisting of two strings:
- a prefix, indicating a “group” of related data sources;
- a suffix, indicating the specific data source of the group which produced the quantity.
For example, here below:
mbox_repository/named_mbox.count
mbox_repository
is the prefix, whereas /named_mbox.count
is the suffix.
Prefixes could be considered “dynamic” in nature, indicating that they may reference entities generated at runtime. For instance, consider these two distinct prefixes identifying two different one_thread
dispatchers:
disp/ot/DEFAULT
disp/ot/0x3be520
0x3be520
uniquely identifies a particular dispatcher and can only be generated – then referenced – at runtime.
Conversely, suffixes are “fixed” strings, signifying that they pertain to predetermined categories of information within SObjectizer. For example:
/agent.count
/demands.count
/named_mbox.count
Therefore, when receiving a quantity<T>
, we might filter the “type” of information by examining its prefix and suffix. To simplify this process, SObjectizer furnishes all “predefined” prefixes and suffixes in stats::prefixes
and stats::suffixes
namespaces.
Although prefixes for dispatcher-related information are not supplied since they are dynamic, the rules regulating their generation are rather straightforward:
- every prefix starts with
disp
; - then it follows an abbreviated form based on the specific dispatcher, such as:
-
/ot
— for one_thread dispatcher; -
/ao
— for active_obj dispatcher; -
/ag
— for active_group dispatcher; -
/tp
— for thread_pool dispatcher; -
/atp
— for adv_thread_pool dispatcher;
-
- finally, the last part is a unique identification for a dispatcher instance that typically includes the dispatcher name (when specified) or an hex representation of the address of the dispatcher object.
It’s worth noting that all standard dispatcher factories (e.g. make_dispatcher()
) accept a parameter to specify the name of the dispatcher.
Now, suppose we enhance our telemetry_agent
to record statistics concerning our active_obj
dispatcher, which is bound to the majority of our agents. Hence, we should subscribe to the stats_controller
‘s message box and then we should only consider quantities of a specified active_obj
dispatcher. First of all, we add a name to the dispatcher when we create the cooperation:
sobjectizer.environment().introduce_coop(
so_5::disp::active_obj::make_dispatcher(sobjectizer.environment(), "calico_active_object").binder(), [&](so_5::coop_t& c) {
// ...
});
Then, we install a delivery filter to discard any other telemetry data (one motivation for introducing delivery filters in SObjectizer was to facilitate discarding of uninteresting telemetry quantities):
so_set_delivery_filter(so_environment().stats_controller().mbox(), [](const so_5::stats::messages::quantity<size_t>& q) {
return std::string_view(q.m_prefix.c_str()).contains("/ao/calico_active_object");
});
We opted for wrapping the prefix into a string_view
since SObjectizer provides prefix and suffix as raw char pointers. Update: SObjectizer 5.8.2 provides a function as_string_view()
for both prefix_t
and suffix_t
. Finally, we add the subscription:
st_turned_on.event(so_environment().stats_controller().mbox(), [](const so_5::stats::messages::quantity<size_t>& q) {
std::osyncstream(std::cout) << q.m_prefix << q.m_suffix << "=" << q.m_value << "\n";
});
Another common way to filter quantities involves the suffix, allowing retrieval of data from all sources that produce a specific category of telemetry information. For instance, suppose we monitor the queue size of each dispatcher (the number of events awaiting processing). The suffix to verify against can be located in stats::suffixes::work_thread_queue_size()
:
so_set_delivery_filter(so_environment().stats_controller().mbox(), [](const so_5::stats::messages::quantity<size_t>& q) {
return q.m_suffix == so_5::stats::suffixes::work_thread_queue_size();
});
Note that if we install multiple filters, only the last one is taken. Thus, to get /ao/calico_active_object
OR any information about queue sizes, we might merge the two conditions as follows:
so_set_delivery_filter(so_environment().stats_controller().mbox(), [](const so_5::stats::messages::quantity<size_t>& q) {
return
std::string_view(q.m_prefix.c_str()).contains("/ao/calico_active_object")
|| q.m_suffix == so_5::stats::suffixes::work_thread_queue_size();
});
Monitoring the queue size of each dispatcher is a significant metric to detect potential bottlenecks, akin to assessing throughput, as discussed in the initial post about performance. Especially noteworthy is the potential need for further investigation if the queue size shows a gradual increase over time.
The complete list of all available suffixes and prefixes is published on the official documentation. We recommend referring to it to explore the additional telemetry data provided by SObjectizer.
Identifying start and end of each telemetry batch
You may have observed that the telemetry_agent
receives one quantity for each telemetry data sent from the data sources. Then we typically set up some filters to refine the data to suit our specific interests. For example, suppose we have this setup of calico
:
int main()
{
const auto ctrl_c = utils::get_ctrlc_token();
const wrapped_env_t sobjectizer;
const auto main_channel = sobjectizer.environment().create_mbox("main");
const auto commands_channel = sobjectizer.environment().create_mbox("commands");
const auto message_queue = create_mchain(sobjectizer.environment());
sobjectizer.environment().introduce_coop(disp::active_obj::make_dispatcher(sobjectizer.environment(), "calico_active_object").binder(), [&](coop_t& c) {
c.make_agent<image_producer_recursive>(main_channel, commands_channel);
c.make_agent<maint_gui::remote_control>(commands_channel, message_queue);
c.make_agent<telemetry_agent>();
});
do_gui_message_loop(ctrl_c, message_queue, sobjectizer.environment().create_mbox(constants::waitkey_channel_name));
}
Once the program is started and the ‘t
‘ key is pressed in the remote_control
window, logs similar to the following begin to appear:
disp/ot/DEFAULT/wt-0/demands.count=0
disp/ao/calico_active_object/agent.count=3
disp/ao/calico_active_object/wt-0x1baa413f9f0/demands.count=0
disp/ao/calico_active_object/wt-0x1baa67c6170/demands.count=0
disp/ao/calico_active_object/wt-0x1baa68b27d0/demands.count=5
disp/ot/DEFAULT/wt-0/demands.count=0
disp/ao/calico_active_object/agent.count=3
disp/ao/calico_active_object/wt-0x1baa413f9f0/demands.count=0
disp/ao/calico_active_object/wt-0x1baa67c6170/demands.count=0
disp/ao/calico_active_object/wt-0x1baa68b27d0/demands.count=4
disp/ot/DEFAULT/wt-0/demands.count=0
disp/ao/calico_active_object/agent.count=3
disp/ao/calico_active_object/wt-0x1baa413f9f0/demands.count=0
disp/ao/calico_active_object/wt-0x1baa67c6170/demands.count=0
disp/ao/calico_active_object/wt-0x1baa68b27d0/demands.count=4
Every line is a different quantity delivered to the telemetry_agent
that passes through our filter. Thus, the corresponding message handler will be called 5 times. Nothing unexpected here.
Essentially, the stats controller generates a telemetry “batch” (called also “cycle” or “period”) occurring every second (as we configured before). At this point, it might be convenient to identify when a single batch starts and finishes, perhaps to enhance the clarity of the log. SObjectizer provides the necessary components to accomplish this task in an effective way. Specifically, the stats controller dispatches two “status” messages: one before sending the first quantity and another after dispatching the last quantity of each batch:
-
messages::distribution_started
-
messages::distribution_finished
Agents can subscribe to these two messages and do whatever they need. For example, we add these two subscriptions to our telemetry_agent
:
st_turned_on.event(so_environment().stats_controller().mbox(), [](const so_5::stats::messages::distribution_started&) {
std::osyncstream(std::cout) << "telemetry batch starts----\n";
});
st_turned_on.event(so_environment().stats_controller().mbox(), [](const so_5::stats::messages::distribution_finished&) {
std::osyncstream(std::cout) << "telemetry batch ends----\n";
});
With this modification, the log above might change to:
telemetry batch starts----
disp/ot/DEFAULT/wt-0/demands.count=0
disp/ao/calico_active_object/agent.count=3
disp/ao/calico_active_object/wt-0x1baa413f9f0/demands.count=0
disp/ao/calico_active_object/wt-0x1baa67c6170/demands.count=0
disp/ao/calico_active_object/wt-0x1baa68b27d0/demands.count=5
telemetry batch ends----
telemetry batch starts----
disp/ot/DEFAULT/wt-0/demands.count=0
disp/ao/calico_active_object/agent.count=3
disp/ao/calico_active_object/wt-0x1baa413f9f0/demands.count=0
disp/ao/calico_active_object/wt-0x1baa67c6170/demands.count=0
disp/ao/calico_active_object/wt-0x1baa68b27d0/demands.count=4
telemetry batch ends----
telemetry batch starts----
disp/ot/DEFAULT/wt-0/demands.count=0
disp/ao/calico_active_object/agent.count=3
disp/ao/calico_active_object/wt-0x1baa413f9f0/demands.count=0
disp/ao/calico_active_object/wt-0x1baa67c6170/demands.count=0
disp/ao/calico_active_object/wt-0x1baa68b27d0/demands.count=4
telemetry batch ends----
You might wonder why these are messages and not signals. Well, this is to allow SObjectizer developers to add some content to the message in the future.
It’s important to highlight that this follows a classical pattern in message passing-styled applications. We provided another example of this pattern in the previous post regarding the image_cache
.
This covers nearly everything you need to begin working with telemetry! In the upcoming section, we’ll introduce another telemetry capability of SObjectizer that provides information about the time spent in dispatcher threads, essential to get a snapshot of the exhibited performance of one or more agents.
Observing thread activity
An additional category of telemetry data available, not provided as quantity
, pertains to the time spent inside event-handlers and the time spent waiting for the next event in threads managed by dispatchers. This feature is not enabled by default to prevent any negative impact on dispatchers’ performance. Enabling this support must be done individually for a specific dispatcher or for the entire environment. For instance:
auto disp = so_5::disp::active_obj::make_dispatcher( env, "my_disp",
// Collecting of work thread activity is enabled
// in the dispatcher's parameters
disp_params_t{}.turn_work_thread_activity_tracking_on()
);
Or globally:
so_5::launch(
[]( so_5::environment_t & env ) {
// ...
}
[]( so_5::environment_params_t & params ) {
// Enable work thread activity statistics collection explicitly
params.turn_work_thread_activity_tracking_on();
// ...
}
);
To enable support for all but certain dispatchers, we typically enable the feature globally while selectively disable it for some:
so_5::launch(
[]( so_5::environment_t & env ) {
// ...
// disable this one
auto my_disp = so_5::disp::one_thread::make_dispatcher(
env, "my_disp",
so_5::disp::one_thread::disp_params_t{}
.turn_work_thread_activity_tracking_off() );
// ...
[]( so_5::environment_params_t & params ) {
// enable all the others
params.turn_work_thread_activity_tracking_on();
// ...
}
);
The telemetry data related to thread activity are distributed using the message type:
stats::messages::work_thread_activity
In addition to prefix and suffix strings, this type contains three information:
-
m_thread_id
that is valued with the thread identifier related to the data; -
m_working_stats
that contains information on the time spent in event handlers; -
m_waiting_stats
that contains information on the time spent waiting for new events.
m_working_stats
and m_waiting_stats
have type so_5::stats::activity_stats_t
that contains:
-
m_count
that is the number of events collected so far (this value won’t decrease); -
m_total_time
that is the total time spent for events so far (this value won’t decrease); -
m_avg_time
that is the average time spent for events so far.
In essence, the “working stats” object includes metrics regarding the time during which the specific thread was engaged in processing work, akin to the concept of “service time” we explored in a previous post. Conversely, the “waiting stats” object contains details about the duration spent awaiting new events (that is not the “waiting time” discussed before).
We add this subscription to our telemetry_agent
:
st_turned_on.event(so_environment().stats_controller().mbox(), [](const so_5::stats::messages::work_thread_activity& q) {
std::osyncstream(std::cout) << q.m_prefix.c_str() << q.m_suffix.c_str() << "[" << q.m_thread_id << "]\n"
<< " working: " << q.m_stats.m_working_stats << "\n"
<< " waiting: " << q.m_stats.m_waiting_stats << "\n";
});
Let’s see this in action. Suppose we have this setup:
int main()
{
const auto ctrl_c = utils::get_ctrlc_token();
const wrapped_env_t sobjectizer;
const auto main_channel = sobjectizer.environment().create_mbox("main");
const auto commands_channel = sobjectizer.environment().create_mbox("commands");
const auto message_queue = create_mchain(sobjectizer.environment());
sobjectizer.environment().introduce_coop(disp::active_obj::make_dispatcher(sobjectizer.environment()).binder(), [&](coop_t& c) {
c.make_agent<image_producer_recursive>(main_channel, commands_channel);
c.make_agent<maint_gui::remote_control>(commands_channel, message_queue);
c.make_agent_with_binder<face_detector>(
make_dispatcher(sobjectizer.environment(), "face_detector",
so_5::disp::active_obj::disp_params_t{}.turn_work_thread_activity_tracking_on().binder()),
main_channel);
c.make_agent<telemetry_agent>();
});
do_gui_message_loop(ctrl_c, message_queue, sobjectizer.environment().create_mbox(constants::waitkey_channel_name));
}
This way, we name only face_detector
‘s dispatcher face_detector
. Also, to keep “ordinary” telemetry data about our face_detector
dispatcher, we change telemetry_agent'
s filter this way:
so_set_delivery_filter(so_environment().stats_controller().mbox(), [](const so_5::stats::messages::quantity<size_t>& q) {
return std::string_view(q.m_prefix.c_str()).contains("/ao/face_detector");
});
Then we launch the program and we press ‘t
‘ to start collecting. This is a possible log:
telemetry batch starts----
disp/ao/face_detector/agent.count=1
disp/ao/face_detector/wt-0x131d8f30020/demands.count=0
disp/ao/face_detector/wt-0x131d8f30020/thread.activity[27952]
working: [count=1;total=0.0164ms;avg=0.0164ms]
waiting: [count=2;total=543.154ms;avg=271.577ms]
telemetry batch ends----
telemetry batch starts----
disp/ao/face_detector/agent.count=1
disp/ao/face_detector/wt-0x131d8f30020/demands.count=0
disp/ao/face_detector/wt-0x131d8f30020/thread.activity[27952]
working: [count=1;total=0.0164ms;avg=0.0164ms]
waiting: [count=2;total=1543.58ms;avg=771.79ms]
telemetry batch ends----
As evident, there are no events (demands.count=0
) and the total time spent on active work remains constant. Also, both the working and waiting counters do not increase, but only the waiting time increases. In practical terms, this suggests that the thread is effectively idle. This aligns with our scenario, where we expect no active work from the face_detector
until frames are received.
Then we press ‘Enter’ to acquire frames and the log changes a bit:
telemetry batch starts----
disp/ao/face_detector/agent.count=1
disp/ao/face_detector/wt-0x1c0bb525cd0/demands.count=1
disp/ao/face_detector/wt-0x1c0bb525cd0/thread.activity[11832]
working: [count=268;total=5199.48ms;avg=18.944ms]
waiting: [count=251;total=7389.55ms;avg=22.8689ms]
telemetry batch ends----
telemetry batch starts----
disp/ao/face_detector/agent.count=1
disp/ao/face_detector/wt-0x1c0bb525cd0/demands.count=1
disp/ao/face_detector/wt-0x1c0bb525cd0/thread.activity[11832]
working: [count=298;total=5748.59ms;avg=18.7555ms]
waiting: [count=281;total=7840.36ms;avg=20.8269ms]
telemetry batch ends----
telemetry batch starts----
disp/ao/face_detector/agent.count=1
disp/ao/face_detector/wt-0x1c0bb525cd0/demands.count=1
disp/ao/face_detector/wt-0x1c0bb525cd0/thread.activity[11832]
working: [count=328;total=6322.38ms;avg=18.8271ms]
waiting: [count=311;total=8266.64ms;avg=19.0942ms]
telemetry batch ends----
telemetry batch starts----
disp/ao/face_detector/agent.count=1
disp/ao/face_detector/wt-0x1c0bb525cd0/demands.count=1
disp/ao/face_detector/wt-0x1c0bb525cd0/thread.activity[11832]
working: [count=358;total=6885.93ms;avg=18.8066ms]
waiting: [count=341;total=8703.1ms;avg=17.8755ms]
telemetry batch ends----
telemetry batch starts----
disp/ao/face_detector/agent.count=1
disp/ao/face_detector/wt-0x1c0bb525cd0/demands.count=0
disp/ao/face_detector/wt-0x1c0bb525cd0/thread.activity[11832]
working: [count=387;total=7428ms;avg=18.7564ms]
waiting: [count=371;total=9161.09ms;avg=17.1918ms]
telemetry batch ends----
Now, demands.count
fluctuates between 0 and 1. This means, in practice, the thread is fast enough to dequeue and process events from the queue. Also, other metrics are increasing. This means, the thread is doing some work. It appears that the face_detector
is consuming, on average, 18ms per event. It’s worth noting that this average is computed across the entire set of events from the start, not just the last batch. This detail is crucial because libraries often have warm-up times that can influence this calculation. Hence, for precise profiling, it’s advisable to conduct tests in a more controlled environment. Nonetheless, this provides a general insight into the situation: the face detector’s work is sustainable within the system, with the total waiting time surpassing the total working time!
Suppose at this point, we simply add another face_detector
to the system, using the same dispatcher in order to include it to the telemetry log:
int main()
{
const auto ctrl_c = utils::get_ctrlc_token();
const wrapped_env_t sobjectizer;
const auto main_channel = sobjectizer.environment().create_mbox("main");
const auto commands_channel = sobjectizer.environment().create_mbox("commands");
const auto message_queue = create_mchain(sobjectizer.environment());
sobjectizer.environment().introduce_coop(disp::active_obj::make_dispatcher(sobjectizer.environment()).binder(), [&](coop_t& c) {
c.make_agent<image_producer_recursive>(main_channel, commands_channel);
c.make_agent<maint_gui::remote_control>(commands_channel, message_queue);
const auto disp = make_dispatcher(env.environment(), "face_detector", so_5::disp::active_obj::disp_params_t{}.turn_work_thread_activity_tracking_on());
c.make_agent_with_binder<face_detector>(disp, mbox);
c.make_agent_with_binder<face_detector>(disp, mbox);
c.make_agent<telemetry_agent>();
});
do_gui_message_loop(ctrl_c, message_queue, sobjectizer.environment().create_mbox(constants::waitkey_channel_name));
}
As before, before starting the camera acquisition, the log is quite predictable:
telemetry batch starts----
disp/ao/face_detector/agent.count=2
disp/ao/face_detector/wt-0x1a3c8825280/demands.count=0
disp/ao/face_detector/wt-0x1a3c8825280/thread.activity[30292]
working: [count=1;total=0.0008ms;avg=0.0008ms]
waiting: [count=1;total=1763.31ms;avg=1763.31ms]
disp/ao/face_detector/wt-0x1a3c8825780/demands.count=0
disp/ao/face_detector/wt-0x1a3c8825780/thread.activity[15092]
working: [count=1;total=0.0004ms;avg=0.0004ms]
waiting: [count=1;total=1763.27ms;avg=1763.27ms]
telemetry batch ends----
telemetry batch starts----
disp/ao/face_detector/agent.count=2
disp/ao/face_detector/wt-0x1a3c8825280/demands.count=0
disp/ao/face_detector/wt-0x1a3c8825280/thread.activity[30292]
working: [count=1;total=0.0008ms;avg=0.0008ms]
waiting: [count=1;total=2763.14ms;avg=2763.14ms]
disp/ao/face_detector/wt-0x1a3c8825780/demands.count=0
disp/ao/face_detector/wt-0x1a3c8825780/thread.activity[15092]
working: [count=1;total=0.0004ms;avg=0.0004ms]
waiting: [count=1;total=2763.1ms;avg=2763.1ms]
telemetry batch ends----
telemetry batch starts----
disp/ao/face_detector/agent.count=2
disp/ao/face_detector/wt-0x1a3c8825280/demands.count=0
disp/ao/face_detector/wt-0x1a3c8825280/thread.activity[30292]
working: [count=1;total=0.0008ms;avg=0.0008ms]
waiting: [count=1;total=3763.81ms;avg=3763.81ms]
disp/ao/face_detector/wt-0x1a3c8825780/demands.count=0
disp/ao/face_detector/wt-0x1a3c8825780/thread.activity[15092]
working: [count=1;total=0.0004ms;avg=0.0004ms]
waiting: [count=1;total=3763.77ms;avg=3763.77ms]
telemetry batch ends----
We have 2 agents and threads are still idle.
Things get more interesting after starting the camera:
telemetry batch starts----
disp/ao/face_detector/agent.count=2
disp/ao/face_detector/wt-0x1a3c8825280/demands.count=16
disp/ao/face_detector/wt-0x1a3c8825280/thread.activity[30292]
working: [count=37;total=1647.67ms;avg=44.5316ms]
waiting: [count=1;total=4118.41ms;avg=4118.41ms]
disp/ao/face_detector/wt-0x1a3c8825780/demands.count=7
disp/ao/face_detector/wt-0x1a3c8825780/thread.activity[15092]
working: [count=46;total=1510.06ms;avg=32.8274ms]
waiting: [count=11;total=4255.96ms;avg=386.905ms]
telemetry batch ends----
telemetry batch starts----
disp/ao/face_detector/agent.count=2
disp/ao/face_detector/wt-0x1a3c8825280/demands.count=10
disp/ao/face_detector/wt-0x1a3c8825280/thread.activity[30292]
working: [count=73;total=2647.73ms;avg=36.2703ms]
waiting: [count=1;total=4118.41ms;avg=4118.41ms]
disp/ao/face_detector/wt-0x1a3c8825780/demands.count=15
disp/ao/face_detector/wt-0x1a3c8825780/thread.activity[15092]
working: [count=68;total=2510.12ms;avg=36.9135ms]
waiting: [count=11;total=4255.96ms;avg=386.905ms]
telemetry batch ends----
telemetry batch starts----
disp/ao/face_detector/agent.count=2
disp/ao/face_detector/wt-0x1a3c8825280/demands.count=14
disp/ao/face_detector/wt-0x1a3c8825280/thread.activity[30292]
working: [count=98;total=3648.62ms;avg=37.2308ms]
waiting: [count=1;total=4118.41ms;avg=4118.41ms]
disp/ao/face_detector/wt-0x1a3c8825780/demands.count=14
disp/ao/face_detector/wt-0x1a3c8825780/thread.activity[15092]
working: [count=98;total=3511.01ms;avg=35.8266ms]
waiting: [count=11;total=4255.96ms;avg=386.905ms]
telemetry batch ends----
telemetry batch starts----
disp/ao/face_detector/agent.count=2
disp/ao/face_detector/wt-0x1a3c8825280/demands.count=27
disp/ao/face_detector/wt-0x1a3c8825280/thread.activity[30292]
working: [count=115;total=4648.92ms;avg=40.8211ms]
waiting: [count=1;total=4118.41ms;avg=4118.41ms]
disp/ao/face_detector/wt-0x1a3c8825780/demands.count=11
disp/ao/face_detector/wt-0x1a3c8825780/thread.activity[15092]
working: [count=131;total=4511.31ms;avg=34.035ms]
waiting: [count=11;total=4255.96ms;avg=386.905ms]
telemetry batch ends----
As you see, two face_detector
s working concurrently within the system are suffering a bit: each requires double the time taken to process a single frame when operating individually! Also, look at the waiting event counters: at times, they remain constant, indicating that the thread is consistently saturated and it rarely gets a break. Moreover, pay attention to the trend of both demands.count
: initially, it appears that thread 30292
can dequeue events more rapidly, but then it starts buffering. Conversely, thread 15092
follows an opposite trend.
Thus, enabling this additional telemetry collection on thread activity is convenient to spot possible issues related to event handlers. However, as the documentation remembers, the impact of this feature largely depends on the type of dispatcher and the load profile. For one_thread
dispatchers handling heavy message streams, this impact is scarcely noticeable. However, for active_obj
dispatchers dealing with occasional messages, performance can decrease by 3-4 times. Consequently, it’s challenging to provide precise numbers on cumulative performance loss due to the significant variability in results.
Nevertheless, it’s important to emphasize that this mechanism is considered secondary, primarily intended for debugging and profiling purposes. Therefore, it’s imperative to assess the performance impact of work thread activity tracking on your application. Only after thorough evaluation should a decision be made regarding whether to enable this mechanism in the production environment.
Design considerations
The first aspect we discuss in this brief section regards how to integrate telemetry support into our applications (existing or new).
First, for “ordinary” telemetry (not related to threading activity), we can easily integrate it by utilizing agents similar to telemetry_agent
. Runtime monitoring can be toggled on and off dynamically without much additional effort, simply by accessing the environment instance. It’s common to integrate the application with commands, as demonstrated with telemetry_agent
, and to have multiple agents that process specific telemetry information.
When telemetry is enabled, we can introduce specific agents based on the type of issue we’re investigating or the data we’re interested in. These agents might be always up and running, like telemetry_agent
, or created on demand. The approach depends on the specific requirements and architecture of the application.
On the contrary, thread activity monitoring is more intrusive and necessitates modifications to how a dispatcher is instantiated. Ideally, we suggest incorporating this setting into the application’s configuration. However, in cases where the application needs to allow interactive toggling of activity tracking, the only option is to recreate the dispatchers. This typically involves deregistering some cooperations and agents, potentially causing temporary interruptions in their operation.
The second aspect regards how to add our own telemetry data into the telemetry system. It’s quite easy, as we only need to implement the interface stats::source_t
that consists of a single function. For example:
struct my_data_source : so_5::stats::source_t
{
void distribute(const so_5::mbox_t& box) override
{
so_5::send<so_5::stats::messages::quantity<size_t>>(box, "mydata", "/some-category", 125);
}
};
The stats controller invokes this function of all the registered data sources when it’s time to “sample” telemetry data. It’s important to mention a couple of things regarding prefixes and suffixes:
-
prefix
is limited to 47 characters + null-terminator (exceeding characters will be discarded byquantity
‘s constructor); -
suffix
just copies the given pointer and not the data. This means, the given string should be either statically allocated or kept alive until the quantity is referenced. Typically, we use statically allocated strings or string literals in this case. It’s important to note that comparing two suffixes is the same as comparing their two pointers and not the content they point to.
To register a new data source, we add an instance of it to the stats_repository
, a special object that collects all the data sources. It’s accessible from the environment:
my_data_source my_source;
env.environment().stats_repository().add(my_source);
It’s mandatory to keep the instance alive until the system is up and running. Typically, dispatchers implement the data source interface or keep a data source instance alive. Thus, whenever we develop a custom dispatcher intended for production, it’s good practice to add telemetry support this way. Additionally, it’s important to manually remove the data source to prevent potential issues where the stats controller attempts to call distribute()
while the data source is being destructed.
To simplify this process, SObjectizer offers a couple of wrappers that handle the registration and deregistration of a data source automatically in a RAII fashion:
-
auto_registered_source_holder_t
-
manually_registered_source_holder_t
The key distinction between the two lies in the manual control over registration and deregistration provided by the latter, while still automating the removal during destruction if necessary. A minimal example is provided here below:
class some_agent final : public so_5::agent_t
{
so_5::stats::auto_registered_source_holder_t<my_data_source> m_ds_holder;
public:
some_agent(agent_context_t ctx )
: so_5::agent_t(std::move(ctx)), m_ds_holder(so_5::outliving_mutable( so_environment().stats_repository() ))
{}
};
int main()
{
so_5::launch( [](so_5::environment_t & env) {
env.introduce_coop([](so_5::coop_t& coop) {
// ... other agents
coop.make_agent<some_agent>();
} );
});
}
On the other hand, if we need to defer adding the data source to the stats controller, we might opt for manually_registered_source_holder_t
:
class some_agent final : public so_5::agent_t
{
so_5::stats::manually_registered_source_holder_t<my_data_source> m_ds_holder;
public:
some_agent(agent_context_t ctx )
: so_5::agent_t(std::move(ctx))
{}
void so_evt_start() override
{
m_ds_holder.start(so_5::outliving_mutable( so_environment().stats_repository() ) );
}
};
Additionally, manually_registered_source_holder_t
provides a stop()
function to remove the data source before destruction.
Takeaway
In this episode we have learned:
- SObjectizer provides a feature to collect and distribute runtime monitoring information that are telemetry data regarding its internals;
- runtime monitoring is disabled by default as it might have a negative performance impact;
- when enabled, telemetry data are collected and distributed by the stats controller, an entity that runs in a dedicated thread and is accessible from the environment using
env.stats_controller()
; - to enable runtime monitoring, we call
env.stats_controller().turn_on()
; - to disable runtime monitoring, we call
env.stats_controller().turn_off()
; - the stats controller distributes data at intervals configured using
set_distribution_period(number-of-secs)
; - distribution of telemetry data happens as usual: subscriptions to a certain message box;
- the stats controller provides access to the standard message box for telemetry data through
mbox()
; - telemetry data are represented by
messages::quantities<size_t>
which, in addition to the value, includes the data source name in the form of prefix and suffix; - the data source prefix identifies a group of related data sources (e.g. a particular dispatcher);
- the data source suffix identifies the particular category of telemetry information (e.g. size of the event queue);
- typically, we use delivery filters to keep only telemetry data we are interested in;
- at every interval, all the telemetry quantities are delivered; to identify batches, two special messages are sent by the stats controller:
distribution_started
anddistribution_finished
; - it’s possible to enable – per dispatcher and also globally – additional telemetry data regarding thread activity such as the time spent processing events;
- thread activity statistics are delivered as messages of type
messages::work_thread_activity
; - to add custom data sources, we implement the interface
source_t
which provides a single function.
As usual, calico
is updated and tagged.
What’s next?
With the recent addition of Ronnie, a new developer unfamiliar with SObjectizer, we’ve encountered a common dilemma: choosing the appropriate dispatchers to bind agents to. While Ronnie grasps the concept of decomposing business logic into agents that exchange messages, selecting the right dispatchers presents a challenge. To address this, we’ve scheduled pair programming sessions with Ronnie to share our guidelines and considerations on dispatcher binding.
Stay tuned for the next installment, where we delve into this topic in detail.
Thanks to Yauheni Akhotnikau for having reviewed this post.
Top comments (0)