Web services keep growing, and a mobile application can easily have thousands of users. The backend scales out and consists of several services that run in parallel. But parallel instances do not always solve the business problem. For example, you may have a business process in which requests must be applied one by one: an item is created, then updated, and then finished or cancelled. Your instances may perform differently, or some methods may simply be slower than others. As a result, the cancel event can be processed before the item has even been created.
const instanceCount = 3; // number of instances
let instanceFinished = 0; // number of workers that have drained their queue
const storage = {}; // fake storage

// example of one service
class Queue {
  constructor() {
    this.list = []; // events waiting in the queue
    this.inProcess = false; // true while a worker is running
  }

  // add an event to the queue and try to start processing
  push({ item, worker }) {
    this.list.push({ item, worker });
    this.process();
  }

  // run the next event only after the previous one has finished
  async process() {
    if (this.inProcess) {
      return;
    }
    if (!this.list.length) {
      // the queue is drained: report once every instance is done
      instanceFinished += 1;
      if (instanceFinished === instanceCount) {
        console.log('storage', storage);
      }
      return;
    }
    this.inProcess = true;
    const { item, worker } = this.list.shift();
    try {
      await worker(item);
    } catch (e) {
      console.log(e);
    } finally {
      this.inProcess = false;
      this.process();
    }
  }
}

// example of logic where each operation takes a different amount of time
class UseCase {
  /**
   * Save the item in storage (slowest operation: 300 ms)
   * @param {Object} item
   * @param {string} item.key
   */
  async create({ key }) {
    await new Promise((res, rej) => {
      setTimeout(() => {
        if (storage[key]) {
          return rej(`Item ${key} already exists`);
        }
        storage[key] = { status: 'created' };
        console.log(`Item ${key} created ${Date().toString()}`);
        res();
      }, 300);
    });
  }

  /**
   * Update the item (200 ms)
   * @param {Object} item
   * @param {string} item.key
   */
  async update({ key }) {
    await new Promise((res, rej) => {
      setTimeout(() => {
        if (!storage[key]) {
          return rej(`Item ${key} does not exist`);
        }
        storage[key].status = 'updated';
        console.log(`Item ${key} updated ${Date().toString()}`);
        res();
      }, 200);
    });
  }

  /**
   * Cancel the item (fastest operation: 100 ms)
   * @param {Object} item
   * @param {string} item.key
   */
  async cancel({ key }) {
    await new Promise((res, rej) => {
      setTimeout(() => {
        if (!storage[key]) {
          return rej(`Item ${key} does not exist`);
        }
        storage[key].status = 'cancelled';
        console.log(`Item ${key} cancelled ${Date().toString()}`);
        res();
      }, 100);
    });
  }
}
After launching this service, you can see that the workers try to update or cancel items that have not been created yet, because creation is slower than updating and cancelling.
const service = async () => {
  const useCase = new UseCase();
  const queues = [];
  for (let i = 0; i < instanceCount; i++) {
    queues.push(new Queue());
  }
  for (let i = 1; i < 10; i++) {
    const item = { key: i.toString() };
    // each kind of event goes to its own instance, so the order per item is not guaranteed
    queues[0].push({ item, worker: useCase.create });
    queues[1].push({ item, worker: useCase.update });
    queues[2].push({ item, worker: useCase.cancel });
  }
};

service();
But if each instance works only with its own subset of items, and every event for an item is launched only after the previous one has finished, everything works fine.
To split items into partitions we can use a hash: convert the item key into a numeric hash, then take the remainder of dividing it by the number of instances as the id of the instance.
/**
 * Convert a string to a 32-bit integer hash
 * Source: http://werxltd.com/wp/2010/05/13/javascript-implementation-of-javas-string-hashcode-method/
 * @param {string} str - string to hash
 * @returns {number} signed 32-bit integer (can be negative)
 */
const hashCode = (str) => {
  let hash = 0;
  if (str.length === 0) {
    return hash;
  }
  for (let i = 0; i < str.length; i++) {
    const char = str.charCodeAt(i);
    hash = ((hash << 5) - hash) + char;
    hash = hash & hash; // truncate to a 32-bit integer
  }
  return hash;
};

const serviceWithHashing = async () => {
  const useCase = new UseCase();
  const queues = [];
  for (let i = 0; i < instanceCount; i++) {
    queues.push(new Queue());
  }
  for (let i = 1; i < 10; i++) {
    const item = { key: i.toString() };
    // Math.abs guards against a negative hash, which would give a negative index
    const queue = queues[Math.abs(hashCode(item.key)) % instanceCount];
    // all events of one item go to the same queue, so they run in order
    queue.push({ item, worker: useCase.create });
    queue.push({ item, worker: useCase.update });
    queue.push({ item, worker: useCase.cancel });
  }
};

serviceWithHashing();
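To check where a given key lands, the partition index can also be computed on its own. The snippet below is only a minimal sketch, not part of the service: `hash32` re-declares the same string hash so that the snippet runs standalone, and `partitionFor` is a hypothetical helper name. It takes the absolute value of the hash, on the assumption that longer keys can overflow to a negative 32-bit value.

```javascript
// Same 32-bit string hash as above, re-declared so the sketch runs on its own.
const hash32 = (str) => {
  let hash = 0;
  for (let i = 0; i < str.length; i++) {
    hash = ((hash << 5) - hash + str.charCodeAt(i)) | 0; // keep a 32-bit integer
  }
  return hash;
};

// Hypothetical helper: map a key to a partition index in [0, partitions).
const partitionFor = (key, partitions) => Math.abs(hash32(key)) % partitions;

// Made-up keys, only for illustration.
const keys = ['1', '2', '3', 'order-42', 'a-much-longer-item-key'];
for (const key of keys) {
  console.log(key, '->', partitionFor(key, 3));
}
```

The same key always maps to the same index, which is what keeps the create, update and cancel events of one item in a single queue.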
This approach is not meant for large distributed systems where the number of instances changes dynamically, but it can be helpful for running a few parallel jobs or processes.
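The limitation with a changing number of instances can be seen in a small experiment: when the divisor changes, the remainder changes for most keys, so events of one item could end up in two different queues. The snippet below is an illustrative sketch only; `hash32`, `partitionFor` and `countMoved` are hypothetical names, and the key set simply mirrors the keys `'1'` to `'9'` used in the examples.

```javascript
// Same 32-bit string hash as in the article, re-declared so the sketch runs on its own.
const hash32 = (str) => {
  let hash = 0;
  for (let i = 0; i < str.length; i++) {
    hash = ((hash << 5) - hash + str.charCodeAt(i)) | 0; // keep a 32-bit integer
  }
  return hash;
};

// Hypothetical helper: map a key to a partition index in [0, partitions).
const partitionFor = (key, partitions) => Math.abs(hash32(key)) % partitions;

// Count how many keys change partition when the instance count changes.
const countMoved = (keys, before, after) =>
  keys.filter((key) => partitionFor(key, before) !== partitionFor(key, after)).length;

const keys = Array.from({ length: 9 }, (_, i) => String(i + 1)); // '1'..'9'
const moved = countMoved(keys, 3, 4);
console.log(`${moved} of ${keys.length} keys move when going from 3 to 4 instances`);
```

With a fixed number of instances nothing moves, which is why the approach fits a small, static set of parallel workers.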