DEV Community

Cover image for How to build your own MQTT broker in Nest.js
Behnam Nasehi
Behnam Nasehi

Posted on

How to build your own MQTT broker in Nest.js

What is MQTT ?

MQTT (Message Queuing Telemetry Transport) is an open source, lightweight messaging protocol, optimized for low latency. This protocol provides a callable and cost-efficient way to connect devices using a publish/subscribe model. A communication system built on MQTT consists of the publishing server, a broker and one or more clients. It is designed for constrained devices and low-bandwidth, high-latency or unreliable networks.

Pigeon MQTT Nestjs

Pigeon MQTT Nestjs is broker that can run on any stream server

Installation

Install From NPM :

$ npm i pigeon-mqtt-nest
Enter fullscreen mode Exit fullscreen mode

Usage

Pigeon will register as a global module. You can
import with configuration

@Module({
  imports: [

    PigeonModule.forRoot({
      port: 1884,
      id: "binarybeast",
      concurrency:100,
      queueLimit:42,
      maxClientsIdLength:23,
      connectTimeout:30000,
      heartbeatInterval:60000,
     })

  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {
}
Enter fullscreen mode Exit fullscreen mode
  • options <object>
    • mq <MQEmitter> middleware used to deliver messages to subscribed clients. In a cluster environment it is used also to share messages between brokers instances. Default: mqemitter
    • concurrency <number> maximum number of concurrent messages delivered by mq. Default: 100
    • persistence <Persistence> middleware that stores QoS > 0, retained, will packets and subscriptions. Default: aedes-persistence (in memory)
    • queueLimit <number> maximum number of queued messages before client session is established. If number of queued items exceeds, connectionError throws an error Client queue limit reached. Default: 42
    • maxClientsIdLength option to override MQTT 3.1.0 clients Id length limit. Default: 23
    • heartbeatInterval <number> an interval in millisconds at which server beats its health signal in $SYS/<aedes.id>/heartbeat topic. Default: 60000
    • id <string> aedes broker unique identifier. Default: uuidv4()
    • connectTimeout <number> maximum waiting time in milliseconds waiting for a [CONNECT][CONNECT] packet. Default: 30000

Handlers

Handler Emitted When
preConnect Invoked when server receives a valid CONNECT packet.
authenticate Invoked after preConnect.
authorizePublish publish LWT to all online clients,incoming client publish
authorizeSubscribe restore subscriptions in non-clean session.,incoming client SUBSCRIBE
published same as Event: publish, but provides a backpressure functionality.

Handler: preConnect

  • client: <Client>
  • packet: <object> [CONNECT][CONNECT]
  • callback: <Function> (error, successful) => void
    • error <Error> | null
    • successful <boolean>

Invoked when server receives a valid [CONNECT][CONNECT] packet. The packet can be modified.

client object is in default state. If invoked callback with no errors and successful be true, server will
continue to establish a session.

Any error will be raised in connectionError event.

Some Use Cases:

  1. Rate Limit / Throttle by client.conn.remoteAddress
  2. Check aedes.connectedClient to limit maximum connections
  3. IP blacklisting
@Injectable()
export class TestService {

  @onPreConnect()
  onPreConnect(@Client() client, @Packet() packet, @Function() done) {
    console.log("Function: @onPreConnect()");
    return done(null, true);
  }

}
Enter fullscreen mode Exit fullscreen mode

Handler: authenticate

  • client: <Client>
  • credential: <string>
  • callback: <Function> (error, successful) => void
    • error <Error> | null
    • successful <boolean>

Invoked after preConnect.

Server parses the [CONNECT][CONNECT] packet, initializes client object which set client.id to match the one
in [CONNECT][CONNECT] packet and extract username and password as parameters for user-defined authentication flow.

If invoked callback with no errors and successful be true, server authenticates client and continues to setup
the client session.

If authenticated, server acknowledges a [CONNACK][CONNACK] with returnCode=0, otherwise returnCode=5. Users could
define the value between 2 and 5 by defining a returnCode property in error object.

@Injectable()
export class TestService {

  @onAuthenticate()
  onAuthenticate(@Client() client, @Credential() credential, @Function() done) {
    console.log("Function: @onAuthenticate()");
    return done(null, true);
  }

}
Enter fullscreen mode Exit fullscreen mode
@Injectable()
export class TestService {

  @onAuthenticate()
  onAuthenticate(@Client() client, @Credential() credential, @Function() done) {
    console.log("Function: @onAuthenticate()");
    var error = new Error('Auth error')
    error.returnCode = 4
    return done(error, false);
  }

}
Enter fullscreen mode Exit fullscreen mode

Please refer to Connect Return Code
to see their meanings.

Handler: authorizePublish

  • client: <Client> | null
  • packet: <object> [PUBLISH][PUBLISH]
  • callback: <Function> (error) => void
    • error <Error> | null

Invoked when

  1. publish LWT to all online clients
  2. incoming client publish

client is null when aedes publishes obsolete LWT without connected clients

If invoked callback with no errors, server authorizes the packet otherwise emits clientError with error. If
an error occurs the client connection will be closed, but no error is returned to the client (MQTT-3.3.5-2)

@Injectable()
export class TestService {

  @onAuthorizePublish()
  onAuthorizePublish(@Client() client, @Packet() packet, @Function() done) {
    console.log("Function: @onAuthorizePublish()");
    if (packet.topic === 'aaaa') {
      return done(new Error('wrong topic'))
    }
    if (packet.topic === 'bbb') {
      packet.payload = Buffer.from('overwrite packet payload')
    }
    return done(null);
  }

}
Enter fullscreen mode Exit fullscreen mode

By default authorizePublish throws error in case a client publish to topics with $SYS/ prefix to prevent possible
DoS (see #597). If you write your own implementation
of authorizePublish we suggest you to add a check for this. Default implementation:

@Injectable()
export class TestService {

  @onAuthorizePublish()
  onAuthorizePublish(@Client() client, @Packet() packet, @Function() done) {
    if (packet.topic.startsWith($SYS_PREFIX)) {
      return done(new Error($SYS_PREFIX + ' topic is reserved'))
    }
    return done(null);
  }

}
Enter fullscreen mode Exit fullscreen mode

Handler: authorizeSubscribe

  • client: <Client>
  • subscription: <object>
  • callback: <Function> (error) => void
    • error <Error> | null
    • subscription: <object> | null

Invoked when

  1. restore subscriptions in non-clean session.
  2. incoming client [SUBSCRIBE][SUBSCRIBE]

subscription is a dictionary object like { topic: hello, qos: 0 }.

If invoked callback with no errors, server authorizes the packet otherwise emits clientError with error.

In general user should not touch the subscription and pass to callback, but server gives an option to change the
subscription on-the-fly.

@Injectable()
export class TestService {

  @onAuthorizeSubscribe()
  onAuthorizeSubscribe(@Client() client, @Subscription() subscription, @Function() done) {
    console.log("Function: @onAuthorizeSubscribe()");
    if (subscription.topic === 'aaaa') {
      return done(new Error('wrong topic'))
    }
    if (subscription.topic === 'bbb') {
      // overwrites subscription
      subscription.topic = 'foo'
      subscription.qos = 1
    }
    return done(null, subscription);
  }

}
Enter fullscreen mode Exit fullscreen mode

To negate a subscription, set the subscription to null. Aedes ignores the negated subscription and the qos
in SubAck is set to 128 based
on MQTT 3.11 spec:

@Injectable()
export class TestService {

  @onAuthorizeSubscribe()
  onAuthorizeSubscribe(@Client() client, @Subscription() subscription, @Function() done) {
    done(null, subscription.topic === 'aaaa' ? null : sub)
  }

}
Enter fullscreen mode Exit fullscreen mode

Handler: published

  • packet: <aedes-packet> & [PUBLISH][PUBLISH]
  • client: <Client>
  • callback: <Function>

same as Event: publish, but provides a backpressure functionality. TLDR; If you are doing operations
on packets that MUST require finishing operations on a packet before handling the next one use this otherwise,
especially for long-running operations, you should use Event: publish instead.

@Injectable()
export class TestService {

  @onPublish()
  OnPublish(@Topic() topic, @Packet() packet, @Payload() payload, @Client() client) {
    console.log("Function: @OnPublish()");
  }

}
Enter fullscreen mode Exit fullscreen mode

Events

Name Emitted When
Client client registers itself to server
Client Ready client has received all its offline messages and be initialized.
Client Disconnect client disconnects.
Client Error an error occurs.
Connection Error an error occurs.
Keep Alive Timeout timeout happes in the client keepalive.
Publish servers delivers the packet to subscribed client.
Ack packet successfully delivered to the client.
Subscribe client successfully subscribe the subscriptions in server.
Unsubscribe client successfully unsubscribe the subscriptions in server.
Connack Sent server sends an acknowledge to client.
Closed server is closed.

Event: client

Emitted when the client registers itself to server. The client is not ready yet.
Its connecting state equals to true.

Server publishes a SYS topic $SYS/<aedes.id>/new/clients to inform it registers the client into its registration
pool. client.id is the payload.

@Injectable()
export class TestService {

  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }

  @onClient()
  OnNewClient(@Client() client) {
    console.log("Function: @onClient()");
  }

}
Enter fullscreen mode Exit fullscreen mode

Event: clientReady

Emitted when the client has received all its offline messages and be initialized.
The client connected state equals to true and is ready for processing incoming
messages.

@Injectable()
export class TestService {

  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }

  @onClientReady()
  async onClientReady(@Client() client) {
    console.log("Function: @onClientReady()");
  }

}
Enter fullscreen mode Exit fullscreen mode

Event: clientDisconnect

Emitted when a client disconnects.

Server publishes a SYS topic $SYS/<aedes.id>/disconnect/clients to inform it deregisters the client. client.id is
the payload.

@Injectable()
export class TestService {

  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }

  @onClientDisconnect()
  OnClientDisconnect(@Client() client) {
    console.log("Function: @OnClientDisconnect()");
  }

}
Enter fullscreen mode Exit fullscreen mode

Event: clientError

Emitted when an error occurs.

@Injectable()
export class TestService {

  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }

  @onClientError()
  OnClientError(@Client() client, @Error() error) {
    console.log("Function: @onClientError()");
  }

}
Enter fullscreen mode Exit fullscreen mode

