NodeJS: Как читать из двух файлов и записывать в один выходной файл с помощью каналов? - PullRequest
2 голосов
/ 13 июля 2020

Контекст

Я использую модуль потока событий, чтобы помочь мне читать и писать в эти локальные файлы, для которых я надеюсь вернуть результирующий файл. Короче говоря, 2 входных файла (отправляемых через express API как multipart/form-data), как я ожидаю, могут иметь размер более 200 МБ, содержащих список записей (по 1 на строку). Я хотел бы объединить эти записи в следующем формате <entry1>:<entry2>, где entry1 - это запись из первого файла, а entry2 - из второго файла. Я сделал это ранее, когда я мог хранить и возвращать входы / выходы в памяти, но, поскольку у меня очень ограниченное пространство памяти на моем сервере приложений, мне не хватало памяти в куче. Я читал, что могу использовать поток событий и конвейер для чтения в каждом файле построчно и вывода в файл вместо большой строки в памяти с использованием потоков чтения. Проблема в том, что я не могу решить правильным образом / в нужное время, чтобы полученный выходной файл был готов к отправке обратно вызывающему.

То, что у меня есть на данный момент

То, что я до сих пор работал, так это то, что я получаю правильный вывод файла, который я ожидал, однако это, похоже, проблема асинхронности, поскольку я выполняю обещание до того, как файл фактически завершил запись / сохранение. Пожалуйста, посмотрите мой код ниже ...

const fs = require('fs');
const es = require('event-stream');
const uuid = require('uuid');

const buildFile = async (fileOne, fileTwo) =>
    await new Promise((resolve, reject) => {
        try {
            // Output stream
            let fileID = uuid.v4();
            let outStream = fs
                .createWriteStream(`files/outputFile-${fileID}.txt`, {
                    flags    : 'a',
                    encoding : 'utf-8'
                });

            let fileOneRS = fs
                .createReadStream(fileOne.path, {
                    flags    : 'r',
                    encoding : 'utf-8'
                })
                .pipe(es.split())
                .pipe(
                    es.mapSync((lineOne) => {
                        fileOneRS.pause();

                        let fileTwoRS = fs
                            .createReadStream(fileTwo.path, {
                                flags    : 'r',
                                encoding : 'utf-8'
                            })
                            .pipe(es.split())
                            .pipe(
                                es.mapSync((lineTwo) => {
                                    fileTwoRS.pause();

                                    // Write combo to file
                                    outStream.write(`${lineOne}:${lineTwo}\n`);

                                    fileTwoRS.resume();
                                })
                            );

                        fileOneRS.resume();
                    })
                ); // This is where I have tried doing .on('end', () => resolve), but it also does not work :(
        } catch (err) {
            reject(err);
        }
    });

Примечание: Эта функция вызывается из другой служебной функции следующим образом:

buildFile(fileOne, fileTwo)
    .then((result) => {
        resolve(result);
    })
    .catch((err) => {
        console.log(err);
        reject(err);
    });

Как новичок Javascript разработчик и даже новее NodeJS, я застрял, пытаясь разобраться в этом самостоятельно уже более 2 недель. Если кто-то может помочь, я был бы очень признателен за мудрость здесь!

Спасибо ?

1 Ответ

1 голос
/ 14 июля 2020

Изменить : обновлен код, чтобы он соответствовал ожидаемому результату OP.

Функция обещания 'resolve() должна вызываться после завершения потока записи. Комментарий во фрагменте OP указывает, что функция разрешения могла быть вызвана при сливе fileOneRS (в конце цепочки pipe ()).

Вместо создания нового потока чтения для каждой строки в первый файл, код должен создавать экземпляры потоков чтения только один раз. -line:

import stream from "stream";
import util from "util";
import readline from "readline";
import fs from "fs";
import os from "os";

/** Returns a readable stream as an async iterable over text lines */
function lineIteratorFromFile( fileStream ){
  return readline.createInterface({
    input: fileStream,
    crlfDelay: Infinity
  })
}

// Use stream.pipeline to handle errors and to stream the combined output
// to a Writable stream. The promise will resolve once the data has finished
// writing to the output stream.
await util
  .promisify(stream.pipeline)(
    async function*(){
      for await ( const lineA of lineIteratorFromFile(fs.createReadStream( "./in1.txt" ))){
        for await (const lineB of lineIteratorFromFile(fs.createReadStream( "./in2.txt" ))){
          yield `${lineA}: ${lineB}${os.EOL}`
        }
      }
    },
    fs.createWriteStream( outputFile )
  );

Пример выполнения с NodeJS v13 + доступен в свернутом фрагменте ниже:

// in1.txt:
foo1
foo2

// in2.txt:
bar1
bar2

// out.txt (the file created by this script, with expected output):
foo1: bar1
foo1: bar2
foo2: bar1
foo2: bar2

// main.mjs:
import stream from "stream";
import util from "util";
import readline from "readline";
import fs from "fs";
import os from "os";

/** Returns a readable stream as an async iterable over text lines */
function lineIteratorFromFile( fileStream ){
  return readline.createInterface({
input: fileStream,
crlfDelay: Infinity
  })
}

(async ()=>{
  await util
.promisify(stream.pipeline)(
  async function*(){
    for await ( const lineA of lineIteratorFromFile(fs.createReadStream( "./in1.txt" ))){
      for await (const lineB of lineIteratorFromFile(fs.createReadStream( "./in2.txt" ))){
        yield `${lineA}: ${lineB}${os.EOL}`
      }
    }
  },
  fs.createWriteStream( "./out.txt" )
);
})()
  .catch(console.error);
...