Публикация более реальных сообщений в Google Pub / Sub с использованием Node.js и csv-parse - PullRequest
0 голосов
/ 19 февраля 2020

Использование Node.js, Google Pub / Sub, csv-parse.

Вариант использования - у меня есть большой CSV-файл для обработки и импорта в моей БД. Он имеет несколько сторонних API, которые обрабатывают каждую строку за 1 секунду. Таким образом, поток процесса ниже -

  1. Пользователь загружает файл
  2. сервер узла загружает файл в хранилище и отправляет сообщение в PubSubNo.1
  3. Теперь мой слушатель слушает выше pubsub и начинает обработку этих сообщений, он загружает файл и начинает разбивать каждую строку и публикует в другом PubSub для дальнейшей обработки
  4. В конце я параллельно обрабатываю эти сообщения меньших строк и добиваюсь более быстрой обработки.

Проблема - Как только мой слушатель загружает файл, он отправляет x нет. ряд сообщений на следующий PubSubNo2, но когда я проверяю его подписку, он показывает более чем x сообщений. Например, я загружаю 6000 записей в формате CSV, и на подписчике показывается более 40K-50K сообщений.

Пакет. json

"dependencies": {
    "@google-cloud/pubsub": "1.5.0",
    "axios": "^0.19.2",
    "csv-parse": "^4.8.5",
    "dotenv": "^8.2.0",
    "google-gax": "1.14.1",
    "googleapis": "47.0.0",
    "moment": "^2.24.0",
    "path": "^0.12.7",
    "pg": "^7.18.1",
    "winston": "^3.0.0"
  }

Код издателя

async processFile(filename) {
    let cnt = 0;
    let index = null;
    let rowCounter = 0;
    const handler = (resolve, reject) => {
      const parser = CsvParser({
          delimiter: ',',
        })
        .on('readable', () => {
          let row;
          let hello = 0;
          let busy = false;
          this.meta.totalRows = (parser.info.records - 1);
          while (row = parser.read()) {
            if (cnt++ === 0) {
              index = row;
              continue;
            }
            let messageObject = {
              customFieldsMap: this.customFieldsMap,
              importAttributes: this.jc.attrs,
              importColumnData: row,
              rowCount: cnt,
              importColumnList: index,
              authToken: this.token
            }
            let topicPublishResult = PubSubPublish.publishToTopic(process.env.IMPORT_CSV_ROW_PUBLISHING_TOPIC, messageObject);
            topicPublishResult.then((response) => {
              rowCounter += 1;
              const messageInfo = "Row " + rowCounter + " published" +
                " | MessageId = " + response +
                " | importId = " + this.data.importId +
                " | fileId = " + this.data.fileId +
                " | orgId = " + this.data.orgId;
              console.info(messageInfo);
            })
          }
        })
        .on('end', () => {
          console.log("File consumed!");
          resolve(this.setStatus("queued"))
        })
        .on('error', reject);
      fs.createReadStream(filename).pipe(parser);
    };
    await new Promise(handler);
  }

И Publi sh код модуля

const {
  PubSub
} = require('@google-cloud/pubsub');

const pubsub = new PubSub({
  projectId: process.env.PROJECT_ID
});
module.exports = {
  publishToTopic: function(topicName, data) {
    return pubsub.topic(topicName, {
      batching: {
        maxMessages: 500,
        maxMilliseconds: 5000,
      }
    }).publish(Buffer.from(JSON.stringify(data)));
  },
};

Это работает без каких-либо проблем для файлов OS 10, 100,200,2000 записей, но создает проблемы с большим количеством записей 6K. После публикации sh 6K записей возникает ошибка UnhandledPromiseRejection для всех записей 6K, например:

(node:49994) UnhandledPromiseRejectionWarning: Error: Retry total timeout exceeded before any response was received
    at repeat (/Users/tarungupta/office/import-processor/node_modules/google-gax/build/src/normalCalls/retries.js:65:31)
    at Timeout._onTimeout (/Users/tarungupta/office/import-processor/node_modules/google-gax/build/src/normalCalls/retries.js:100:25)
    at listOnTimeout (internal/timers.js:531:17)
    at processTimers (internal/timers.js:475:7)
(node:49994) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 6000)

Любая помощь приветствуется!

1 Ответ

2 голосов
/ 19 февраля 2020

Возможно, ваш издатель перегружен, когда у вас есть 6000 сообщений для публикации sh. Причина в том, что вы создаете новый экземпляр издателя для каждого сообщения, которое вы создаете в своем методе publishToTopic. Следовательно, вы не получаете никакой возможности использовать пакетную обработку и ждете 5 секунд, чтобы отправить каждое сообщение. Это много накладных расходов для каждого сообщения. Это может означать, что обратные вызовы не обрабатываются своевременно, что приводит к тайм-аутам и попыткам повторной отправки. Вы хотите создать объект pubsub.topic один раз, а затем повторно использовать его для вызовов publi sh.

...