Event: connectionError

Emitted when an error occurs. Unlike clientError it raises only when client is uninitialized.

@Injectable()
export class TestService {

  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }

  @onConnectionError()
  OnConnectionError(@Client() client, @Error() error) {
    console.log("Function: @OnConnectionError()");
  }

}
Enter fullscreen mode Exit fullscreen mode

Event: keepaliveTimeout

Emitted when timeout happes in the client keepalive.

@Injectable()
export class TestService {

  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }

  @onKeepLiveTimeout()
  onKeepLiveTimeout(@Client() client) {
    console.log("Function: @onKeepLiveTimeout()");
  }

}
Enter fullscreen mode Exit fullscreen mode

Event: publish

  • packet <aedes-packet> & [PUBLISH][PUBLISH]
  • client <Client> | null

Emitted when servers delivers the packet to subscribed client. If there are no clients subscribed to the packet
topic, server still publish the packet and emit the event. client is null when packet is an internal message
like aedes heartbeat message and LWT.

Note! packet belongs aedes-packet type. Some properties belongs to aedes internal, any changes on them will break aedes internal flow.

@Injectable()
export class TestService {

  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }

  @onPublish()
  OnPublish(@Topic() topic, @Packet() packet, @Payload() payload, @Client() client) {
    console.log("Function: @OnPublish()");
  }

}
Enter fullscreen mode Exit fullscreen mode

