Обработка вложенных потоков - PullRequest
0 голосов
/ 05 апреля 2019

Я пытаюсь сгенерировать выходной файл, соединив 2 входных потока CSV, для каждой записи в CSV 1 Я хочу создать выход для каждой записи в CSV 2.

Я наткнулся на высокогорье при просмотре переполнения стека для любых подобных решений и наткнулся:

Операции с вложенным потоком в Highland.js

Я попытался приспособить это к моей собственной проблеме и до сих пор имею это:

    const debug = require('debug')('csvparse');
    const csv = require('fast-csv');
    const fs = require('fs');
    const args = process.argv;
    const h = require('highland');

    const typestream = h(fs.createReadStream(args[2]).pipe(csv({ headers: true, ignoreEmpty: true })));
    const postcodestream = h(fs.createReadStream(args[3]).pipe(csv({ headers: true, ignoreEmpty: true })));

    const pipeline = typestream.flatMap((type) => {
        debug(type);

        return postcodestream.flatMap((postcode) => {
            debug(postcode);

            return h([`${type.type}-${postcode.postcode}\n`]);
        });
    });

    pipeline.pipe(process.stdout);

Со следующими примерами ввода csv1:

type,
STREET,
ROAD,

csv2:

postcode,
3456
3446
1234

Я ожидаю, что результат будет

STREET-3456
STREET-3446
STREET-1234
ROAD-3456
ROAD-3446
ROAD-1234

Но я просто получаю:

STREET-3456
STREET-3446
STREET-1234

Я вижу по операторам отладки, что один раз выхожу из ROAD, а затем он останавливается.

1 Ответ

0 голосов
/ 05 апреля 2019

Хорошо, я понял мою проблему, в основном я должен был использовать для анализа csv вместо обертывания канала, и я также должен был создать fs.createReadStream в начальном flatMap, а не ссылаться на него из переменной (как поток завершится после начальной итерации).

Код теперь:

#!/usr/bin/node
const debug = require('debug')('csvparse');
const csv = require('fast-csv');
const fs = require('fs');
const args = process.argv;
const h = require('highland');

const pipeline = h(fs.createReadStream(args[2]))
    .through(csv({ headers: true, ignoreEmpty: true }))
    .flatMap((type) => {
        return h(fs.createReadStream(args[3]))
            .through(csv({ headers: true, ignoreEmpty: true }))
            .map((postcode) => {
                return `${type.type}-${postcode.postcode}\n`;
            });
    });

pipeline.pipe(process.stdout);
...