Что произойдет на уровне буфера, когда входной поток передается в несколько выходных потоков? - PullRequest
3 голосов
/ 12 марта 2019

Я читаю потоковый документ и ищу описание поведения буферизации для потоков в https://nodejs.org/api/stream.html#stream_buffering

В документе, похоже, не упоминается о том, что произойдет с буфером inputStream (или буферами?) При передаче по нескольким выходампоскольку разные выходные данные имеют разные скорости потребления:

Сохраняет ли readableStream выделенный буфер для каждого выхода при передаче по нескольким выходам?

Сохраняют ли выходы одинаковую скорость при использовании, или более быстрое завершение будет выполнено раньше?

const input = fs.createReadStream('img.jpg');
const target1 = input.pipe(fs.createWriteStream('target1.jpg'));
const target2 = input.pipe(fs.createWriteStream('target2.jpg'));

1 Ответ

4 голосов
/ 21 марта 2019

TL; DR: Краткий ответ: медленный целевой поток контролирует скорость потока.

Итак, давайте сначала посмотрим, что происходит на стороне чтения.

const input = fs.createReadStream('img.jpg');

Когда вы создаете экземпляр входного потока, он создается в режиме паузы и назначается для чтения (синхронное чтение не выполняется, поэтому он пока не получит доступ к файлу). Для потока highWaterMark установлено что-то вроде 16384, и в настоящее время буфер имеет размер 0 байт.

const target1 = input.pipe(fs.createWriteStream('target1.jpg'));
const target2 = input.pipe(fs.createWriteStream('target2.jpg'));

Теперь, когда вы на самом деле передаете его в записываемые потоки, режим потока устанавливается путем добавления обработчика события on('data') в реализации метода pipe - см. Источник .

Когда это будет сделано, я предполагаю, что больше нет программы для запуска, поэтому узел начинает фактическое чтение и запускает запланированный код в обработчике выше, который просто записывает любые поступающие данные.

Управление потоком данных происходит, когда какая-либо из целей имеет больше данных для записи, чем ее highWaterMark, которая запускает операцию write для возврата false. Затем чтение останавливается вызовом паузы здесь в коде . Две строки выше этого вы увидите, что state.awaitDrain увеличивается.

Теперь поток чтения снова равен paused, и записываемые потоки записывают байты на диск - в какой-то момент уровень буфера снова становится ниже highWaterMark. В этот момент наступает событие drain, которое выполняет эту строку и после вызова всех ожидаемых сливов возобновляет поток. Это делается путем проверки, достигло ли уменьшенное свойство awaitDrain нуля, что означает, что были вызваны все ожидаемые события стока.

В вышеприведенном случае более быстрый из двух потоков может вернуть ложное значение во время записи, но он определенно истощится как первый. Если бы не было awaitDrain, более быстрый поток возобновил бы поток данных, и это вызвало бы возможное переполнение буфера в более медленном из двух.

...