Я использую Goole Pub / Sub для обработки сообщений в моем приложении (csv parser и сообщение содержит данные файла). Он повторяется каждые 30 секунд, чтобы проверить наличие новых сообщений. После нескольких минут запуска где-то в 600-м * oop я начинаю получать эту ошибку -
Ошибка: превышено общее время ожидания повторной попытки до получения ответа при повторении (/ app / node_modules / @ google-cloud / pubsub / node_modules / google-gax / lib / api_callable. js: 223: 11) в Timeout._onTimeout (/app/node_modules/@google-cloud/pubsub/node_modules/google-gax/lib/api_callable.js : 265: 13) в ontimeout (таймеры. js: 436: 11) в tryOnTimeout (таймеры. js: 300: 5) в listOnTimeout (таймеры. js: 263: 5) в Timer.processTimers (таймеры . js: 223: 10)
Ниже моя посылка. json
"dependencies": {
"axios": "^0.19.2",
"googleapis": "^45.0.0",
"csv-parse": "^4.8.5",
"moment": "^2.24.0",
"dotenv": "^8.2.0",
"knex": "^0.20.8",
"path": "^0.12.7",
"pg": "^7.11.0",
"uuid": "^3.2.1",
"google-gax": "1.6.2",
"winston": "^3.0.0",
"@google-cloud/pubsub": "~0.19.0"
}
Я пытался обновиться до последней версии gax, но безрезультатно.
Вот моя программа прослушивания, скопированная и отредактированная из собственных примеров Google.
const maxMessageFileLimit = process.env.MAX_PARALLEL_PULLS * process.env.MAX_CONCURRENT_MESSAGES;
let currentMessageFileCount = 0;
function startListener(
projectId = process.env.PROJECT_ID,
subscriptionName = process.env.SUBSCRIPTION_NAME
) {
const {
v1
} = require('@google-cloud/pubsub');
const subClient = new v1.SubscriberClient(); // Creates a client; cache this for further use.
async function synchronousPull() {
const formattedSubscription = subClient.subscriptionPath(
projectId,
subscriptionName
);
const request = {
subscription: formattedSubscription,
maxMessages: process.env.MAX_CONCURRENT_MESSAGES || 1, // The maximum number of messages returned for this request.
};
const [response] = await subClient.pull(request);
if (response && response.receivedMessages.length) {
//some work happens here in await
//along with acknowledging the messages ASAP
}
}
setInterval(() => { //loop to restart pulling messages once the worker is not busy
if ((maxMessageFileLimit - currentMessageFileCount) >= process.env.MAX_CONCURRENT_MESSAGES) {
synchronousPull().catch(console.error); //run next pull request only when worker is free enough
}
}, process.env.PULL_INTERVAL || 30000);
}
startListener();
Код работает нормально, я постоянно беспокоюсь о том, что мои сообщения и ошибки постоянно отображаются в GCP. около. Любая помощь приветствуется!