Обрабатывать асинхронные сообщения в рабочих потоках - PullRequest
0 голосов
/ 03 апреля 2019

Мы используем экспериментальные рабочие NodeJS для выполнения некоторых задач, интенсивно использующих процессор. Эти задачи запускаются через сообщения, передаваемые parentPort. Во время работы потоков им необходимо сохранять данные в базе данных, которая является асинхронной операцией, подкрепленной обещаниями.

Мы видим, что parentPort сообщения продолжают отправляться в функцию-обработчик, пока мы выполняем асинхронные операции.

Пример кода, который мы делаем:

const { parentPort, Worker, isMainThread } = require('worker_threads');

if (isMainThread) {
  const worker = new Worker(__filename);

  const i = [1, 2, 3, 4, 5, 6, 7, 8, 9];
  for (const x of i) {
    worker.postMessage({ idx: x });
  }
} else {
  parentPort.on('message', async (value) => {
    await testAsync(value);
  });
}

async function testAsync(value) {
  return new Promise((resolve) => {
    console.log(`Starting wait for ${value.idx}`);
    setTimeout(() => {
      console.log(`Complete resolve for ${value.idx}`);
      resolve();

      if(value.idx == 9) {
        setTimeout(() => process.exit(0), 2000);
      }
    }, 500);
  });
}

В приведенном выше примере мы видим печать Starting wait for ... до того, как появятся сообщения Complete resolve .... С async-await мы ожидали, что обработчик событий будет ждать разрешенного обещания перед обработкой нового события. В реальном примере может произойти сбой соединения с БД, что вызывает исключение, поэтому мы хотим убедиться, что текущее сообщение было полностью обработано, прежде чем принять новое.

Мы что-то здесь не так делаем?

Если нет, есть ли в любом случае достижение желаемой цели обработки события в порядке?

1 Ответ

1 голос
/ 03 апреля 2019

Кажется, что вы хотите ставить сообщения в очередь и обрабатывать только одну вещь за один раз.

parentPort.on('message', () => {} является прослушивателем событий, когда событие инициируется, оно не будет ждать до предыдущегоАсинхронная операция внутри обратного вызова выполнена.

Таким образом, если вы запускаете 'message' тысячу раз, testAsync будет выполняться тысячу раз без ожидания.

Вам необходимо реализоватьочередь в рабочем и ограничить параллелизм.В NPM есть несколько пакетов очередей обещаний.

Я буду использовать p-queue в этом примере.

const PQueue = require('p-queue');

const { parentPort, Worker, isMainThread } = require('worker_threads');

if (isMainThread) {
  const worker = new Worker(__filename);

  const i = [1, 2, 3, 4, 5, 6, 7, 8, 9];
  for (const x of i) {
    worker.postMessage({ idx: x });
  }
} else {

    const queue = new PQueue({ concurrency: 1 }); // set concurrency

    parentPort.on('message', value => {
      queue.add(() => testAsync(value));
    });
}

async function testAsync(value) {
  return new Promise((resolve) => {
    console.log(`Starting wait for ${value.idx}`);
    setTimeout(() => {
      console.log(`Complete resolve for ${value.idx}`);
      resolve();

      if(value.idx == 9) {
        setTimeout(() => process.exit(0), 2000);
      }
    }, 500);
  });
}

Теперь вывод будет:

starting wait for 1
complete resolve for 1

starting wait for 2
complete resolve for 2

starting wait for N
complete resolve for N
...