Использование csv-parse с highlandjs - PullRequest
1 голос
/ 30 января 2020

Я бы хотел немного разобрать файлы CSV, чтобы преобразовать их в JSON и извлечь из них данные. Я использую Highland в качестве библиотеки обработки потоков. Я создаю массив потоков синтаксического анализа CSV, используя

import { readdir as readdirCb, createReadStream } from 'fs';
import { promisify } from 'util';
import _ from 'highland';
import parse from 'csv-parse';

const readdir = promisify(readdirCb);

const LOGS_DIR = './logs';
const options = '-maxdepth 1'; 

async function main() {
  const files = await readdir(LOGS_DIR)
   const stream = _(files)
        .map(filename => createReadStream(`${LOGS_DIR}/${filename}`))
            .map(parse)

}

main();

Я пытался использовать stream как:

 const stream = _(files)
        .map(filename => createReadStream(`${LOGS_DIR}/${filename}`))
        .map(parse)
        .each(stream => {
             stream.on('parseable', () => {
                 let record
                 while (record = stream.read()) { console.log(record) }
             })
        })

Это не приводит к записи. Я не уверен, как поступить и получить JSON для каждой строки для каждого файла CSV.

РЕДАКТИРОВАТЬ:

Написание подобной функции работает для отдельного файла:

import parse from 'csv-parse';
import transform from 'stream-transform';
import { createReadStream } from 'fs';


export default function retrieveApplicationIds(filename) {
  console.log('Parsing file', filename);
  return createReadStream(filename).pipe(parser).pipe(getApplicationId).pipe(recordUniqueId);
}

Редактировать 2:

Я пытался использовать подход concat streams:

const LOGS_DIR = './logs';

function concatStreams(streamArray, streamCounter = streamArray.length) { 
  streamArray.reduce((mergedStream, stream) => {
    // pipe each stream of the array into the merged stream
   // prevent the automated 'end' event from firing
    mergedStream = stream.pipe(mergedStream, { end: false });
    // rewrite the 'end' event handler
    // Every time one of the stream ends, the counter is decremented.
    // Once the counter reaches 0, the mergedstream can emit its 'end' event.
    stream.once('end', () => --streamCounter === 0 && mergedStream.emit('end'));
    return mergedStream;
  }, new PassThrough());
}

async function main() {
  const files = await readdir(LOGS_DIR)
  const streams = files.map(parseFile);
  const combinedStream = concatStreams(streams);
  combinedStream.pipe(process.stdout); 
}

main();

Когда я использую это, я получаю ошибку:

(узел: 1050) MaxListenersExceededWarning: Обнаружена возможная утечка памяти в EventEmitter. 11 слушателей unpipe добавлены в [Transformer]. Используйте emitter.setMaxListeners () для увеличения лимита

...