Узел: трубопровод не блокируется при остановленном проходе - PullRequest
0 голосов
/ 14 января 2019

Одним из базовых поведений потока узла является блокировка при записи в приостановленный поток, а любой поток без конвейера блокируется.

В этом примере созданный PassThrough ни к чему не относится в цикле событий создания. Можно ожидать, что любой конвейер на этом PassThrough будет блокироваться, пока он не будет передан по каналу / не присоединено событие данных, но это не так.

Обратные вызовы pipeline, но ничего не используется.

const {promises: pFs} = require('fs');
const fs = require('fs');
const {PassThrough} = require('stream');
const {pipeline: pipelineCb} = require('stream');
const util = require('util');
const pipeline = util.promisify(pipelineCb);
const path = require('path');
const assert = require('assert');

/**
 * Start a test ftp server
 * @param {string} outputPath
 * @return {Promise<void>}
 */
function myCreateWritableStream (outputPath) {
    // The stream is created in paused mode -> should block until piped
    const stream = new PassThrough();

    (async () => {
        // Do some stuff (create directory / check space / connect...)
        await new Promise(resolve => setTimeout(resolve, 500));

        console.log('piping passThrough to finale output');

        // Consume the stream
        await pipeline(stream, fs.createWriteStream(outputPath));

        console.log('passThrough stream content written');
    })().catch(e => {
        console.error(e);
        stream.emit('error', e);
    });

    return stream;
}

/**
 * Main test function
 * @return {Promise<void>}
 */
async function main () {
    // Prepare the test directory with a 'tmp1' file only
    const smallFilePath = path.join(__dirname, 'tmp1');
    const smallFileOut = path.join(__dirname, 'tmp2');

    await Promise.all([
        pFs.writeFile(smallFilePath, 'a small content'),
        pFs.unlink(smallFileOut).catch(e => assert(e.code === 'ENOENT'))
    ]);

    // Duplicate the tmp1 file to tmp2
    await pipeline([
        fs.createReadStream(smallFilePath),
        myCreateWritableStream(smallFileOut)
    ]);
    console.log('pipeline ended');

    // Check content
    const finalContent = await pFs.readdir(__dirname);
    console.log('directory content');
    console.log(finalContent.filter(file => file.startsWith('tmp')));
}


main().catch(e => {
    process.exitCode = 1;
    console.error(e);
});

Этот код выводит следующие строки:

pipeline ended
directory content
[ 'tmp1' ]
piping passThrough to finale output
passThrough stream content written

Если pipeline действительно ждал окончания потока, то вывод будет следующим:

piping passThrough to finale output
passThrough stream content written
pipeline ended
directory content
[ 'tmp1', 'tmp2' ]

Как вы можете объяснить это поведение?

1 Ответ

0 голосов
/ 14 января 2019

Я не думаю, что API дает гарантии, которые вы ищете здесь.

stream.pipeline вызывает свой обратный вызов после завершения записи всех данных. Поскольку данные были записаны в новый поток Transform (ваш Passthrough), и этому потоку еще некуда помещать данные, он просто сохраняется во внутреннем буфере потока. Этого достаточно для конвейера.

Если вы хотите прочитать достаточно большой файл , заполняя буфер потока преобразования, то обратное давление потока может автоматически вызвать pause() для читаемого файла, который читает файл. Как только поток Transform истощится, он автоматически unpause() станет доступным для чтения, поэтому поток данных возобновится.

Я думаю, что ваш пример делает два неверных предположения:

(1) что вы можете приостановить трансформационный поток. Согласно stream docs , приостановка любого потока, который передается в пункт назначения , неэффективна, потому что он немедленно отключит себя, как только переданный по трубопроводу пункт назначения запросит дополнительные данные. Кроме того, приостановленный поток преобразования все еще считывает данные! Приостановленный поток просто не записывает данные.

(2) что пауза дальше по конвейеру каким-то образом распространяется до передней части конвейера и вызывает прекращение потока данных. Это только правда , если вызвано противодавлением, то есть вам нужно будет запустить обнаружение узла полного внутреннего буфера.

При работе с трубами лучше предположить, что у вас есть ручное управление двумя самыми дальними концами, но не обязательно какой-либо из частей в середине. (Вы можете вручную pipe() и unpipe() подключить и отключить промежуточные потоки, но вы не можете приостановить их.)

...