Event: ack

  • packet <object> [PUBLISH][PUBLISH] for QoS 1, [PUBREL][PUBREL] for QoS 2
  • client <Client>

Emitted an QoS 1 or 2 acknowledgement when the packet successfully delivered to the client.

@Injectable()
export class TestService {

  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }


  @onAck()
  onAck(@Client() client, @Packet() packet) {
    console.log("Function: @onAck()");
  }

}
Enter fullscreen mode Exit fullscreen mode

Event: subscribe

  • subscriptions <object>
  • client <Client>

Emitted when client successfully subscribe the subscriptions in server.

subscriptions is an array of { topic: topic, qos: qos }. The array excludes duplicated topics and includes negated
subscriptions where qos equals to 128. See more
on authorizeSubscribe

Server publishes a SYS topic $SYS/<aedes.id>/new/subscribers to inform a client successfully subscribed to one or more
topics. The payload is a JSON that has clientId and subs props, subs equals to subscriptions array.

@Injectable()
export class TestService {

  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }

  @onSubscribe()
  OnSubscribe(@Subscription() subscription, @Client() client) {
    console.log("Function: @OnSubscribe()");
  }

}
Enter fullscreen mode Exit fullscreen mode

Event: unsubscribe

  • unsubscriptions Array<string>
  • client <Client>

Emitted when client successfully unsubscribe the subscriptions in server.

unsubscriptions are an array of unsubscribed topics.

Server publishes a SYS topic $SYS/<aedes.id>/new/unsubscribers to inform a client successfully unsubscribed to one or
more topics. The payload is a JSON that has clientId and subs props, subs equals to unsubscriptions array.

@Injectable()
export class TestService {

  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }

  @onUnsubscribe()
  OnUnsubscribe(@Subscription() subscription, @Client() client) {
    console.log("Function: @OnUnsubscribe()");
  }

}
Enter fullscreen mode Exit fullscreen mode

Event: connackSent

  • packet <object> [CONNACK][CONNACK]
  • client <Client>

