Не обрабатывать следующее задание, пока не будет завершено предыдущее (Redis?) - PullRequest
1 голос
/ 10 июля 2020

По сути, каждый из клиентов --- с которым связан clientId --- может отправлять sh сообщений, а важно, чтобы второе сообщение от того же клиента не обрабатывалось до тех пор, пока первый завершает обработку (даже если клиент может отправлять несколько сообщений подряд, и они упорядочены, и несколько клиентов, отправляющих сообщения, в идеале не должны мешать друг другу). И, что важно, задание не должно обрабатываться дважды.

Я подумал, что с помощью Redis я смогу исправить эту проблему, я начал с быстрого прототипирования с использованием библиотеки быков, но я явно не делаю этого. хорошо, я надеялся, что кто-то знает, как действовать.

Это то, что я пробовал до сих пор:

  1. Создавайте задания и добавляйте их в очередь с тем же именем для одного процесса, используя clientId в качестве имени задания.
  2. Использовать задания, ожидая больших случайных значений случайного времени в двух отдельных процессах.
  3. Я попытался добавить блокировку по умолчанию, предоставляемую библиотекой, которой я являюсь с использованием (bull), но он блокируется на jobId, который уникален для каждого задания, а не на clientId.

Что я бы хотел:

  • Один из потребителей не может взять задание от того же clientId, пока предыдущий не завершит его обработку.
  • Однако они должны иметь возможность получать элементы из разных clientId параллельно без проблема (асинхронный ly). (Я еще не зашел так далеко, сейчас я имею дело только с одним clientId)

Что я получаю:

  • Оба потребителя потребляют столько же товаров как они могут из очереди, не дожидаясь завершения предыдущего элемента для clientId.

Подходит ли Redis для этого задания?

Пример кода

// ./create.ts
import { queue, randomWait } from './setup';

const MIN_WAIT = 300;
const MAX_WAIT = 1500;
async function createJobs(n = 10): Promise<void> {
  await randomWait(MIN_WAIT, MAX_WAIT);
  // always same Id
  const clientId = Math.random() > 1 ? 'zero' : 'one';
  for (let index = 0; index < n; index++) {
    await randomWait(MIN_WAIT, MAX_WAIT);
    const job = { id: clientId, v: index };
    await queue.add(clientId, job).catch(console.error);
    console.log('Added job', job);
  }
}

export async function create(nIds = 10, nItems = 10): Promise<void> {
  const jobs = [];
  await randomWait(MIN_WAIT, MAX_WAIT);
  for (let index = 0; index < nIds; index++) {
    await randomWait(MIN_WAIT, MAX_WAIT);
    jobs.push(createJobs(nItems));
    await randomWait(MIN_WAIT, MAX_WAIT);
  }
  await randomWait(MIN_WAIT, MAX_WAIT);
  await Promise.all(jobs)
  process.exit();
}

(function mainCreate(): void {
  create().catch((err) => {
    console.error(err);
    process.exit(1);
  });
})();

// ./consume.ts
import { queue, randomWait, clientId } from './setup';

function startProcessor(minWait = 5000, maxWait = 10000): void {
  queue
    .process('*', 100, async (job) => {
      console.log('LOCKING: ', job.lockKey());
      await job.takeLock();
      const name = job.name;
      const processingId = clientId().split('-', 1)[0];
      try {
        console.log('START: ', processingId, '\tjobName:', name);
        await randomWait(minWait, maxWait);
        const data = job.data;
        console.log('PROCESSING: ', processingId, '\tjobName:', name, '\tdata:', data);
        await randomWait(minWait, maxWait);
        console.log('PROCESSED: ', processingId, '\tjobName:', name, '\tdata:', data);
        await randomWait(minWait, maxWait);
        console.log('FINISHED: ', processingId, '\tjobName:', name, '\tdata:', data);
      } catch (err) {
        console.error(err);
      } finally {
        await job.releaseLock();
      }
    })
    .catch(console.error); // Catches initialization
}

startProcessor();

Это запускается с использованием 3 разных процессов, которые вы можете назвать так (хотя я использую разные вкладки для более четкого представления о том, что происходит)

npx ts-node consume.ts & 
npx ts-node consume.ts &
npx ts-node create.ts &

1 Ответ

1 голос
/ 10 июля 2020

Я не знаком с node.js. Но для Redis я бы попробовал это:

Допустим, у вас есть client_1, client_2, все они публикуют события. У вас есть три машины: consumer_1, consumer_2, consumer_3.

  1. Establi sh список задач в redis, например, JOB_LIST.
  2. Clients put (LPU SH) jobs в этот JOB_LIST в специальной форме c, например «CLIENT_1: [jobcontent]», «CLIENT_2: [jobcontent]»
  3. Каждый потребитель блокирует задания (команда RPOP Redis) и обрабатывает их. Например, потребитель_1 берет задание, контент - CLIENT_1: [jobcontent]. Он анализирует контент и распознает его от CLIENT_1. Затем он хочет проверить, обрабатывает ли уже какой-либо другой потребитель CLIENT_1, в противном случае он заблокирует ключ, чтобы указать, что он обрабатывает CLIENT_1.

Далее он устанавливает ключ «CLIENT_1_PROCESSING», с содержимым как "consumer_1", с помощью команды Redis SETNX (устанавливается, если ключ не существует) с соответствующим таймаутом. Например, задача обычно занимает одну минуту до завершения sh, вы устанавливаете тайм-аут ключа в пять минут, на случай, если потребитель_1 выйдет из строя и будет удерживать блокировку бесконечно.

Если SETNX вернет 0, это означает, что ему не удалось получить блокировку CLIENT_1 (кто-то уже обрабатывает задание client_1). Затем он возвращает задание (значение «CLIENT_1: [jobcontent]») в левую часть JOB_LIST с помощью команды Redis LPU SH. Затем он может немного подождать (несколько секунд засыпать) и выполнить другую задачу RPOP с правой стороны СПИСКА. Если на этот раз SETNX возвращает 1, потребитель_1 получает блокировку. Он переходит к обработке задания, после завершения удаляет ключ «CLIENT_1_PROCESSING», снимая блокировку. Затем выполняется RPOP для другого задания и т.д.

Блокирующая часть немного рудиментарна, но ее будет достаточно.

---------- обновить --------------

Я придумал другой способ упорядочить задачи.

Для каждого клиента (производителя) создайте список. Подобно "client_1_list", поместите sh заданий в левую часть списка. Сохраните все имена клиентов в списке client_names_list со значениями client_1, client_2, et c.

Для каждого потребителя (процессора) повторите итерацию «client_names_list», например, consumer_1 получите "client_1", проверьте, заблокирован ли ключ client_1 (кто-то уже обрабатывает задачу client_1), если нет, вытащите значение (задание) из client_1_list и заблокируйте client_1. Если client_1 заблокирован (вероятно, спит одну секунду) и перейти к следующему клиенту, например, «client_2», проверить ключи и т. Д.

Таким образом, каждый клиент (производитель задачи) задача обрабатывается в порядке их поступления.

...