Ошибка тайм-аута публикации / публикации Google в Node.js - PullRequest
0 голосов
/ 07 февраля 2020

Я использую Goole Pub / Sub для обработки сообщений в моем приложении (csv parser и сообщение содержит данные файла). Он повторяется каждые 30 секунд, чтобы проверить наличие новых сообщений. После нескольких минут запуска где-то в 600-м * oop я начинаю получать эту ошибку -

Ошибка: превышено общее время ожидания повторной попытки до получения ответа при повторении (/ app / node_modules / @ google-cloud / pubsub / node_modules / google-gax / lib / api_callable. js: 223: 11) в Timeout._onTimeout (/app/node_modules/@google-cloud/pubsub/node_modules/google-gax/lib/api_callable.js : 265: 13) в ontimeout (таймеры. js: 436: 11) в tryOnTimeout (таймеры. js: 300: 5) в listOnTimeout (таймеры. js: 263: 5) в Timer.processTimers (таймеры . js: 223: 10)

Ниже моя посылка. json

 "dependencies": {
    "axios": "^0.19.2",
    "googleapis": "^45.0.0",
    "csv-parse": "^4.8.5",
    "moment": "^2.24.0",
    "dotenv": "^8.2.0",
    "knex": "^0.20.8",
    "path": "^0.12.7",
    "pg": "^7.11.0",
    "uuid": "^3.2.1",
    "google-gax": "1.6.2",
    "winston": "^3.0.0",
    "@google-cloud/pubsub": "~0.19.0"
  }

Я пытался обновиться до последней версии gax, но безрезультатно.

Вот моя программа прослушивания, скопированная и отредактированная из собственных примеров Google.

const maxMessageFileLimit = process.env.MAX_PARALLEL_PULLS * process.env.MAX_CONCURRENT_MESSAGES;
let currentMessageFileCount = 0;

function startListener(
  projectId = process.env.PROJECT_ID,
  subscriptionName = process.env.SUBSCRIPTION_NAME
) {
  const {
    v1
  } = require('@google-cloud/pubsub');
  const subClient = new v1.SubscriberClient(); // Creates a client; cache this for further use.

  async function synchronousPull() {
    const formattedSubscription = subClient.subscriptionPath(
      projectId,
      subscriptionName
    );

    const request = {
      subscription: formattedSubscription,
      maxMessages: process.env.MAX_CONCURRENT_MESSAGES || 1, // The maximum number of messages returned for this request.
    };
    const [response] = await subClient.pull(request);

    if (response && response.receivedMessages.length) {
     //some work happens here in await
     //along with acknowledging the messages ASAP

     }

}

  setInterval(() => { //loop to restart pulling messages once the worker is not busy
    if ((maxMessageFileLimit - currentMessageFileCount) >= process.env.MAX_CONCURRENT_MESSAGES) {
      synchronousPull().catch(console.error); //run next pull request only when worker is free enough
    }
  }, process.env.PULL_INTERVAL || 30000);
}

startListener();

Код работает нормально, я постоянно беспокоюсь о том, что мои сообщения и ошибки постоянно отображаются в GCP. около. Любая помощь приветствуется!

...