DEV Community

Cover image for How to keep a history of MQTT data with Node.js
Alexey Timin for ReductStore

Posted on • Edited on • Originally published at reduct.store

How to keep a history of MQTT data with Node.js

The MQTT protocol is very popular in IoT applications. It is a simple way to connect different data sources
with your application by using a publish/subscribe model. Sometimes you may want to keep a history of your MQTT data to
use it for model training, diagnostics or metrics. If your data sources provide different formats of data that can
not be interpreted as time series of floats, ReductStore is what you need.

Let's make a simple MQTT application to see how it works.

Prerequisites

For this usage example we have the following requirements:

  • Linux AMD64
  • Docker and Docker Compose
  • NodeJS >= 16

If you're an Ubuntu user, use this command to install the dependencies:

$ sudo apt-get update
$ sudo apt-get install docker-compose nodejs
Enter fullscreen mode Exit fullscreen mode

Run MQTT Broker and ReductStore with Docker Compose

The easiest way to run the broker and the storage is to use Docker Compose. So we should create a docker-compose.yml
file in the example's folder with the services:

version: "3"
services:
  reduct-storage:
    image: reductstore/reductstore:latest
    volumes:
      - ./data:/data
    ports:
      - "8383:8383"

  mqtt-broker:
    image: eclipse-mosquitto:1.6
    ports:
      - "1883:1883"
Enter fullscreen mode Exit fullscreen mode

Then run the configuration:

docker-compose up
Enter fullscreen mode Exit fullscreen mode

Docker Compose downloaded the images and ran the containers. Pay attention that we published ports 1883 for MQTT
protocol and 8383 for ReductStore HTTP API.

Write NodeJS script

Now we're ready to make hands dirty with code. Let's initialize the NPM package and
install MQTT Client and
JavaScript Client SDK.

$ npm init
$ npm install --save reduct-js async-mqtt 
Enter fullscreen mode Exit fullscreen mode

When we have all the dependencies installed, we can write the script:

const MQTT = require('async-mqtt');
const {Client} = require('reduct-js');

MQTT.connectAsync('tcp://localhost:1883').then(async (mqttClient) => {
    await mqttClient.subscribe('mqtt_data');

    const reductClient = new Client('http://localhost:8383');
    const bucket = await reductClient.getOrCreateBucket('mqtt');

    mqttClient.on('message', async (topic, msg) => {
        const record = await bucket.beginWrite(topic);
        await record.write(msg)
        console.log('Received message "%s" from topic "%s" is written', msg,
            topic);
    });

}).catch(error => console.error(error));

Enter fullscreen mode Exit fullscreen mode

Let's look at the code in detail. First, we have to connect to the MQTT broker and subscribe to a topic. The topic name
just random string, which producers should know. In our case, it is mqtt_data:


MQTT.connectAsync('tcp://localhost:1883').then(async (mqttClient) => {
    await mqttClient.subscribe('mqtt_data');

    // rest of code
}).catch(error => console.error(error));
Enter fullscreen mode Exit fullscreen mode

If the MQTT connection is successful, we can start dealing with ReductStore. To start writing data there, we need a
bucket. We create a bucket with the name mqtt or get an existing one:

const reductClient = new Client('http://localhost:8383');
const bucket = await reductClient.getOrCreateBucket('mqtt');
Enter fullscreen mode Exit fullscreen mode

The last step is to write the received message to the storage. We must use a callback
for event message, to catch it. Then we write the message to entry mqtt_data:

mqttClient.on('message', async (topic, msg) => {
    const record = await bucket.beginWrite(topic);
    await record.write(msg)
    console.log('Received message "%s" from topic "%s" was written', data,
        topic);
});
Enter fullscreen mode Exit fullscreen mode

When we call bucket.beginWrite we create an entry in the bucket if it doesn't exist yet. Then we write data to the
entry with the current timestamp. Now our MQTT data is safe and sound in the storage, and we can access them by using
the same SDK.

Publish data to MQTT topic

When you launch the script, it does nothing because there is no data from MQTT. You have to publish something to topic
mqtt_data. I prefer to use mosquitto_pub. For Ubuntu users, it is a
part of the mosquitto-clients package:

$ sudo apt-get install mosquitto-clients
$ mosuitto_pub -t mqtt_data -m "Hello, world!"
Enter fullscreen mode Exit fullscreen mode

Getting data from ReductStore

Now you know how to get data from MQTT and write it to ReductStore, but we need a little NodejS script to read the data
from the storage:

const {Client} = require('reduct-js');

const client = new Client('http://localhost:8383');

client.getBucket('mqtt').then(async (bucket) => {
    const record  = await bucket.beginRead('mqtt_data');
    console.log('Last record: %s', await record.readAsString());

    // Get data for lash hour
    const stopTime = BigInt(Date.now() * 1000);
    const startTime = stopTime - 3_600_000_000n;

    for await (const record of bucket.query('mqtt_data', startTime, stopTime)) {
        const data = await record.read();
        console.log('Found record "%s" with timestamp "%d"', data, record.time);
    }

}).catch(error => console.error(error));

Enter fullscreen mode Exit fullscreen mode

To read the latest record in the entry is very easy:

const record  = await bucket.beginRead('mqtt_data');
const data = await record.readAsString();
Enter fullscreen mode Exit fullscreen mode

To take data for timeinterval, we can use the query method. It returns an async iterator, so we can use for await
loop to iterate over the records:

const stopTime = BigInt(Date.now() * 1000);
const startTime = stopTime - 3_600_000_000n;

for await (const record of bucket.query('mqtt_data', startTime, stopTime)) {
    const data = await record.read();
    console.log('Found record "%s" with timestamp "%d"', data, record.time);
}
Enter fullscreen mode Exit fullscreen mode

Pay attention, the storage uses timestamps with microsecond precision, so we can't use Date class and number type.
What is why we use BigInt.

Conclusion

As you can see, the MQTT protocol and ReductStore very simple technologies that can be used together very easily in
NodeJS.
You can find the source code of the example here. If you have any questions or problems running it. Feel free to
make an issue.

I hope this tutorial has been helpful. Thanks!

Links

Top comments (0)