Emitted when server sends an acknowledge to client. Please refer to the MQTT specification for the explanation of
returnCode object property in CONNACK.

@Injectable()
export class TestService {

  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }

  @onConnackSent()
  onConnackSent(@Client() client, @Packet() packet) {
    console.log("Function: @onConnackSent()");
  }

}
Enter fullscreen mode Exit fullscreen mode

Event: closed

Emitted when server is closed.

@Injectable()
export class TestService {

  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }

  @onClosed()
  onClosed(@Client() client, @Packet() packet) {
    console.log("Function: @onClosed()");
  }

}
Enter fullscreen mode Exit fullscreen mode

Methods

Method Emitted When
Publish Directly deliver packet on behalf of server to subscribed clients.
Close Close aedes server and disconnects all clients.

Method: Publish

  • packet <object> [PUBLISH][PUBLISH]

Directly deliver packet on behalf of server to subscribed clients.
Bypass authorizePublish.

callback will be invoked with error arugments after finish.

@Injectable()
export class TestService {

  //inject Pigeon Service
  constructor(@Inject(PigeonService) private readonly pigeonService: PigeonService) {
  }

  @onPublish()
  async OnPublish(@Topic() topic, @Packet() packet, @Payload() payload, @Client() client) {

    //use this method to publish
    await this.pigeonService.publish({
      topic: "test2", qos: 0, cmd: "publish", payload: "", dup: false, retain: false
    });

  }

}
Enter fullscreen mode Exit fullscreen mode

Method: Close

  • callback: <Function>

Close aedes server and disconnects all clients.

callback will be invoked when server is closed.

@Injectable()
export class TestService {

  //inject Pigeon Service
  constructor(@Inject(PigeonService) private readonly pigeonService: PigeonService) {
  }

  @onPublish()
  async OnPublish(@Topic() topic, @Packet() packet, @Payload() payload, @Client() client) {

    //use this method to publish
    await this.pigeonService.close();

  }

}
Enter fullscreen mode Exit fullscreen mode

Method: Get Broker Instance

get broker instance

@Injectable()
export class TestService {

  //inject Pigeon Service
  constructor(@Inject(PigeonService) private readonly pigeonService: PigeonService) {
  }

  @onPublish()
  async OnPublish(@Topic() topic, @Packet() packet, @Payload() payload, @Client() client) {

    await this.pigeonService.getBrokerInstance();

  }

}
Enter fullscreen mode Exit fullscreen mode

Packets

This section describes the format of all packets

Packet -
Connect ---
Connack ---
Subscribe ---
Suback ---
Unsubscribe ---
Unsuback ---
Publish ---
Puback ---
Pubrec ---
Pubrel ---
Pubcomp ---
Pingreq ---
Pingresp ---
Disconnect ---

Packet: Connect

{
  cmd: 'connect',
  protocolId: 'MQTT', // Or 'MQIsdp' in MQTT 3.1 and 5.0
  protocolVersion: 4, // Or 3 in MQTT 3.1, or 5 in MQTT 5.0
  clean: true, // Can also be false
  clientId: 'my-device',
  keepalive: 0, // Seconds which can be any positive number, with 0 as the default setting
  username: 'matteo',
  password: Buffer.from('collina'), // Passwords are buffers
  will: {
    topic: 'mydevice/status',
    payload: Buffer.from('dead'), // Payloads are buffers
    properties: { // MQTT 5.0
      willDelayInterval: 1234,
      payloadFormatIndicator: false,
      messageExpiryInterval: 4321,
      contentType: 'test',
      responseTopic: 'topic',
      correlationData: Buffer.from([1, 2, 3, 4]),
      userProperties: {
        'test': 'test'
      }
    }
  },
  properties: { // MQTT 5.0 properties
      sessionExpiryInterval: 1234,
      receiveMaximum: 432,
      maximumPacketSize: 100,
      topicAliasMaximum: 456,
      requestResponseInformation: true,
      requestProblemInformation: true,
      userProperties: {
        'test': 'test'
      },
      authenticationMethod: 'test',
      authenticationData: Buffer.from([1, 2, 3, 4])
  }
}
Enter fullscreen mode Exit fullscreen mode

If password or will.payload are passed as strings, they will automatically be converted into a Buffer.

Packet: Connack

