Узловые потоки - Нажатие на чтение непреднамеренно разделяет на 3 потока записи - PullRequest
1 голос
/ 06 марта 2019

Цель: Объекты будут помещены в читаемый поток и затем сохранены в отдельном .csv в зависимости от того, с какого канала (Email, Push, In-App) они поступают.

Проблема: Я не могу разделить потоки на разные «строки» .pipe (), поэтому все журналы .csv получают только свои объекты событий, относящиеся к конкретному каналу.Но в текущей итерации все CSV-файлы, созданные Writestream, получают объекты событий со всех каналов.

Вопросы: Могу ли я динамически создать многоканальные "линии (трубы)" в функции setup () программно или текущий подход подходит к этому правильному?

Является ли это ручное создание "pipe () lines" причиной того, что все CSV-файлы заполняются событиями?Может ли это быть решено с помощью одной "pipe () line" и динамической маршрутизации?

Краткое объяснение кода ниже:

setup () вызывает makeStreams () - создает объект с доступным для чтения и записи (вращающийся поток файловой системы с возможностью записи) (настройка () сейчас является ненужной функцией, но позже будет содержать больше задач настройки.)

pushStream () вызывается, когда происходит входящее событие и толкает объект, например: {Email: {queryParam: 1, queryParam: 2 и т. Д.}}} Событие сортируется по объекту самого высокого уровня.(в данном случае «Электронная почта»), а затем отправляется в правильный поток для записи, который теоретически должен быть перенесен в правильный поток для записи.К сожалению, это не тот случай, он отправляет объект события во все доступные для записи потоки.Как я могу отправить его только в правильный поток?

КОД :

const Readable = require('stream').Readable
const Json2csvTransform = require('json2csv').Transform;
var rfs = require("rotating-file-stream");

const channelTypes = ['Push Notification', 'Email', 'In-app Message']
var streamArr = setup(channelTypes);
const opts = {};
const transformOpts = {
    objectMode: true
};

const json2csv = new Json2csvTransform(opts, transformOpts);

function setup(list) {
    console.log("Setting up streams...")
    streamArr = makeStreams(list) //makes streams out of each endpoint
    return streamArr
}

//Stream Builder for Logging Based Upon Channel Name
function makeStreams(listArray) {
    listArray = ['Push Notification', 'Email', 'In-app Message']
    var length = listArray.length
    var streamObjs = {}
    for (var name = 0; name < length; name++) {
        var fileName = listArray[name] + '.csv'
        const readStream = new Readable({
            objectMode: true,
            read() {}
        })
        const writeStream = rfs(fileName, {
            size: "50M", // rotate every 50 MegaBytes written
            interval: "1d" // rotate daily
            //compress: "gzip" // compress rotated files
        });

        var objName = listArray[name]
        var obj = {
            instream: readStream,
            outstream: writeStream
        }
        streamObjs[objName] = obj
    }
    return streamObjs
}

function pushStream(obj) {
    var keys = Object.keys(obj)

        if (streamArr[keys]) {
        streamArr[keys].instream.push(obj[keys])
    } else {
        console.log("event without a matching channel error")
    }
}

//Had to make each pipe line here manually. Can this be improved? Is it the reason all of the files are receiving all events?
streamArr['Email'].instream.pipe(json2csv).pipe(streamArr['Email'].outstream)
streamArr['In-app Message'].instream.pipe(json2csv).pipe(streamArr['In-app Message'].outstream)
streamArr['Push Notification'].instream.pipe(json2csv).pipe(streamArr['Push Notification'].outstream)

module.exports = {
    makeStreams,
    pushStream,
    setup
}
...