Почему этот итератор asyn c readline не работает должным образом? - PullRequest
2 голосов
/ 14 июля 2020

Это часть более крупного процесса, который я сократил до минимального воспроизводимого примера в узле v14.4.0. В этом коде он ничего не выводит изнутри for l oop.

Я вижу только этот вывод в консоли:

before for() loop
finished
finally
done

The for await (const line1 of rl1) l oop никогда не попадает в for l oop - он просто пропускает его:

const fs = require('fs');
const readline = require('readline');
const { once } = require('events');

async function test(file1, file2) {
    try {
        const stream1 = fs.createReadStream(file1);
        await once(stream1, 'open');
        const rl1 = readline.createInterface({input: stream1, crlfDelay: Infinity});

        const stream2 = fs.createReadStream(file2);
        await once(stream2, 'open');
        const rl2 = readline.createInterface({input: stream2, crlfDelay: Infinity});

        console.log('before for() loop');
        for await (const line1 of rl1) {
            console.log(line1);
        }
        console.log('finished');
    } finally {
        console.log('finally');
    }
}

test("data/numbers.txt", "data/letters.txt").then(() => {
    console.log(`done`);
}).catch(err => {
    console.log('Got rejected promise:', err);
})

Но, если я удалю любое из операторов await once(stream, 'open'), тогда for l oop делает именно то, что от него ожидается (перечисляет все строки файла rl1). Итак, по-видимому, есть некоторая проблема синхронизации с итератором asyn c из интерфейса readline между этим и потоком. Любые идеи, что может происходить. Есть идеи, что может быть причиной этого или как его обойти?

FYI, await once(stream, 'open') существует из-за другой ошибки в итераторе asyn c, где он не отклоняется, если возникает проблема файл, в то время как await once(stream, 'open') заставляет вас правильно получить отказ, если файл не может быть открыт (по существу, предварительное открытие).

Если вам интересно, почему там код stream2, он используется в более крупном проекте, но я сократил этот пример до минимального воспроизводимого примера, и для демонстрации проблемы требуется только эта часть кода.

Изменить: Пробуя немного другую реализацию, я обнаружил, что если я объединю два вызова once(stream, "open") в Promise.all(), то это сработает. Итак, это работает:

const fs = require('fs');
const readline = require('readline');
const { once } = require('events');


async function test(file1, file2) {
    try {
        const stream1 = fs.createReadStream(file1);
        const rl1 = readline.createInterface({input: stream1, crlfDelay: Infinity});
        const stream2 = fs.createReadStream(file2);
        const rl2 = readline.createInterface({input: stream2, crlfDelay: Infinity});
        // pre-flight file open to catch any open errors here
        // because of existing bug in async iterator with file open errors
        await Promise.all([once(stream1, "open"), once(stream2, "open")]);

        console.log('before for() loop');
        for await (const line1 of rl1) {
            console.log(line1);
        }
        console.log('finished');
    } finally {
        console.log('finally');
    }
}

test("data/numbers.txt", "data/letters.txt").then(() => {
    console.log(`done`);
}).catch(err => {
    console.log('Got rejected promise:', err);
});

Очевидно, это не должно зависеть от того, как именно вы ждете открытия файла. Где-то есть ошибка времени. Я хотел бы найти эту ошибку либо в строке чтения, либо в потоке чтения и зарегистрировать ее. Есть идеи?

Ответы [ 3 ]

2 голосов
/ 14 июля 2020

Оказывается, основная проблема заключается в том, что readline.createInterface() сразу после вызова добавит прослушиватель событий data (здесь ссылка на код ) и возобновит поток, чтобы запустить поток.

input.on('data', ondata);

и

input.resume();

Затем в слушателе ondata он анализирует данные для строк и, когда он находит строку, запускает здесь line событий .

for (let n = 0; n < lines.length; n++)
  this._onLine(lines[n]);

Но в моих примерах между моментом вызова readline.createInterface() и созданием итератора asyn c (который будет прослушивать line событий). Итак, line события были сгенерированы, и их еще ничего не слушало.

Итак, для правильной работы readline.createInterface() ТРЕБУЕТ, чтобы все, что будет слушать line события, ДОЛЖНО быть добавлено синхронно после вызова readline.createInterface() или есть условие гонки и line события могут быть потеряны.

В моем исходном примере кода надежный способ обойти это - не вызывать readline.createInterface() до тех пор, пока я не выполню await once(...). Тогда асинхронный итератор будет создан синхронно сразу после вызова readline.createInterface().

const fs = require('fs');
const readline = require('readline');
const { once } = require('events');


