Чтение архитектуры кластера Nodejs из одного экземпляра REDIS - PullRequest
1 голос
/ 18 мая 2019

Я использую модуль Nodejs cluster для запуска нескольких рабочих. Я создал базовую архитектуру, в которой будет один процесс MASTER, который в основном представляет собой экспресс-сервер, обрабатывающий несколько запросов, и основной задачей MASTER будет запись входящих данных из запросов в экземпляр REDIS. Другие работники (numOfCPU - 1) не будут хозяевами, то есть они не будут обрабатывать какие-либо запросы, поскольку являются просто потребителями. У меня есть две функции, а именно ABC и DEF. Я распределял неосновных работников равномерно по функциям, назначая им тип.

Например: на 8-ядерном компьютере:

1 будет запросом обработки экземпляра MASTER через экспресс-сервер

Оставшееся количество (8 - 1 = 7) будет распределено равномерно. 4 до функции: ABD и 3 до: DEF.

Работники, не являющиеся хозяевами, в основном являются потребителями, т. Е. Они читают данные из REDIS, в которые только МАСТЕР-работник может записывать данные.

Вот код для того же самого:

if (cluster.isMaster) {
  // Fork workers.
  for (let i = 0; i < numCPUs - 1; i++) {
    ClusteringUtil.forkNewClusterWithAutoTypeBalancing();
  }

  cluster.on('exit', function(worker) {
    console.log(`Worker ${worker.process.pid}::type(${worker.type}) died`);
    ClusteringUtil.removeWorkerFromList(worker.type);
    ClusteringUtil.forkNewClusterWithAutoTypeBalancing();
  });

  // Start consuming on server-start
  ABCConsumer.start();
  DEFConsumer.start();

  console.log(`Master running with process-id: ${process.pid}`);
} else {
  console.log('CLUSTER  type', cluster.worker.process.env.type, 'running on', process.pid);
  if (
    cluster.worker.process.env &&
    cluster.worker.process.env.type &&
    cluster.worker.process.env.type === ServerTypeEnum.EXPRESS
  ) {
    // worker for handling requests
    app.use(express.json());
    ...
  }
{

Все отлично работает, кроме потребителей, читающих из REDIS. Поскольку есть несколько потребителей определенной функции, каждый читает одно и то же сообщение и начинает обрабатывать индивидуально, что мне не нужно. Если есть 4 потребителя, 1 помечен как занятый и не может потреблять, пока не освободится, 3 доступны Как только сообщение об этой конкретной функции записано в REDIS MASTER, проблема заключается в том, что все 3 доступных потребителя этой функции начинают потреблять. Это означает, что для одного сообщения работа выполняется на основе количества доступных потребителей.

const stringifedData = JSON.stringify(req.body);
  const key = uuidv1();

  const asyncHsetRes = await asyncHset(type, key, stringifedData);

  if (asyncHsetRes) {
    await asyncRpush(FeatureKeyEnum.REDIS.ABC_MESSAGE_QUEUE, key);
    res.send({ status: 'success', message: 'Added to processing queue' });
  } else {
    res.send({ error: 'failure', message: 'Something went wrong in adding to queue' });
  }

Потребитель просто принимает сообщения и останавливается, когда он занят

module.exports.startHeartbeat = startHeartbeat = async function(config = {}) {
  if (!config || !config.type || !config.listKey) {
    return;
  }

  heartbeatIntervalObj[config.type] = setInterval(async () => {
    await asyncLindex(config.listKey, -1).then(async res => {
      if (res) {
        await getFreeWorkerAndDoJob(res, config);
        stopHeartbeat(config);
      }
    });
  }, HEARTBEAT_INTERVAL);
};

В идеале, сообщение должно быть прочитано только одним потребителем этой конкретной функции. После потребления он помечается как занятый, поэтому он не будет потребляться до тех пор, пока не освободится (я обработал это). Следующее сообщение может быть обработано только одним потребителем из других доступных.

Пожалуйста, помогите мне решить эту проблему. Опять же, я хочу, чтобы одно сообщение было прочитано только одним бесплатным потребителем, а остальные бесплатные пользователи должны ждать нового сообщения.

Спасибо

1 Ответ

1 голос
/ 18 мая 2019

Я не уверен, что полностью понимаю вашу архитектуру потребителей Redis, но мне кажется, что она противоречит сценарию использования самого Redis. То, чего вы пытаетесь достичь, - это, по сути, обмен сообщениями на основе очереди со способностью фиксировать сообщение, как только оно будет сделано.

Redis имеет свою собственную функцию паба / саба, но она построена на принципе огня и забывает. Он не различает потребителей - он просто отправляет данные всем им, предполагая, что их логика обрабатывает входящие данные.

Я рекомендую вам использовать серверы очередей, такие как RabbitMQ. Вы можете достичь своей цели с помощью некоторых функций, которые поддерживает AMQP 0-9-1: подтверждение сообщения, счетчик предварительной выборки и т. Д. Вы можете настроить свой кластер с очень гибкими конфигурациями, такими как хорошо, я хочу иметь X потребителей, и каждый может обрабатывать 1 уникальное (!) Сообщение за раз, и они будут получать новые только после того, как они позволят серверу (rabbitmq). ) знают, что они успешно завершили обработку сообщений . Это очень настраиваемый и надежный.

Однако, если вы хотите работать без сервера с какой-либо полностью управляемой службой, чтобы не выделять виртуальные машины или что-либо еще для запуска сервера очередей сообщений по вашему выбору, вы можете использовать AWS SQS. У него довольно похожий API и список возможностей.

Надеюсь, это поможет!

...