Как pubsub узнает, сколько сообщений я опубликовал в определенный момент времени? - PullRequest
0 голосов
/ 05 мая 2020

Код для публикации сообщений здесь:

async function publishMessage(topicName) {
  console.log(`[${new Date().toISOString()}] publishing messages`);
  const pubsub = new PubSub({ projectId: PUBSUB_PROJECT_ID });
  const topic = pubsub.topic(topicName, {
    batching: {
      maxMessages: 10,
      maxMilliseconds: 10 * 1000,
    },
  });

  const n = 5;
  const dataBufs: Buffer[] = [];
  for (let i = 0; i < n; i++) {
    const data = `message payload ${i}`;
    const dataBuffer = Buffer.from(data);
    dataBufs.push(dataBuffer);
  }

  const results = await Promise.all(
    dataBufs.map((dataBuf, idx) =>
      topic.publish(dataBuf).then((messageId) => {
        console.log(`[${new Date().toISOString()}] Message ${messageId} published. index: ${idx}`);
        return messageId;
      })
    )
  );
  console.log('results:', results.toString());
}

Как видите, я собираюсь опубликовать sh 5 сообщений. Время публикации sh равно await Promise.all(...), я имею в виду, для пользователей, мы можем сказать, что отправлять сообщения в этот момент, но для внутренней библиотеки pubsub, возможно, нет. Я установил maxMessages на 10, поэтому pubsub будет ждать 10 секунд (maxMilliseconds), а затем опубликовать sh эти 5 сообщений.

Результат exuection соответствует моим ожиданиям:

[2020-05-05T09:53:32.078Z] publishing messages
[2020-05-05T09:53:42.209Z] Message 36854 published. index: 0
[2020-05-05T09:53:42.209Z] Message 36855 published. index: 1
[2020-05-05T09:53:42.209Z] Message 36856 published. index: 2
[2020-05-05T09:53:42.209Z] Message 36857 published. index: 3
[2020-05-05T09:53:42.209Z] Message 36858 published. index: 4
results: 36854,36855,36856,36857,36858

На самом деле, я думаю, topic.publish не вызывает напрямую удаленную службу pubsub, а помещает сообщение в очередь памяти. И есть время окна для подсчета количества сообщений, возможно, в тике или что-то вроде:

// internal logic of @google/pubsub library
setTimeout(() => {
  // if user messages to be published gte maxMessages, then, publish them immediately
  if(getLength(messageQueue) >= maxMessages) {
    callRemotePubsubService(messageQueue)
  }
}, /* window time = */ 100);

Или используя setImmediate(), process.nextTick()?

1 Ответ

0 голосов
/ 05 мая 2020

Обратите внимание, что условия отправки сообщения в службу - это ИЛИ, а не И. Другими словами, если либо maxMessages сообщения ожидают отправки, либо maxMilliseconds прошло после того, как библиотека получила первое ожидающее сообщение, она отправит невыполненные сообщения на сервер.

The Исходный код клиентской библиотеки доступен, так что вы можете точно увидеть, что она делает. В библиотеке есть очередь, которую она использует для отслеживания сообщений, которые еще не были отправлены. При добавлении сообщения, если очередь теперь заполнена (в соответствии с настройками пакетной обработки), оно немедленно вызывает publi sh. Когда добавляется первое сообщение, оно использует setTimeout для планирования вызова, который в конечном итоге вызывает publi sh в службе . Клиент издателя имеет экземпляр очереди , в которую он добавляет сообщения при вызове publish.

...