async function test(file1, file2) {
    try {
        const stream1 = fs.createReadStream(file1);
        const stream2 = fs.createReadStream(file2);
        // wait for both files to be open to catch any "open" errors here
        // since readline has bugs about not properly reporting file open errors
        // this await must be done before either call to readline.createInterface()
        // to avoid race conditions that can lead to lost lines of data
        await Promise.all([once(stream1, "open"), once(stream2, "open")]);

        const rl1 = readline.createInterface({input: stream1, crlfDelay: Infinity});
        const rl2 = readline.createInterface({input: stream2, crlfDelay: Infinity});

        console.log('before for() loop');
        for await (const line1 of rl1) {
            console.log(line1);
        }
        console.log('finished');
    } finally {
        console.log('finally');
    }
}

test("data/numbers.txt", "data/letters.txt").then(() => {
    console.log(`done`);
}).catch(err => {
    console.log('Got rejected promise:', err);
});

Один из способов исправить эту общую проблему - изменить readline.createInterface() так, чтобы он не добавлял событие data, и возобновлял поток ДО тех пор, пока кто-нибудь не добавит прослушиватель событий line. Это предотвратит потерю данных. Это позволило бы объекту интерфейса readline спокойно сидеть без потери данных, пока получатель его вывода не будет действительно готов. Это будет работать для итератора asyn c, а также предотвратит возможное использование интерфейса с другим асинхронным кодом от возможной потери line событий.

Примечание об этом добавлено в соответствующую открытую строку чтения ошибка здесь .

1 голос
/ 14 июля 2020

Модуль readline можно также заменить простым потоком Transform с использованием более современного API потоков. Современный потоковый API из коробки поддерживает асинхронные c итераторы, а также противодавление (например, сторона записи потока (чтение файла) будет приостанавливаться до тех пор, пока не будет использована сторона чтения потока (чтение строки)).

const fs = require('fs');
const { Transform } = require('stream');

function toLines() {
    let remaining = '';
    return new Transform({
        writableObjectMode: false,
        readableObjectMode: true,
        transform(chunk, encoding, callback) {
            try {
                const lines = (remaining + chunk).split(/\r?\n/g);
                remaining = lines.pop();
                for (const line of lines) {
                    this.push(line);
                }
                callback();
            } catch (err) {
                callback(err);
            }
        },
        flush(callback) {
            if (remaining !== '') {
                this.push(remaining);
            }
            callback();
        }
    });
}


async function test(file1, file2) {
    try {
        const stream1 = fs.createReadStream(file1, { encoding: 'utf8' });
        const rl1 = stream1.pipe(toLines());

        const stream2 = fs.createReadStream(file2, { encoding: 'utf8' });
        const rl2 = stream2.pipe(toLines());

        console.log('before for() loop');
        for await (const line1 of rl1) {
            console.log(line1);
        }
        console.log('finished');
    } finally {
        console.log('finally');
    }
}

Этот пример не поддерживает параметр crlfDelay модуля readline, но алгоритм может быть изменен, чтобы сделать что-то подобное. Он также (насколько я могу судить) лучше обрабатывает ошибки, чем это поддерживается модулем readline.

1 голос
/ 14 июля 2020

Вы можете выполнить эту работу должным образом, если создадите итератор asyn c сразу после построения интерфейса readline. Если вы дождетесь создания итератора asyn c, вы можете потерять некоторые строки, поскольку события строки не буферизуются интерфейсом readline, но благодаря итератору asyn c они будут буферизированы.

const fs = require('fs');
const readline = require('readline');
const { once } = require('events');

async function test(file1, file2) {
    try {
        const stream1 = fs.createReadStream(file1);
        await once(stream1, 'open');
        const rl1 = readline.createInterface({input: stream1, crlfDelay: Infinity});

        const rl1Iterator = rl1[Symbol.asyncIterator]();

        const stream2 = fs.createReadStream(file2);
        await once(stream2, 'open');
        const rl2 = readline.createInterface({input: stream2, crlfDelay: Infinity});

        console.log('before for() loop');
        for await (const line1 of rl1Iterator) {
            console.log(line1);
        }
        console.log('finished');
    } finally {
        console.log('finally');
    }
}

test("stream.txt", "stream.txt").then(() => {
    console.log(`done`);
}).catch(err => {
    console.log('Got rejected promise:', err);
})

Судя по обсуждениям в комментариях, это все еще может быть нежелательным решением, поскольку у модуля readline есть другие проблемы, но я решил, что добавлю ответ для решения проблемы, как указано в исходном вопросе.

...