Как управлять потоком для обработки входящих данных в node.js - PullRequest
0 голосов
/ 08 ноября 2019

Для каждой строки в файле я хочу выполнить вычислительно сложную задачу, например сжатие изображений. У меня проблема в том, что данные поступают слишком быстро и переполняют память. В идеале я хотел бы иметь возможность приостановить и продолжить поток по мере обработки данных.

Сначала я попытался использовать модуль readline с таким потоком файлов:

const fileStream = fs.createReadStream('long-list.txt')
const rl = readline.createInterface({ input: fileStream })
rl.on('line', (line, lineCount) => {
   doTheHeavyTask(line)
})

Однако, это быстро переполняет память тысячами вызовов на doTheHeavyTask().

Я остановился на том, чтобы поместить каждую строку в очередь и создать событие, которое удаляет следующую строку, когда предыдущая строка обрабатывается:

const lineQ = new Queue() // From the 'queue-fifo' module
rl.on('line', (line, lineCount) => {
   lineQ.enqueue(line)
})
const lineEmitter = new EventEmitter() // From the 'events' module
lineEmitter.on('processNextLine', async () => {
    await doTheHeavyTask( lineQ.dequeue() )
    if (!lineQ.isEmpty()) lineEmitter.emit('processNextLine')
})
setTimeout( () => lineEmitter.emit('processNextLine'), 20) // Give rl a moment to enqueue some lines

Это работает, но кажется несколько странным и не намного лучше, чем просто чтение в файле сразу.

Я смутно осознаю такие понятия, как "противодавление" и«генераторы» в Javascript, но я не уверен, как их применять.

1 Ответ

0 голосов
/ 08 ноября 2019

Проблема здесь не в самом потоке, а в асинхронной задаче, которую вы запускаете. Каждая задача (будь то обратный вызов с замыканием или асинхронная функция) потребляет память. Если вы запустите несколько (тысячи) задач одновременно, это будет использовать ресурсы.

Вы можете использовать асинхронный итератор для перехода по строкам и выполнения одной задачи для каждой (и ожидания этого):

 (async function () {
     for await (const el of rl) {
        await doHeavyTask(el);
     }
 })();

Это будет правильно противодавление.

Однако он выполняет только одну задачу за раз , что может быть довольно медленным. Чтобы буферизовать несколько элементов и обрабатывать их одновременно, вы можете сделать:

 const SIZE = 10; // to be tested with different values

 (async function () {
   let chunk = [];
   for await(const el of rl) {
      chunk.push(el);
      if(chunk.length >= SIZE) {
         await Promise.all(chunk.map(doHeavyTask));
         chunk.length = 0;
      }
   }
   await Promise.all(chunk.map(doHeavyTask));
})();

Вам нужно как минимум Node 11.14.0 , чтобы это работало .

...