Node.js `Stream` и чанк заказа - PullRequest
       15

Node.js `Stream` и чанк заказа

0 голосов
/ 29 октября 2018

Сохраняет ли порядок Node.js stream вплоть до событий 'data' включительно? Я полагаю, что да, и этот вопрос также, по-видимому, предполагает, по крайней мере, что pipe методы последовательны.

Я хочу, чтобы порядок чанков был сохранен, чтобы я мог вызывать функцию, которая работает с каждым чанком до окончания потока, т. Е. Во время чтения одной части потока я хочу, чтобы чан буферизованных данных обрабатывался функция в событии 'data'.

Однако, когда я запускаю следующий код, где я использовал Math.random и setTimeout, чтобы конкретно проверить, происходят ли события по порядку:

fs.createReadStream(filePath, { encoding: 'ascii' })
  .pipe(streamToEntry)
  .on('data', (chunk) => {
    setTimeout(() => { 
      console.log(chunk); 
    }, Math.random() * 1000)
  });

порции данных могут быть записаны не по порядку.

Это из-за setTimeout() или из-за того, что событие 'data' не обязательно вызывается последовательно? то есть, должна ли упорядоченная обработка происходить только в методах pipe, или я могу обработать данные в конце последовательно?

1 Ответ

0 голосов
/ 29 октября 2018

Ваши data события гарантированно будут отправлены по порядку. См. этот ответ для некоторых дополнительных деталей и некоторых фрагментов из исходного кода узла, который показывает, что действительно вы получите ваши data события в порядке.

Проблема возникает, когда вы добавляете асинхронный код в ваш data обратный вызов (setTimeout является примером асинхронного кода). В этой ситуации ваши обратные вызовы data не гарантированно завершат обработку в том порядке, в котором они были вызваны.

Что вам нужно сделать, это убедиться, что к моменту возврата вашего обратного вызова data вы полностью обработали свои данные. Другими словами, ваш код обратного вызова должен быть кодом синхронизации.

fs.createReadStream(filePath, { encoding: 'ascii' })
  .pipe(streamToEntry)
  .on('data', (chunk) => {
      // only synchronous code here
      console.log(chunk); 
  });

Для работы кода вопроса можно использовать async / await:

fs.createReadStream(filePath, { encoding: 'ascii' })
  .pipe(streamToEntry)
  .on('data', async (chunk) => {
      // only synchronous code here
      await setTimeout(() => console.log(chunk), Math.random() * 1000); 
  });
...