{
  cmd: 'connack',
  returnCode: 0, // Or whatever else you see fit MQTT < 5.0
  sessionPresent: false, // Can also be true.
  reasonCode: 0, // reason code MQTT 5.0
  properties: { // MQTT 5.0 properties
      sessionExpiryInterval: 1234,
      receiveMaximum: 432,
      maximumQoS: 2,
      retainAvailable: true,
      maximumPacketSize: 100,
      assignedClientIdentifier: 'test',
      topicAliasMaximum: 456,
      reasonString: 'test',
      userProperties: {
        'test': 'test'
      },
      wildcardSubscriptionAvailable: true,
      subscriptionIdentifiersAvailable: true,
      sharedSubscriptionAvailable: false,
      serverKeepAlive: 1234,
      responseInformation: 'test',
      serverReference: 'test',
      authenticationMethod: 'test',
      authenticationData: Buffer.from([1, 2, 3, 4])
  }
}
Enter fullscreen mode Exit fullscreen mode

Packet: Subscribe

{
  cmd: 'subscribe',
  messageId: 42,
  properties: { // MQTT 5.0 properties
    subscriptionIdentifier: 145,
    userProperties: {
      test: 'test'
    }
  }
  subscriptions: [{
    topic: 'test',
    qos: 0,
    nl: false, // no Local MQTT 5.0 flag
    rap: true, // Retain as Published MQTT 5.0 flag
    rh: 1 // Retain Handling MQTT 5.0
  }]
}
Enter fullscreen mode Exit fullscreen mode

Packet: Suback

{
  cmd: 'suback',
  messageId: 42,
  properties: { // MQTT 5.0 properties
    reasonString: 'test',
    userProperties: {
      'test': 'test'
    }
  }
  granted: [0, 1, 2, 128]
}
Enter fullscreen mode Exit fullscreen mode

Packet: Unsubscribe

{
  cmd: 'unsubscribe',
  messageId: 42,
  properties: { // MQTT 5.0 properties
    userProperties: {
      'test': 'test'
    }
  }
  unsubscriptions: [
    'test',
    'a/topic'
  ]
}
Enter fullscreen mode Exit fullscreen mode

Packet: Unsuback

{
  cmd: 'unsuback',
  messageId: 42,
  properties: { // MQTT 5.0 properties
    reasonString: 'test',
    userProperties: {
      'test': 'test'
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

Packet: Publish

{
  cmd: 'publish',
          messageId: 42,
          qos: 2,
          dup: false,
          topic: 'test',
          payload: Buffer.from('test'),
          retain: false,
          properties: { // optional properties MQTT 5.0
    payloadFormatIndicator: true,
            messageExpiryInterval: 4321,
            topicAlias: 100,
            responseTopic: 'topic',
            correlationData: Buffer.from([1, 2, 3, 4]),
            userProperties: {
      'test': 'test'
    },
    subscriptionIdentifier: 120, // can be an Array in message from broker, if message included in few another subscriptions
            contentType: 'test'
  }
}
Enter fullscreen mode Exit fullscreen mode

Packet: Puback

{
  cmd: 'puback',
          messageId: 42,
          reasonCode: 16, // only for MQTT 5.0
          properties: { // MQTT 5.0 properties
    reasonString: 'test',
            userProperties: {
      'test': 'test'
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

Packet: Pubrec

{
  cmd: 'pubrec',
          messageId: 42,
          reasonCode: 16, // only for MQTT 5.0
          properties: { // properties MQTT 5.0
    reasonString: 'test',
            userProperties: {
      'test': 'test'
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

Packet: Pubrel

{
  cmd: 'pubrel',
          messageId: 42,
          reasonCode: 16, // only for MQTT 5.0
          properties: { // properties MQTT 5.0
    reasonString: 'test',
            userProperties: {
      'test': 'test'
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

Packet: Pubcomp

{
  cmd: 'pubcomp',
          messageId: 42,
          reasonCode: 16, // only for MQTT 5.0
          properties: { // properties MQTT 5.0
    reasonString: 'test',
            userProperties: {
      'test': 'test'
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

Packet: Pingreq

{
  cmd: 'pingreq'
}
Enter fullscreen mode Exit fullscreen mode

Packet: Pingresp

{
  cmd: 'pingresp'
}
Enter fullscreen mode Exit fullscreen mode

Packet: Disconnect

{
  cmd: 'disconnect',
          reasonCode: 0, // MQTT 5.0 code
          properties: { // properties MQTT 5.0
    sessionExpiryInterval: 145,
            reasonString: 'test',
            userProperties: {
      'test': 'test'
    },
    serverReference: 'test'
  }
}
Enter fullscreen mode Exit fullscreen mode

Middleware Plugins

Persistence

MQEmitter

Dependencies


Stay in touch


Top comments (1)

Collapse
 
emreaka profile image
Emre AKA

Hello, I don't understand that "A communication system built on MQTT consists of the publishing server, a broker and one or more clients." What is the broker and publishing server? Could you help me, please?