Длинный объектный поток с завершением асинхронного преобразования слишком скоро - PullRequest
0 голосов
/ 19 декабря 2018

Я передаю ответ от запроса узла в поток преобразования, используя through2Concurrent.Этот ответ поступает в виде буфера и анализируется для объекта с помощью JSONStream.Это затем попадает в мой поток преобразования.Затем функция преобразования потока выполняет HTTP-запросы, форматирует ответ и сохраняет его в MongoDB.Мы используем параллельные потоки, потому что для обработки всего остального потребуется недопустимо много времени.

response Stream -> JSONStream.parse() -> Transform Stream

Описание проблемы
Исходный поток ответов содержит примерно18000 объектов разобрано.Однако поток завершается, и событие finish принимается до обработки всех 18 000 объектов.Не выдается никакой ошибки, но только около 2000 - 5000 объектов фактически обрабатываются до окончания потока.Точное число обрабатывается варьируется.

Вот соответствующий код:

const analyticsTransformer = through2Concurrent.obj({
  maxConcurrency: 15
}, async (doc, enc, cb) => {
  // Make an http request. This is a relatively long request.
  const res = await apim.getAnalytics(doc);
  // Save response to mongo.
  await UsageData.save(res);
  cb();
});

// Kick off the streaming.
broker.getInstances()
  .pipe(JSONStream.parse('*')
  .pipe(analyticsTransformer)
  .on('finish', () => {
    // We reach this way too quickly before we have handled all 18,000 objects
  })
  .on('error', err => {
    // No errors are caught.
  })

Что я пробовал

  • Ожидание 'end 'событие: тот же результат.Необработанные объекты и досрочное завершение.
  • Использование through2 (не through2Concurrent): получить ETIMEOUT после прохождения нескольких тысяч объектов.
  • Установка highWaterMark на 18 000: Этоединственное, что сработало.Я могу обработать все объекты, если я изменю это значение highWatermark, но это на самом деле просто бандит по проблеме.Я хочу знать, почему это работает и что я могу сделать, чтобы надежно решить свои проблемы с потоковой передачей.

Установка highWaterMark выглядит следующим образом:

const analyticsTransformer = through2Concurrent.obj({
  highWaterMark: 18,000,
  maxConcurrency: 15
}, async (doc, enc, cb) => {
  // ...
});

Почему работает изменение значения highWaterMark?

Какова реальная причина моего рано прерванного потока?

Как я могу это исправить?

Заранее спасибо всем, кто может помочь!:)

...