Introduction
This blog post explains how to build a Vehicle Command Log Store to keep track of command requests and responses sent to vehicles by application clients. To this purpose, we look at extracting MQTT 5 metadata information from MQTT messages published using MQTT v5, and enrich payloads via an AWS IoT Core Rules Engine Rules. Processed data is stored in an Amazon DynamoDB table.
The goal is to use as little bespoke code on the cloud side as possible and lean on native integrations on the AWS Cloud side, like the IoT Rule with Dynamo DB Action.
For a detailed walk-through a live demo of this solution, you can watch the video linked below on the IoT Builders YouTube channel:
Enriching Payloads with MQTT 5 Metadata
MQTT 5 Request/Response Pattern
As part of this blog post, we exploring a feature of MQTT 5: the Request/Response pattern
.
The Request/Response messaging pattern is a method to track responses to client requests in an asynchronous way. It’s a mechanism implemented in MQTTv5 to allow the publisher to specify a topic for the response to be sent on a particular request. When the subscriber receives the request, it also receives the topic to send the response on. This pattern supports a correlation data field that allows tracking of packets, e.g. request or device identification parameters.
Let's look at an example:
We want to send commands to vehicles over MQTT from client applications. The flow is as follows:
Vehicles subscribe to the request topic, and client applications subscribe to their decided response topics, which can be dependent on the application instance id, for example.
-
App clients publish requests on the request topic. In our case, the payload is a simple text
DOOR_LOCK
indicating the command to lock the vehicle doors. Following the MQTT 5 pattern, in addition to the payload, we are sending metadata like theResponse Topic
,Content Type
andCorrelation Data
:- a
request id
, - a
timestamp
- a
user id
.
This data helps correlate MQTT requests and their responses.
- a
-
As the device receives the MQTT message, it executes the command and publishes the response on the specified response topic. Upon publishing, it re-send the correlation data, but also appends response specific information, such as:
- a
response timestamp
, - the
command
it is responding to, marked red on the diagram.
- a
The app instance receives this information on the response topic it subscribed to previously.
Prerequisites and Approach
Prerequisites
To recreate this demo locally, one needs to already have in place:
- An AWS Account with permissions to create IoT resources.
- Two created AWS IoT things for the app and car simulators, with locally stored certificates and keys to connect to AWS IoT Core.
- An IoT Core policy allowing connections, subscriptions and data publishing for both IoT Things on the configured topics.
- An Amazon DynamoDB table needs to be created beforehand, with a unique identifier ‘msgId’ string as primary key.
This section looks at the steps to be performed:
Step 1: Setting up the the MQTTJS clients
First, we build two client simulators, for the car and the application client. We use MQTTJS for Javascript and its MQTT v5 implementation support. Step 1 below will describe the details of the MQTT client implementation.
Upon running the application simulator, it connects, subscribes to the response topic and stays connected. Every 10 seconds, the application simulator publishes a DOOR_LOCK
text message command. Upon running the car simulator, it connects and stays connected, then receives a command from the app client, simulates its execution with a JavaScript timer and sends back a DOOR_LOCK_SUCCESS
response.
The MQTT Request/Response pattern implementation is exemplified in the code snippets below:
Car Simulator
Connection options and connection creation:
//create options for MQTT v5 client
const options = {
clientId: CLIENT_ID,
host: ENDPOINT,
port: PORT,
protocol: 'mqtts',
protocolVersion: 5,
cert: fs.readFileSync(CERT_FILE),
key: fs.readFileSync(KEY_FILE),
reconnectPeriod: 0,
enableTrace: false
}
//connect
const client = mqtt.connect(options);
MQTT Event Handlers implementation:
client.on('connect', (packet) => {
console.log('connected');
console.log('subscribing to', SUB_TOPIC);
client.subscribe(SUB_TOPIC);
});
//handle Messages
client.on('message', (topic, message, properties) => {
console.log('Received Message',message.toString());
console.log('Received Message Properties', properties );
if(message && message.toString() === 'DOOR_LOCK' ) {
console.log('Executing', JSON.stringify(message));
setTimeout(() => {
const response = 'DOOR_LOCK_SUCCESS';
const responseTopic = properties.properties.responseTopic;
console.log("ResponseTopic: ", responseTopic);
console.log('Publishing Response: ', response," to Topic: ", responseTopic.toString());
const cDataString = properties.properties.correlationData.toString();
console.log('Correlation Data String: ', cDataString)
let correlationDataJSON = JSON.parse(cDataString);
correlationDataJSON.resp_ts = new Date().toISOString();
correlationDataJSON.cmd = message.toString();
client.publish(responseTopic.toString(), response, {
qos: 1,
properties: {
contentType: 'text/plain',
correlationData: JSON.stringify(correlationDataJSON)
}
}, (error, packet) => {
//ERROR Handlers here.
});
}, 5000);
}
});
App Client Simulator
Connection settings are the same as above.
MQTT Event Handlers implementation:
//handle the message
client.on('message', (topic, message, properties) => {
//Only log for now.
console.log('Received message: ', message.toString());
console.log('Received message properties: ', properties );
});
Publish the command on Interval for testing purposes:
//publish
setInterval(() => {
const requestId = v4();
// const message = JSON.stringify({ping: 'pong'});
console.log('Publishing message ');
client.publish(PUB_TOPIC, 'DOOR_LOCK', {
qos: 1, properties: {
responseTopic: SUB_TOPIC,
contentType: 'text/plain',
correlationData: JSON.stringify({
requestId: requestId,
userId: userId,
req_ts: new Date().toISOString()
})
}
}
, (error, packet) => {
console.log(JSON.stringify(packet))
})
}, 10000);
Step 2: AWS IoT Rule with Amazon DynamoDB Action
An AWS IoT Rule can be used to enrich the message payload with MQTT 5 response topic information, as well as correlation data and content type. We are interested in both command requests and responses, each of them published to different MQTT topics. Therefore, the IoT Rule we create must select data from both topics, as show in the diagram below.
Because we are building a Vehicle Command Log Store and in this scenario such payloads are sent with content-type text/plain
, we are filtering only for such messages in the Rule SQL statement.
As the desired behavior is storing the command messages and metadata in a Dynamo DB table, the rule will prepare a new JSON object to be stored. Each key will be stored as a separate column in Amazon Dynamo DB. A new unique message identifier (msgId
) is created to be the primary key of the table. For extracting the MQTT 5 metadata, the Rule SQL uses the get_mqtt_property(name)
SQL function. Encoding/Decoding functions are used to manipulate data from and to base64
encoded strings.
The Rule Action to be specified is Amazon DynamoDB, pointing to the previously created table.
The IoT Rule SQL is shown below:
SELECT
{"msgId": newuuid(),
"name": decode(encode(*, 'base64'), 'base64'),
"requestId": decode(get_mqtt_property('correlation_data'), 'base64').requestId, "req_ts": decode(get_mqtt_property('correlation_data'), 'base64').req_ts,
"cmd": decode(get_mqtt_property('correlation_data'), 'base64').cmd,
"resp_ts": decode(get_mqtt_property('correlation_data'), 'base64').resp_ts,
"userId": decode(get_mqtt_property('correlation_data'), 'base64').userId, "responseTopic": get_mqtt_property('response_topic') }
FROM 'cmd/#'
where get_mqtt_property('content_type') = 'text/plain'
After running the simulation for some minutes, you should see your Amazon Dynamo DB table populated with data entries as showed in the image below:
Conclusion
This blog post shows how to use the MQTT 5 IoT Rules SQL functions to extract MQTT 5 metadata from messages and use them to enrich device payloads. One of the design goals was to achieve enrichment and storage with native cloud integrations and no bespoke message processing code.
The demonstrated Vehicle Command Log Store use-case is just an example, to explore the art of the possible. For more information about the AWS IoT Core Rules Engine SQL functions for MQTT 5 support, have a look at the documentation.
If you are interested in detailed walk-through a live demo of this solution, you can watch the YouTube video. To get notified about more IoT content, you can subscribe to the IoT Builders YouTube channel.
Top comments (0)