Я бы хотел немного разобрать файлы CSV, чтобы преобразовать их в JSON и извлечь из них данные. Я использую Highland в качестве библиотеки обработки потоков. Я создаю массив потоков синтаксического анализа CSV, используя
import { readdir as readdirCb, createReadStream } from 'fs';
import { promisify } from 'util';
import _ from 'highland';
import parse from 'csv-parse';
const readdir = promisify(readdirCb);
const LOGS_DIR = './logs';
const options = '-maxdepth 1';
async function main() {
const files = await readdir(LOGS_DIR)
const stream = _(files)
.map(filename => createReadStream(`${LOGS_DIR}/${filename}`))
.map(parse)
}
main();
Я пытался использовать stream
как:
const stream = _(files)
.map(filename => createReadStream(`${LOGS_DIR}/${filename}`))
.map(parse)
.each(stream => {
stream.on('parseable', () => {
let record
while (record = stream.read()) { console.log(record) }
})
})
Это не приводит к записи. Я не уверен, как поступить и получить JSON для каждой строки для каждого файла CSV.
РЕДАКТИРОВАТЬ:
Написание подобной функции работает для отдельного файла:
import parse from 'csv-parse';
import transform from 'stream-transform';
import { createReadStream } from 'fs';
export default function retrieveApplicationIds(filename) {
console.log('Parsing file', filename);
return createReadStream(filename).pipe(parser).pipe(getApplicationId).pipe(recordUniqueId);
}
Редактировать 2:
Я пытался использовать подход concat streams:
const LOGS_DIR = './logs';
function concatStreams(streamArray, streamCounter = streamArray.length) {
streamArray.reduce((mergedStream, stream) => {
// pipe each stream of the array into the merged stream
// prevent the automated 'end' event from firing
mergedStream = stream.pipe(mergedStream, { end: false });
// rewrite the 'end' event handler
// Every time one of the stream ends, the counter is decremented.
// Once the counter reaches 0, the mergedstream can emit its 'end' event.
stream.once('end', () => --streamCounter === 0 && mergedStream.emit('end'));
return mergedStream;
}, new PassThrough());
}
async function main() {
const files = await readdir(LOGS_DIR)
const streams = files.map(parseFile);
const combinedStream = concatStreams(streams);
combinedStream.pipe(process.stdout);
}
main();
Когда я использую это, я получаю ошибку:
(узел: 1050) MaxListenersExceededWarning: Обнаружена возможная утечка памяти в EventEmitter. 11 слушателей unpipe добавлены в [Transformer]. Используйте emitter.setMaxListeners () для увеличения лимита