Я передаю ответ от запроса узла в поток преобразования, используя through2Concurrent
.Этот ответ поступает в виде буфера и анализируется для объекта с помощью JSONStream
.Это затем попадает в мой поток преобразования.Затем функция преобразования потока выполняет HTTP-запросы, форматирует ответ и сохраняет его в MongoDB.Мы используем параллельные потоки, потому что для обработки всего остального потребуется недопустимо много времени.
response Stream -> JSONStream.parse() -> Transform Stream
Описание проблемы
Исходный поток ответов содержит примерно18000 объектов разобрано.Однако поток завершается, и событие finish
принимается до обработки всех 18 000 объектов.Не выдается никакой ошибки, но только около 2000 - 5000 объектов фактически обрабатываются до окончания потока.Точное число обрабатывается варьируется.
Вот соответствующий код:
const analyticsTransformer = through2Concurrent.obj({
maxConcurrency: 15
}, async (doc, enc, cb) => {
// Make an http request. This is a relatively long request.
const res = await apim.getAnalytics(doc);
// Save response to mongo.
await UsageData.save(res);
cb();
});
// Kick off the streaming.
broker.getInstances()
.pipe(JSONStream.parse('*')
.pipe(analyticsTransformer)
.on('finish', () => {
// We reach this way too quickly before we have handled all 18,000 objects
})
.on('error', err => {
// No errors are caught.
})
Что я пробовал
- Ожидание 'end 'событие: тот же результат.Необработанные объекты и досрочное завершение.
- Использование
through2
(не through2Concurrent
): получить ETIMEOUT после прохождения нескольких тысяч объектов. - Установка
highWaterMark
на 18 000: Этоединственное, что сработало.Я могу обработать все объекты, если я изменю это значение highWatermark
, но это на самом деле просто бандит по проблеме.Я хочу знать, почему это работает и что я могу сделать, чтобы надежно решить свои проблемы с потоковой передачей.
Установка highWaterMark
выглядит следующим образом:
const analyticsTransformer = through2Concurrent.obj({
highWaterMark: 18,000,
maxConcurrency: 15
}, async (doc, enc, cb) => {
// ...
});
Почему работает изменение значения highWaterMark
?
Какова реальная причина моего рано прерванного потока?
Как я могу это исправить?
Заранее спасибо всем, кто может помочь!:)