Сделать асинхронный вызов из линии обратного вызова readline? - PullRequest
0 голосов
/ 09 октября 2018

В настоящее время у меня есть этот рабочий код (здесь он немного упрощен);это стандартная идиома для переноса readline в обещание.

function process(fname, options){
  return new Promise(function (resolve, reject) {
    const stats = {cnt:0,error:0,mse:0.0}
    const reader = require('readline').createInterface({
        input: fs.createReadStream(fname),
        })
    reader.on('error', reject) // Reject the promise, when an error
    reader.on('line', function (row) {
        stats.error += doSync(row)
        stats.cnt++
        })
    reader.on('close', function () {
      if (stats.cnt >= 1)stats.mse = stats.error / stats.cnt
      resolve(stats)
    })
  })
}

Я хочу изменить вызов на doSync() на doAsync() (который будет возвращать обещание).

Я попытался изменить это:

reader.on('line', function (row) {
    stats.error += doSync(row)
    stats.cnt++
    })

на:

reader.on('line', async function (row) {
    stats.error += await doAsync(row)
    stats.cnt++
    })

Но это не сработало.В частности, он обрабатывает doAsync() до фактического асинхронного вызова (вызова команды оболочки), а затем сразу переходит к следующей строке.Для всех строк в файле.Тогда сценарий сидит там, и мы смотрим друг на друга.

Я догадываюсь, что readline игнорирует возвращенное обещание, и что я ничего не могу сделать.Но я надеюсь, что у улья есть некоторые идеи.

Я нахожусь на узле 8.12.0, но об обновлении до 10.x не может быть и речи.И я не привязан к использованию readline.(Но я привязан к обработке входного файла построчно!)

ОБНОВЛЕНИЕ:

ПРИМЕЧАНИЕ: у моего doAsync() оказалась ошибка.Но даже после исправления readline все равно не работал.

Переключение с readline на построчное исправило это.(Это почти полная замена; но измените 'close' даже на событие 'end'.) Принятый ответ был больше кода, но также работал одинаково хорошо.

В сравнительном тесте,Transform подход занимал 1м 48 с 1м49 с, поэтапный подход занимал 1м 49 с 1м 51 с.(ПРИМЕЧАНИЕ: только два прогона в каждом, но этого было достаточно, чтобы убедить меня, что они в основном идентичны.)

Использование reader.pause() / resume() не помогло с readline, и не требовалось для построчнойстрока (при его использовании выполнялась строгая обработка за один раз, но без этого она работала нормально).

1 Ответ

0 голосов
/ 09 октября 2018

Использование Transform потоков, вероятно, является лучшим способом асинхронной обработки каждой строки.

const {Transform} = require('stream');

Вы можете заменить библиотеку readline простым преобразованием:

function toLines() {
    let line = '';
    return new Transform({
        decodeStrings: false,
        readableObjectMode: true,
        transform(chunk, encoding, callback) {
            const lines = chunk.split('\n');

            line += lines.shift();
            while (lines.length) {
                this.push(line);
                line = lines.shift();
            }

            callback();
        },
        flush(callback) {
            if (line) {
                this.push(line);
            }
            callback();
        }
    });
}

И реализовать еще один Transform, который собирает "статистику":

function toStats() {
    const stats = {cnt: 0, error: 0, mse: 0.0};
    return new Transform({
        objectMode: true,
        async transform(line, encoding, callback) {
            stats.error += await doAsync(line);
            stats.cnt++;
            callback();
        },
        flush(callback) {
            if (stats.cnt >= 1)stats.mse = stats.error / stats.cnt;
            callback(null, stats);
        }
    });
}

Затем вы можете реализовать process, чтобы использовать преобразования:

async function process(fname, options) {
    return new Promise((resolve, reject) => {
        fs.createReadStream(fname, {encoding: 'utf8'})
            .pipe(toLines())
            .pipe(toStats())
            .on('error', reject)
            .on('data', resolve);
    });
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...