DEV Community

Cover image for How to Handle Socket.io Events in Sequence Using a Queue: a Simple Guide
Ibrahim Ayuba
Ibrahim Ayuba

Posted on • Updated on

How to Handle Socket.io Events in Sequence Using a Queue: a Simple Guide

All incoming socketio events will start processing in the order they arrive. Those with synchronous handler functions are guaranteed to finish processing in that same order. However, that might not be the case for events with asynchronous handlers, which can finish processing in any order. This behavior makes our code faster, but in some cases, might not be what we want.

In this short post, you’ll learn how to make events with asynchronous tasks start and finish processing in the order they arrive at the server. To achieve this, we’ll create a simple custom queuing system.

Let’s dive in.

The Problem

Assume you have a situation where two clients are connected to your socketio server. You want a behavior where the first to send an event creates a new row in a database table and the second updates that same row. Your code may look something like this:

io.on("connection", (socket) => {
    console.log("A user connected");

    socket.on("SOME_EVENT", async(param) => {
            //check whether the column already exists:
            const column = await db.select...

            //if column exists, update it:
            if(column){
                await db.update...
            }

            //else, create one
            await db.insert...
    })
}
Enter fullscreen mode Exit fullscreen mode

Now, the problem is that if the two clients emit “SOME_EVENT” simultaneously, there’s a chance they’ll both create a new row in the database, which is not what we want.

Solving the Problem with a Queuing System

Instead of allowing socketio to execute the handler functions, we’ll intercept them and decide when they get executed. When we intercept the handlers, we’ll send them to our queuing system, responsible for implementing them in order.

The system will have two main components — a queue and an event loop.

What is a Queue?

In computer science, a queue is a data structure that enables us to store and manage data sequentially. To achieve this, the structure only allows data to be added at one end (the tail) and exit at the other end (the head). This characteristic is popularly called FIFO, meaning first in first out.

Queue is an abstract data type (ADT). Like other ADTs, many programming languages, including Javascript, don’t have it by default. In this post, we’ll model our queue using the Javascript array’s unshift and pop methods.

What is an Event Loop?

In broad terms, an event loop is a construct that runs at intervals and conditionally executes tasks. We’ll use a setInterval in our case, which will constantly check if the queue contains pending functions, and will call the next function only when the previous one is complete.

Implementing the Queuing System

class QueuingSystem {
    //We're making the queue private so that it can only be 
    //modified only within the class
    #queue = [];
    constructor(interval) {
        this.interval = interval;
    }

    //returns last item of an array:
    lastItem(arr) {
        return arr[arr.length - 1];
    }

    //returns true if array is empty:
    isEmpty(arr) {
        return arr.length == 0;
    }

    //responsible for executing the function at the head of the queue:
    async run(arr) {
        //removing the function at the head of the queue:
        const func = arr.pop();

        //adding "false" placeholder at the head to indicate that 
        //a function is being executed:
        arr.push(false);

        //executing the function:
        await func();

        //removing the false placeholder at the head to indicate that 
        //the run function is ready to execute the next function: 
        arr.pop();
    }

    //function to add to the tail end of the queue:
    addToQueue(func) {
        this.#queue.unshift(func);
    }

    //function to start the event loop:
    start() {
        return setInterval(() => {
            //checking if the run method is free by checking if the item at the head is false.
            //and checking if the array isn't empty
            if (this.lastItem(this.#queue) !== false 
            && !this.isEmpty(this.#queue)) {
                this.run(this.#queue);
            }
        }, this.interval);
    }

    //stopping the event loop if no longer needed:
    stop() {
        clearInterval(this.start());
    }
}
Enter fullscreen mode Exit fullscreen mode

Our message queue class is now ready to receive and execute functions sequentially.

Using our System in Socket.io

With our queuing system in place, let’s use it in our code:

//Create a socketQueue that loops after every half of a second:
const socketQueue = new QueuingSystem(500)

io.on("connection", (socket) => {
    console.log("A user connected");

    const handler = async(param) => {
            //check whether the column already exists:
            const column = await db.select...

            //if column exists, update it:
            if(column){
                await db.update...
            }

            //else, create one
            await db.insert...
    }

    socket.on("SOME_EVENT", (param) => {
        //Add the handler function to socketQueue queue
        socketQueue.addToQueue(hanlder.bind(null, param))
    })
}

server.listen(PORT, () => {
    //start the queuing system:
    socketQueue.start();
    console.log("App listening on port", PORT);
});
Enter fullscreen mode Exit fullscreen mode

The bind method in Javascript is used to attach functions, together with their parameters, to objects, but not call them. In our case, we’re not attaching the function to any object, that’s why the first argument is null.

Conclusion

The message queue class we created can help us execute events with asynchronous handlers sequentially. If you need a more complex queuing system, check out BullMQ or other robust solutions. Happy coding!

Top comments (0)