Асинхронный подписчик в Google PubSub с библиотекой Asyn c - PullRequest
2 голосов
/ 14 апреля 2020

Как известно, библиотека Google PubSub javascript является асинхронной (издатель и подписчик являются асинхронными). Из того, что я прочитал, мы не можем выполнить выполнение асинхронного подписчика c в JS коде, потому что JS является однопоточным.

  • Может кто-нибудь объяснить мне, как это действительно работает, когда я подписываюсь на событие 'message'?

  • Если я получу 5 сообщений подряд, будет ли блокироваться мой MessageHandler, пока это не будет сделано?

  • Как я могу асинхронно обрабатывать сообщения в JS? Я слышал об использовании очереди (с одним рабочим параллельно) с библиотекой asyn c. Как это работает тогда, по сравнению без этого?

Заранее спасибо за ответ!

1 Ответ

3 голосов
/ 14 апреля 2020

Выполнение асинхронно, но однопоточная природа Javascript означает, что одновременно выполняется только один из асинхронных обратных вызовов c. Когда вы подписываетесь на событие «сообщение», каждое входящее сообщение будет запускать событие для запуска вашего обратного вызова с входящим сообщением. Они будут поставлены в очередь и запускаются по одному. Каждый вызов будет блокироваться до его завершения.

Лучший способ добиться параллельной обработки сообщений с помощью Javascript - запустить несколько экземпляров вашего подписчика. Когда есть несколько подписчиков, получающих сообщения от одной и той же подписки, нагрузка Cloud Pub / Sub балансирует сообщения, отправляя подмножество сообщений каждому подписчику.

Если вы хотите запустить только один экземпляр, тогда у вас есть немного, хотя и неоптимальный выбор. Во-первых, если вы хотите разделить работу на несколько итераций по событию l oop, то вы можете использовать setImmediate, чтобы сообщить механизму, чтобы он выполнял предоставленный обратный вызов в следующей итерации события l oop. Например:

const doExpensiveWork = message => {
  // Do some more expensive processing here.
  message.ack();
}

const messageHandler = message => {
  console.log(`Received message: ${message.id}`);
  // Do some work on message here.
  setImmediate(() => doExpensiveWork(message));
};

subscription.on('message', messageHandler);

Это позволило бы вам добиться некоторого прогресса одновременно с сообщениями, хотя блоки выполнения все равно будут выполняться последовательно.

Если вы хотите параллельную обработку на разных ядрах процессора с одним запущенным экземпляром службы вам нужно fork подпроцессы .

...