Gcloud Pubsub работник в JS - PullRequest
0 голосов
/ 15 апреля 2019

Я разрабатываю рабочий в JS с помощью setInterval, чтобы каждые 10 секунд проверять наличие нового сообщения. Я хотел бы знать, является ли подход правильным с помощью следующего кода. Как вы думаете, у меня будут проблемы с производительностью? Я запускаю его как одну капсулу в Кубернете.

const messageHandler = message => {
  sendRequest(message, (message, error, response, body) => {
    if (!error && response.statusCode == 200) {
      message.ack();
    }
  });
};

subscription.on(`message`, messageHandler);

const timeout = 10;
setInterval(() => {
  subscription.removeListener('message', messageHandler);
  subscription.on(`message`, messageHandler);
}, timeout * 1000);

Ответы [ 2 ]

2 голосов
/ 16 апреля 2019

Вместо того, чтобы удалять и регистрировать прослушиватель messageHandler каждые 10 секунд, следующие два лучших подхода:

  1. Иметь долго работающий прослушиватель сообщений для входящих сообщений до тех пор, пока не будет достигнут период времени «x»с момента создания прослушивателя.
  2. Имеет долго работающего прослушивателя сообщений для входящих сообщений и отключает подписчика только в том случае, если прошло время 'x' с момента получения последнего сообщения.

В редких случаях, когда сообщения задерживаются, возможно, подписчик может отключиться до получения всех опубликованных сообщений.Но если вы установите тайм-аут на большое число (порядка минут), это будет очень маловероятно.

См. клиентскую библиотеку для примеров создания клиентского клиента.Вы можете изменить время ожидания на большее число.Клиентская библиотека использует StreamingPull , который поддерживает открытый двунаправленный поток и принимает сообщения по мере их доступности для максимальной пропускной способности и низкой задержки.

0 голосов
/ 15 мая 2019

Возможно, я что-то здесь упускаю, но если вы хотите, чтобы ваш подписчик был активным и слушал «вечно», тогда я не понимаю, зачем вам вообще удалять своего слушателя.

Так почему бы и неткак то так:

const messageHandler = async message => {
    try {
        await doSomethingWith(message);
        message.ack();
    } catch (err) {
        console.error(`Error handling message: ${message.id}: ${err.message}`);
        message.nack();
    }
};


// Listen for new messages
subscription.on(`message`, messageHandler);
...