In our previous post, we explored how to efficiently download a large dataset from MongoDB. We utilised the built-in fs
module to create temporary files on the server, enabling us to handle substantial amounts of data. However, once we deployed the solution, an unexpected issue arose during load testing.
Our tester discovered that when multiple requests were made to the download API within a short span of time, they all received the same data, despite requesting different datasets. For instance, if User A requested data for January and User B requested data for June, both users received data for January. This discrepancy indicated a critical flaw in our implementation: it was not processing requests sequentially; it was executing them concurrently.
Instead of queuing requests and processing them one by one, our system was handling multiple requests simultaneously. To address this issue and ensure the integrity of our data, we needed to implement a queuing mechanism that processes requests sequentially.
To implement this mechanism, we used BullMQ backed by Redis.
First of all, we need to set up a Redis server. You can install it locally, or you can use a Docker image and run it with Docker Compose:
```yaml
version: "3"
services:
  redis:
    image: redis:6.0.7
    container_name: redis
    restart: always
    volumes:
      - redis_volume_data:/data
    ports:
      - 6379:6379
  redis_insight:
    image: redislabs/redisinsight:latest
    container_name: redis_insight
    restart: always
    ports:
      - 8001:8001
    volumes:
      - redis_insight_volume_data:/db
volumes:
  redis_volume_data:
  redis_insight_volume_data:
```
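Assuming this file is saved as docker-compose.yml, you can bring both services up and verify that the Redis container is reachable:

```bash
# Start Redis and RedisInsight in the background
docker compose up -d

# Ping the Redis container; it should reply with PONG
docker exec -it redis redis-cli ping
```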
After the Redis server is running successfully, we will install BullMQ:

```bash
npm install bullmq
```
BullMQ is a lightweight, robust, and fast Node.js library for creating background jobs and sending messages using queues. BullMQ is designed to be easy to use, but also powerful and highly configurable. It is backed by Redis, which makes it easy to scale horizontally and process jobs across multiple servers.
By default, Redis listens on port 6379, so first we need to make a connection to this running server, which is simple with BullMQ:
```javascript
const { Queue } = require("bullmq");

const connectionOpts = {
  port: 6379,
  host: "localhost", // for production it will be different
};

const downloadQueue = new Queue("download-example-queue", {
  connection: connectionOpts,
});
```
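The comment above hints that the host will differ in production. One common approach (a sketch, not part of the original implementation; REDIS_HOST and REDIS_PORT are assumed variable names) is to read the connection details from environment variables:

```javascript
// Read connection details from the environment so the same code
// works locally and in production; the variable names are assumptions.
const connectionOpts = {
  host: process.env.REDIS_HOST || "localhost",
  port: Number(process.env.REDIS_PORT) || 6379,
};
```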
You can define a queue name, and this will create a queue for you. In effect, we are using Redis as a message broker to handle job queues.
As I said before, we can do many things with BullMQ, and you will learn about some of its methods through this blog, but our implementation will mostly be doing two things: creating jobs and processing jobs.
Creating jobs: You can create jobs by pushing them into a queue using the add method. These jobs are stored in Redis until they can be processed.
Processing jobs: Workers pull jobs from the queue and process them. The processor function you pass to a worker specifies what to do with each job.
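To make these two roles concrete, here is a minimal, self-contained sketch; the queue name and payload are illustrative, and a local Redis is assumed:

```javascript
const { Queue, Worker } = require("bullmq");

const connection = { host: "localhost", port: 6379 };

// Creating jobs: push a job named "demo" with a payload onto the queue.
const demoQueue = new Queue("demo-queue", { connection });
demoQueue.add("demo", { month: "January" });

// Processing jobs: the worker's processor function runs for each job.
new Worker(
  "demo-queue",
  async (job) => {
    console.log("processing", job.id, job.data);
  },
  { connection }
);
```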
```javascript
const addToQueue = async (exampleData) => {
  try {
    const job = await downloadQueue.add("download-example-queue", exampleData, {
      removeOnComplete: true,
      removeOnFail: true,
    });
    console.log("SUCCESSFULLY ADDED JOB WITH ID", job.id);
    return job.id;
  } catch (error) {
    console.error("Error adding job to queue:", error);
    throw error;
  }
};
```
So here we have created a function that takes in exampleData and adds it to our queue. Because we pass `removeOnComplete: true` and `removeOnFail: true`, the job is removed from Redis once it completes or fails.
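As a side note, if you would rather keep some history around for debugging, BullMQ also accepts an object form of these options; a sketch, inside an async function:

```javascript
// Keep the last 100 completed jobs, and failed jobs for 24 hours.
await downloadQueue.add("download-example-queue", exampleData, {
  removeOnComplete: { count: 100 },
  removeOnFail: { age: 24 * 3600 }, // age is in seconds
});
```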
Additionally, we have created two functions to pause and resume the queue:
```javascript
const pauseQueue = async () => {
  await downloadQueue.pause();
};

const resumeQueue = async () => {
  await downloadQueue.resume();
};

module.exports = {
  addToQueue,
  pauseQueue,
  resumeQueue,
};
```
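Note that pause and resume act on the whole queue rather than on a single job; if needed, you can inspect the current state with isPaused():

```javascript
// Returns true while the queue is paused (inside an async function)
const paused = await downloadQueue.isPaused();
console.log("queue paused?", paused);
```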
We will be using the above functions in our main endpoint, which is the place where the client sends its request headers.
```javascript
// createSuccessResponse and STATUS_CODES come from our response helpers
module.exports.downloadTransactionsServer = async (req, res) => {
  async function queueFunction() {
    try {
      // Extract necessary request headers
      const requestData = {
        dateRange: req.headers.daterange,
        client_id: req.headers.client_id,
        created_by: req.headers.created_by,
      };
      // Add request data to the queue and get the job ID
      const jobId = await addToQueue(requestData);
      // Check if the response status code is 200
      if (res.statusCode === 200) {
        // Return the job ID
        return jobId;
      }
    } catch (error) {
      console.log(error, "ERROR");
    }
  }
  // Call the queue function
  await queueFunction();
  // Return success response
  return createSuccessResponse(STATUS_CODES.ok, "Download initiated");
};
```
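For context, a client call to this endpoint could look something like the following sketch; the URL and header values are placeholders, not the real service:

```javascript
// Hypothetical client call; endpoint path and header values are examples only.
const response = await fetch("https://api.example.com/download-transactions", {
  method: "GET",
  headers: {
    daterange: "2024-01-01_2024-01-31",
    client_id: "client-123",
    created_by: "user-456",
  },
});
console.log(await response.json()); // e.g. { message: "Download initiated" }
```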
Import the addToQueue, pauseQueue, and resumeQueue functions into the files that use them. But we still need a Worker.
Workers are the actual instances that perform some job based on the jobs that are added in the queue. A worker is equivalent to a "message" receiver in a traditional message queue. The worker's duty is to complete the job. If it succeeds, the job will be moved to the "completed" status. If the worker throws an exception during its processing, the job will automatically be moved to the "failed" status.
In microservices architecture, it's best practice to have one worker per service to ensure sequential job processing and prevent duplicates. However, scaling workers may be necessary for increased workload or fault tolerance, requiring careful management to maintain system integrity.
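In BullMQ, the number of jobs a single worker handles in parallel is controlled by its concurrency option, which defaults to 1; a sketch, where processor and connectionOpts stand in for the processor function and connection options used elsewhere in this post:

```javascript
// "processor" is a placeholder for the job-handling function shown below.
const sequentialWorker = new Worker("download-example-queue", processor, {
  connection: connectionOpts,
  concurrency: 1, // process one job at a time (this is also the default)
});
```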
So, what was happening was that I had created the worker in a separate file. Because of that, a new worker was created for every request we made. Locally this still worked, since our local Redis server could handle it. In production, however, we had only one Redis server, and it was not creating multiple workers: download requests would stop after reaching the queue and stay in a paused state forever. We therefore initialised our worker in server.js.
```javascript
// Worker comes from bullmq; pauseQueue, resumeQueue and
// generateReportAndSendMail are imported from our own modules.
const { Worker } = require("bullmq");

const start = async () => {
  try {
    await fastify.listen({ port: process.env.PORT, host: "0.0.0.0" });

    const myWorker = new Worker(
      "download-example-queue",
      async (job) => {
        try {
          const reportData = JSON.stringify(job.data);
          // Pause the queue so no other job is picked up while this one runs
          await pauseQueue();
          await generateReportAndSendMail(JSON.parse(reportData));
        } catch (error) {
          console.error("Error processing job:", error);
        } finally {
          // Resume the queue whether the job succeeded or failed
          await resumeQueue();
        }
      },
      {
        limiter: {
          max: 1,
          duration: 1000,
        },
        ttl: 600000,
        connection: {
          port: 6379,
          host: "localhost",
        },
      }
    );

    myWorker.on("completed", (job) => {
      console.log(job.id, "HAS been completed");
    });
  } catch (e) {
    console.error(e);
    process.exit(1);
  }
};
```
Here we imported our generateReportAndSendMail function (the report logic from the previous post). The worker is rate-limited to process at most one job per 1000 milliseconds, so jobs are handled one at a time. It establishes a connection to the Redis server running on localhost, with a TTL (time-to-live) of 600000 milliseconds. Furthermore, an event listener is implemented to log the successful completion of jobs. Error handling mechanisms are incorporated to manage any potential issues during server startup or job processing; if an error occurs, the process is terminated with exit code 1 to signify that an error occurred.
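Beyond the completed listener, it is also worth logging failures and closing the worker cleanly on shutdown; a sketch, assuming myWorker is accessible in this scope:

```javascript
// Log jobs that ultimately failed (job may be undefined in edge cases).
myWorker.on("failed", (job, err) => {
  console.error(job?.id, "has failed:", err.message);
});

// Close the worker gracefully when the process is asked to shut down.
process.on("SIGTERM", async () => {
  await myWorker.close();
  process.exit(0);
});
```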
And there you have it: with that, our queue is implemented and running successfully.
Thank you for reading, and follow me on LinkedIn if you're interested in Web Development.