Я использую модуль Nodejs cluster
для запуска нескольких рабочих.
Я создал базовую архитектуру, в которой будет один процесс MASTER, который в основном представляет собой экспресс-сервер, обрабатывающий несколько запросов, и основной задачей MASTER будет запись входящих данных из запросов в экземпляр REDIS. Другие работники (numOfCPU - 1) не будут хозяевами, то есть они не будут обрабатывать какие-либо запросы, поскольку являются просто потребителями. У меня есть две функции, а именно ABC и DEF. Я распределял неосновных работников равномерно по функциям, назначая им тип.
Например: на 8-ядерном компьютере:
1 будет запросом обработки экземпляра MASTER через экспресс-сервер
Оставшееся количество (8 - 1 = 7) будет распределено равномерно. 4 до функции: ABD и 3 до: DEF.
Работники, не являющиеся хозяевами, в основном являются потребителями, т. Е. Они читают данные из REDIS, в которые только МАСТЕР-работник может записывать данные.
Вот код для того же самого:
if (cluster.isMaster) {
// Fork workers.
for (let i = 0; i < numCPUs - 1; i++) {
ClusteringUtil.forkNewClusterWithAutoTypeBalancing();
}
cluster.on('exit', function(worker) {
console.log(`Worker ${worker.process.pid}::type(${worker.type}) died`);
ClusteringUtil.removeWorkerFromList(worker.type);
ClusteringUtil.forkNewClusterWithAutoTypeBalancing();
});
// Start consuming on server-start
ABCConsumer.start();
DEFConsumer.start();
console.log(`Master running with process-id: ${process.pid}`);
} else {
console.log('CLUSTER type', cluster.worker.process.env.type, 'running on', process.pid);
if (
cluster.worker.process.env &&
cluster.worker.process.env.type &&
cluster.worker.process.env.type === ServerTypeEnum.EXPRESS
) {
// worker for handling requests
app.use(express.json());
...
}
{
Все отлично работает, кроме потребителей, читающих из REDIS.
Поскольку есть несколько потребителей определенной функции, каждый читает одно и то же сообщение и начинает обрабатывать индивидуально, что мне не нужно. Если есть 4 потребителя, 1 помечен как занятый и не может потреблять, пока не освободится, 3 доступны Как только сообщение об этой конкретной функции записано в REDIS MASTER, проблема заключается в том, что все 3 доступных потребителя этой функции начинают потреблять. Это означает, что для одного сообщения работа выполняется на основе количества доступных потребителей.
const stringifedData = JSON.stringify(req.body);
const key = uuidv1();
const asyncHsetRes = await asyncHset(type, key, stringifedData);
if (asyncHsetRes) {
await asyncRpush(FeatureKeyEnum.REDIS.ABC_MESSAGE_QUEUE, key);
res.send({ status: 'success', message: 'Added to processing queue' });
} else {
res.send({ error: 'failure', message: 'Something went wrong in adding to queue' });
}
Потребитель просто принимает сообщения и останавливается, когда он занят
module.exports.startHeartbeat = startHeartbeat = async function(config = {}) {
if (!config || !config.type || !config.listKey) {
return;
}
heartbeatIntervalObj[config.type] = setInterval(async () => {
await asyncLindex(config.listKey, -1).then(async res => {
if (res) {
await getFreeWorkerAndDoJob(res, config);
stopHeartbeat(config);
}
});
}, HEARTBEAT_INTERVAL);
};
В идеале, сообщение должно быть прочитано только одним потребителем этой конкретной функции. После потребления он помечается как занятый, поэтому он не будет потребляться до тех пор, пока не освободится (я обработал это). Следующее сообщение может быть обработано только одним потребителем из других доступных.
Пожалуйста, помогите мне решить эту проблему. Опять же, я хочу, чтобы одно сообщение было прочитано только одним бесплатным потребителем, а остальные бесплатные пользователи должны ждать нового сообщения.
Спасибо