DEV Community

Umakanthan D

0mq - The little bundle of messaging joy

0mq (https://github.com/zeromq/zeromq.js): "A thing of beauty is a joy forever." Or, as one might equally say, "Small is beautiful."

Nothing is more joyful than revisiting something you had written off as too small and finding that it just rocks! A perfect way to start the year!

I was looking for lightweight designs in which I could send out millions of messages and have a set of consumers process them, without ever losing a message. I also wanted to avoid anything that drags a coordination service such as ZooKeeper into the picture :)

There were a few criteria that I had in mind:

  1. If the consumer is not available, the producer should wait.
  2. If the consumer processes messages more slowly than the producer creates them, the producer should queue them and wait, sending only as fast as the consumer can keep up.
  3. The producer should be able to exit after producing the messages, and the consumer should still be able to process them.
  4. Either the producer or the consumer can restart without losing any messages.
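
Criteria 1 and 2 amount to backpressure: the producer may only run ahead of the consumer by a bounded amount, and must wait beyond that. The sketch below is a toy in-process model of that behaviour, assuming a single producer and a single consumer. `BoundedQueue` and `runDemo` are hypothetical names used for illustration; no sockets are involved and this is not how 0mq itself is implemented.

```javascript
// Toy model only: BoundedQueue is a hypothetical helper, not part of zeromq.
class BoundedQueue {
  constructor(highWaterMark) {
    this.highWaterMark = highWaterMark
    this.items = []
    this.waitingSenders = []   // wake-up callbacks for a producer blocked on a full queue
    this.waitingReceivers = [] // wake-up callbacks for a consumer blocked on an empty queue
    this.maxDepth = 0          // deepest the queue ever got
  }

  async send(item) {
    // Wait (without busy-looping) while the queue is at its limit.
    while (this.items.length >= this.highWaterMark) {
      await new Promise(resolve => this.waitingSenders.push(resolve))
    }
    this.items.push(item)
    this.maxDepth = Math.max(this.maxDepth, this.items.length)
    const wake = this.waitingReceivers.shift()
    if (wake) wake()
  }

  async receive() {
    // Wait while there is nothing to read.
    while (this.items.length === 0) {
      await new Promise(resolve => this.waitingReceivers.push(resolve))
    }
    const item = this.items.shift()
    const wake = this.waitingSenders.shift()
    if (wake) wake()
    return item
  }
}

// Runs a fast producer against a deliberately slow consumer.
async function runDemo(total, highWaterMark) {
  const queue = new BoundedQueue(highWaterMark)
  const received = []

  const producer = (async () => {
    for (let i = 0; i < total; i++) await queue.send("do task." + i)
    await queue.send("end") // sentinel telling the consumer to stop
  })()

  const consumer = (async () => {
    while (true) {
      const msg = await queue.receive()
      if (msg === "end") break
      received.push(msg)
      await new Promise(resolve => setTimeout(resolve, 1)) // simulate slow work
    }
  })()

  await Promise.all([producer, consumer])
  return { received, maxDepth: queue.maxDepth }
}

runDemo(25, 4).then(({ received, maxDepth }) => {
  console.log("received", received.length, "messages; deepest queue:", maxDepth)
})
```

The producer finishes only after the consumer has drained enough of the queue, and the queue never grows past the limit passed to `runDemo`.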

In my tests, 0mq met all four criteria. Here is a quick-and-dirty producer and consumer for you to try. The code is JavaScript, so you will need Node.js to run it.

The producer and consumer below use the push/pull pattern. 0mq also supports other patterns, such as pub/sub and request/reply.
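
As a brief aside before the real producer and consumer: the patterns differ mainly in who receives each message. A PUSH socket hands each message to exactly one connected PULL worker, taking the workers in turn, while a PUB socket delivers every message to every subscriber whose prefix filter matches. The sketch below is a toy model of that difference using plain arrays and no sockets; `pushPull` and `pubSub` are hypothetical helper names, not zeromq APIs.

```javascript
// Toy model, no sockets: shows which receiver would get which message.

// push/pull: each message goes to exactly one worker, round-robin.
function pushPull(messages, workerCount) {
  const workers = Array.from({ length: workerCount }, () => [])
  messages.forEach((msg, i) => workers[i % workerCount].push(msg))
  return workers
}

// pub/sub: each subscriber gets every message that starts with its prefix.
// An empty prefix means "subscribe to everything".
function pubSub(messages, subscriberPrefixes) {
  return subscriberPrefixes.map(prefix =>
    messages.filter(msg => msg.startsWith(prefix))
  )
}

console.log(pushPull(["t0", "t1", "t2", "t3", "t4"], 2))
// → [ [ 't0', 't2', 't4' ], [ 't1', 't3' ] ]

console.log(pubSub(["task.1", "log.1", "task.2"], ["task.", ""]))
// → [ [ 'task.1', 'task.2' ], [ 'task.1', 'log.1', 'task.2' ] ]
```

Push/pull is the natural fit for the work-queue use case in this post, because each task should be processed once rather than by every consumer.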

producer.js

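const zmq = require("zeromq")

async function run() {
  // PUSH socket: hands each message to a connected PULL socket
  const sock = new zmq.Push()

  await sock.bind("tcp://127.0.0.1:3000")
  console.log("Producer bound to port 3000")

  const limit = 1000 * 1000
  let ctr = 0
  while (ctr < limit) {
    // send() waits if the message cannot be queued yet
    await sock.send("do task." + ctr)
    ctr++
    if (ctr % 10000 === 1) console.log(ctr)
  }
  // Sentinel message that tells the consumer to stop
  await sock.send("end")
  ctr++
  console.log("sent", ctr)
}

run().catch(err => {
  console.error(err)
  process.exit(1)
})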

consumer.js

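const zmq = require("zeromq")

async function run() {
  // PULL socket: receives the messages sent by the producer's PUSH socket
  const sock = new zmq.Pull()

  sock.connect("tcp://127.0.0.1:3000")
  console.log("Worker connected to port 3000")

  let ctr = 0
  for await (const [msg] of sock) {
    ctr++
    const text = msg.toString()

    if (ctr % 1000 === 1) {
      console.log("%d - message: %s", ctr, text)
      // Simulate a slow consumer: pause 100 ms once every 1000 messages
      await new Promise(resolve => setTimeout(resolve, 100))
    }
    // Stop when the producer's sentinel message arrives
    if (text === "end") break
  }
  console.log("processed =", ctr)
}

run().catch(err => {
  console.error(err)
  process.exit(1)
})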
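
A note on timing: the consumer sleeps 100 ms once every 1000 messages, so it is deliberately slower than the producer. A rough back-of-the-envelope sketch of the delay those pauses add is shown here; the helper name `minimumPauseMs` is my own, and the figure ignores all real processing and network time.

```javascript
// Hypothetical helper: total artificial delay added by the consumer's pauses.
// The consumer pauses whenever ctr % every === 1, i.e. at ctr = 1, every + 1, ...
function minimumPauseMs(totalMessages, every, pauseMs) {
  // With every < 2 the condition ctr % every === 1 is never true.
  if (totalMessages < 1 || every < 2) return 0
  const pauses = Math.floor((totalMessages - 1) / every) + 1
  return pauses * pauseMs
}

console.log(minimumPauseMs(1000 * 1000, 1000, 100) / 1000, "seconds")
// → 100 seconds
```

So a run of about a million messages cannot finish in much less than 100 seconds, which is what forces the producer to wait on the consumer.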

As for installation, you don't need a server. 0mq is a library that runs inside your own process on top of ordinary OS sockets, so there is no broker to set up. For Node.js, all you need is this one command:

$ npm install zeromq@6.0.0-beta.6

Hats off to the 0mq folks and the brilliant thought and engineering behind this small bundle of messaging joy!

At 1 million small messages, the test may be simplistic, but the initial results show a lot of promise.
