Stefanos Kouroupis

Creating a simple queue messaging application over HTTP using NodeJs

There are countless similar applications out there that are highly performant and scalable. This is not one of them. Its main purpose is to give a rough idea of how such a system can be designed.

As the title states, I will use NodeJs and, of course, TypeScript (I can't be bothered to write plain JavaScript nowadays).

Requirements

  • create a queue
  • queue a message
  • store the message both on disk (persistence) and memory.
  • de-queue a message
  • write an e2e test to see how much data it can handle (try and break it)

Sounds easy and it definitely is, with persistence being a minor exception.

Since this is a simple tutorial and not a production-grade application, I'll keep things to a minimum (probably).

To make it a bit more challenging, I will completely avoid runtime dependencies. Our package.json will only include typescript and the @types/node typings, which always make our life easier.

I hope that readers of this tutorial gain the following knowledge:

  • Basic understanding of creating an HTTP server
  • How Node interacts with the filesystem
  • How Queues work (which is something that I hope everyone already knows about)

Setting up the project

We will start by creating almost the most minimal web service that can be created with Node.

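import * as http from 'http';

(async () => {
    // createServer is synchronous, so there is nothing to await here
    const server = http.createServer(endpoint);
    server.listen(3000);
})();

// responds OK to every request, whatever the method or path
function endpoint(req: http.IncomingMessage, res: http.ServerResponse) {
    res.end('OK');
}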

If we run this app (tsc to build and node index.js to run) we will observe immediately that we created a web service (on port 3000) that responds OK no matter how we hit it. 🎉

I don’t personally think anything in this code needs explaining, but why not:

  • we created an async IIFE (Immediately Invoked Function Expression) that runs as soon as the application starts. To make things a bit clearer, we also created a function named endpoint that responds OK and is used by http.createServer.

The next step is to make the endpoint function accept only GET requests and

  • If the request has both name and message as URL query parameters, it will push that message to the queue with that name, creating the queue first if it doesn't exist.

  • If the request has only name as a URL query parameter, it will look for a queue with that name and retrieve the oldest message.

import * as url from 'url';

const queues: QueuePersistence = new QueuePersistence();
function endpoint(req: http.IncomingMessage, res: http.ServerResponse) {
  if (req.method === 'GET' && req.url) {
    // parse the query string (?name=...&message=...) into an object
    const queryData = (url.parse(req.url, true).query) as unknown as Incoming;
    if (queryData.name && queryData.message) {
      queues.pushMessageToQueue(queryData.name, queryData.message);
      res.end('OK');
    } else if (queryData.name) {
      const message = queues.getMessageFromQueue(queryData.name);
      res.end(message);
    } else {
      res.end('query parameters are not correct');
    }
  } else {
    // without this branch a non-GET request would never receive a response
    res.statusCode = 405;
    res.end('only GET requests are supported');
  }
}

QueuePersistence is the place where all the magic happens and Incoming is an interface of the allowed query parameters.

export interface Incoming {
    name: string;
    message: string;
}
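
The endpoint above casts the parsed query straight to Incoming without checking it. As a rough sketch of an alternative (not what the original code does), the hypothetical parseIncoming helper below reads the same two parameters with Node's WHATWG URL class and only returns the ones that are actually present. The helper name and the placeholder base URL are my own assumptions.

```typescript
import { URL } from 'url';

interface Incoming {
  name: string;
  message: string;
}

// Hypothetical helper (not in the original code): reads the two query
// parameters from a request path such as '/?name=primary&message=hello'.
function parseIncoming(requestUrl: string): Partial<Incoming> {
  // req.url is only a path, so a placeholder base is needed to build a URL.
  const { searchParams } = new URL(requestUrl, 'http://localhost');
  const name = searchParams.get('name');
  const message = searchParams.get('message');
  // Only copy parameters that are present and non-empty.
  return {
    ...(name ? { name } : {}),
    ...(message ? { message } : {}),
  };
}

const push = parseIncoming('/?name=primary&message=hello');
const pop = parseIncoming('/?name=primary');
console.log(push); // { name: 'primary', message: 'hello' }
console.log(pop);  // { name: 'primary' }
```

Returning Partial<Incoming> makes the compiler remind the caller that message may be missing, which is exactly the case that separates a push from a pop.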

I am going to create another interface called QueueStructure

export interface QueueStructure {
    name: string;
    messages: string[];
}

and finally our main class

export class QueuePersistence {
  public pushMessageToQueue(name: string, message: string) {
  }
  public getMessageFromQueue(name: string) {
  }
}

Functionality

To begin with, I am going to write a helper function inside the QueuePersistence class that accepts a filename as its parameter. It will attempt to read that file and return its content; if the file doesn’t exist, it will create it and return an empty buffer.

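// requires: import * as fs from 'fs';
private readFile(filename: string): Buffer | undefined {
  try {
    if (!fs.existsSync(filename)) {
      // create the file synchronously so it exists before anything opens it
      fs.writeFileSync(filename, '');
      return Buffer.from('');
    }
    return fs.readFileSync(filename);
  } catch (error) {
    console.log(error);
  }
  // reached only when creating or reading the file failed
  return undefined;
}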

Just a note here. Because I don’t want to overcomplicate this tutorial I am only using the Sync variations of the functions of the fs (file system) module.

Now I am going to add two private arrays and a constructor, and in the constructor I will call two functions.

private _queueDefinitions: string[] = [];
private _queuePersistance: QueueStructure[] = [];
constructor() {
    this.createQueueDefinitionArray();
    this.createQueuePersistance();
}

  • createQueueDefinitionArray will create _queueDefinitions (both the array and the file; originally both are empty). A queue is created automatically when its name doesn’t exist in the definition yet.
  • createQueuePersistance will create a file and an array entry for each queue in the _queueDefinitions array.

private createQueueDefinitionArray() {
  console.log('...loading queue definition');
  const body = this.readFile('queues');
  if (body) {
    // every line ends with \r\n, so drop the empty entries that split leaves behind
    this._queueDefinitions = body.toString('utf8')
      .split('\r\n')
      .filter((line) => line.length > 0);
    console.log('...loading queue definition complete');
  } else {
    console.log('...loading queue definition failed');
    process.exit(2);
  }
}
private createQueuePersistance() {
  console.log('...loading queue persistance');
  if (this._queueDefinitions.length > 0) {
      this._queueDefinitions.forEach((def) => {
        const body = this.readFile(def);
        if (body) {
          this._queuePersistance.push({
             name: def,
             // the file is newest-first, the array is oldest-first, hence the reverse
             messages: body.toString('utf8')
               .split('\r\n')
               .filter((line) => line.length > 0)
               .reverse()
          });
        } else {
             console.log('...loading queue persistance failed');
             process.exit(2);
        }
     });
  }
  console.log('...loading queue persistance complete');
}

Now all the files have been created in our system so persistence is set up.

Instead of using the filesystem I could have used a file database or any other type of database, but like I said in the beginning, I wanted to completely avoid libraries. Plus, it's a good opportunity to learn something about filesystem operations. 🎉

Next is a function that will give us the queue object in _queuePersistance by name.

private getQueueByName(name: string): QueueStructure | undefined {
  let queue = this._queuePersistance.find(x => x.name === name);
  if (!queue) {
    const body = this.readFile(name);
    if (body) {
      queue = {
          name: name,
          messages: []
      };
      this._queuePersistance.push(queue);
      this.addToTop('queues', name);
    }
  }
  return queue
}

This is just a simple find: if the queue we are looking for is not there, we create it, add it to the array, register it in the queues file and return it.

Now there is a function there called addToTop. I will leave that function for last because it is, in a way, the most complicated function in this application.

Its job is to add a new line at the beginning of a file.

Finally we got almost everything, the only things missing are:

  • pushMessageToQueue
  • getMessageFromQueue
  • the elusive addToTop function

public pushMessageToQueue(name: string, message: string) {
    const queue = this.getQueueByName(name);
    if (queue) {
        this.addToTop(name, message);
        queue.messages.push(message);
        console.log(queue.messages);
    }
}

At this point the following becomes apparent:

  • new messages are added at the end of the message array
  • but they are also added at the beginning of the persistence file
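
To make the two orderings concrete, here is a small sketch of how the content of a persistence file maps back to the in-memory array. The parseQueueFile helper is hypothetical (the original class does this inline when loading the queues), and it assumes every message in the file is terminated by \r\n.

```typescript
// Hypothetical helper: turns raw file content into the in-memory order.
function parseQueueFile(content: string): string[] {
  return content
    .split('\r\n')
    .filter((line) => line.length > 0) // drop the empty entry after the final \r\n
    .reverse();                        // file is newest-first, memory is oldest-first
}

// 'first' was pushed first, so it sits at the bottom of the file.
const restored = parseQueueFile('third\r\nsecond\r\nfirst\r\n');
console.log(restored); // [ 'first', 'second', 'third' ]
```

The oldest message ends up at index 0 of the array and on the last line of the file, which is what makes the truncate-based removal possible.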
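
public getMessageFromQueue(name: string): string | undefined {
    const queue = this.getQueueByName(name);
    // an empty queue has nothing to return and nothing to truncate
    if (queue && queue.messages.length > 0) {
        const message = queue.messages[0];
        const stat = fs.statSync(name);
        // cut off the last line: the message bytes plus the trailing \r\n
        fs.truncateSync(name, stat.size - Buffer.byteLength(message) - 2);
        const response = queue.messages.shift();
        console.log(`${response} was requested and removed`);
        return response;
    }
    return undefined;
}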

This might need a bit more explaining:

  • we get the oldest message from the message array (queue.messages[0])
  • we get the stats of the persistence file
  • we truncate the file (meaning we remove its last line, which holds the oldest message) by calculating where to crop it: SIZE_OF_FILE - MESSAGE_LENGTH - 2 (the 2 accounts for the \r\n that is added after each message)
  • we shift the array (meaning we remove the first item and re-index) and return the shifted item (which is the first element)
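
As a worked example of that calculation, the sketch below runs it against a temporary file. The dropLastLine helper is hypothetical, and counting bytes with Buffer.byteLength (instead of characters) is my own assumption, made so that non-ASCII messages would not break the arithmetic.

```typescript
import * as fs from 'fs';
import * as os from 'os';
import * as path from 'path';

// Hypothetical helper (not part of the original class): removes the last
// line of a file, assuming every line ends with '\r\n'.
function dropLastLine(filename: string, lastLine: string): void {
  const size = fs.statSync(filename).size;
  // Count bytes rather than characters so multi-byte UTF-8 messages work.
  const bytesToRemove = Buffer.byteLength(`${lastLine}\r\n`, 'utf8');
  fs.truncateSync(filename, size - bytesToRemove);
}

// The file stores the newest message first, so the oldest one is last.
const file = path.join(fs.mkdtempSync(path.join(os.tmpdir(), 'queue-')), 'primary');
fs.writeFileSync(file, 'third\r\nsecond\r\nfirst\r\n'); // 7 + 8 + 7 = 22 bytes
dropLastLine(file, 'first'); // 22 - 5 - 2 = 15 bytes remain
const remaining = fs.readFileSync(file, 'utf8');
console.log(JSON.stringify(remaining)); // "third\r\nsecond\r\n"
```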

and finally

private addToTop(filename: string, message: string) {
  const fd = fs.openSync(filename, 'r+');
  const data = fs.readFileSync(filename);
  const buffer: Buffer = Buffer.from(`${message}\r\n`);
  fs.writeSync(fd, buffer, 0, buffer.length, 0);
  fs.writeSync(fd, data, 0, data.length, buffer.length);
  fs.closeSync(fd);
}

which basically does the following:

  • opens the file and returns the fd (file descriptor, basically a number)
  • reads all the existing data of the file into a buffer
  • creates a buffer with our new message
  • writes that buffer at the beginning of the file
  • writes the existing data back after it, offset by the length of the new buffer
  • closes the file

And yes, I know that adding anything at the front of a file is never going to be efficient, but it's easier to understand.
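
For comparison, here is a minimal sketch of the opposite layout, where messages are appended to the end of the file instead. This is not what the tutorial's class does; appendMessage and loadMessages are hypothetical names, and the sketch only covers writing and reloading.

```typescript
import * as fs from 'fs';
import * as os from 'os';
import * as path from 'path';

// Hypothetical alternative to addToTop: new messages go to the end of the
// file, so only the new bytes are written.
function appendMessage(filename: string, message: string): void {
  fs.appendFileSync(filename, `${message}\r\n`);
}

// In an append-only file the oldest message is already the first line.
function loadMessages(filename: string): string[] {
  return fs
    .readFileSync(filename, 'utf8')
    .split('\r\n')
    .filter((line) => line.length > 0);
}

const logFile = path.join(fs.mkdtempSync(path.join(os.tmpdir(), 'queue-')), 'primary');
appendMessage(logFile, 'first');
appendMessage(logFile, 'second');
const loaded = loadMessages(logFile);
console.log(loaded); // [ 'first', 'second' ]
```

The trade-off is that the oldest message now sits at the start of the file, so a de-queue can no longer simply truncate the tail. It would probably need something like a persisted read offset or periodic compaction, which is more than this tutorial wants to cover.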

Testing

I am just going to paste the whole test here. Personally, I don’t feel it needs many comments or remarks; a simple explanation should be more than enough.

Basically I fire a publisher and a subscriber with setInterval. I’ve set the intervals as low as possible (by trying different values); any lower and it starts throwing errors non-stop (because the event loop gets blocked).

I started from 500ms and went down to 15+ms. I wasn’t expecting to be able to handle nearly 120 requests per second (a publish every 15ms plus a read every 20ms is roughly 67 + 50 requests per second). I am impressed.

import * as http from 'http';

interface Result {
    request: http.IncomingMessage;
    object: string;
}

(async () => {
    // publisher: pushes a random string as a message every 15ms
    setInterval(() => {
        const msg = Math.random().toString(36).substring(7);
        console.log(`publishing message ${msg}`);
        request(`/?name=primary&message=${msg}`).catch(console.error);
    }, 15);
    // subscriber: starts 50ms later and requests a message every 20ms
    setTimeout(() => {
        setInterval(async () => {
            try {
                const msg = await request(`/?name=primary`);
                console.log(`requested message ${msg.object}`);
            } catch (error) {
                console.error(error);
            }
        }, 20);
    }, 50);
})();

function request(url: string): Promise<Result> {
  const options = {
      host: 'localhost',
      path: url,
      port: 3000,
      method: 'GET'
  };
  return new Promise((resolve, reject) => {
    const req = http.request(options, (response) => {
      let str = '';
      response.on('data', (chunk) => {
          str += chunk;
      });
      response.on('end', () => {
          resolve({ request: response, object: str });
      });
    });
    // connection errors are emitted on the request, not on the response
    req.on('error', reject);
    req.end();
  });
}

I hope you enjoyed that as much as I did

Top comments (5)

miwels

Thank you for the tutorial. I always used prebuilt queue systems like Amazon's SQS but I find it very interesting to understand how these systems work internally.

Can you explain this line?

const queryData = (url.parse(req.url, true).query) as unknown as Incoming;

url is not defined (I thought that maybe you wanted to use the 'URL' class but the parse method doesn't exist). Also I don't really understand the notation "as unknown as Incoming"

Thanks

Stefanos Kouroupis • Edited

Sorry, I didn't include the imports to save space. url is the Node url module

import * as url from 'url';

I do have the code in a GitHub repo but I am not entirely sure it's identical

github.com/elasticrash/dpp-queue

As for the unknown: any value can be assigned to unknown without a type assertion, and an unknown value can then be asserted to any other type. It's like saying I don't care what it is, but I want to assume it is Incoming.
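
A tiny sketch of what that double assertion does and does not do. The query object here is only a stand-in for the loosely typed value that url.parse(...).query returns, not the real thing.

```typescript
interface Incoming {
  name: string;
  message: string;
}

// Stand-in for the loosely typed value returned by url.parse(...).query.
const query: { [key: string]: string | string[] | undefined } = { name: 'primary' };

// Step 1: any value can be widened to `unknown`.
// Step 2: an `unknown` value can be asserted to any type.
// Nothing is checked or converted at runtime.
const queryData = query as unknown as Incoming;

console.log(queryData.name);    // primary
console.log(queryData.message); // undefined, even though the type says string
```

So the assertion only silences the compiler; the if checks in the endpoint are what actually guard against a missing parameter.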

miwels

Many thanks, now it is much clearer :)

Stefanos Kouroupis

No, I am interested in keyboards and hardware design.

Rom

Great post, thanks for sharing!