Как создать несколько потоков из одного потока в узле? - PullRequest
0 голосов
/ 24 ноября 2018

У меня есть данные 1 м ohlc, поступающие от внешнего API WebSocket, и я пытаюсь сгенерировать из них потоки 5 м 15 м 30 м 1 ч 1 д и 1 Вт.Мой текущий TransformStream может преобразовать 1 м в один из вышеупомянутых таймфреймов

Как мне сделать

one.pipe(five)
one.pipe(fifteen)
one.pipe(thirty)

без создания нескольких экземпляров исходного потока «один».В настоящее время мой единственный экземпляр TransformStream испускает исходные данные плюс все необходимые временные рамки.

class TimeFrameGenerator extends Transform {
    constructor(timeframes) {
        super({ readableObjectMode: true, writableObjectMode: true });
        this.kline = {}

        //Consider using an object for this, looping with a for in is much faster for this case than using an array or accessing object keys
        this.timeframes = {
            "1m": 1000 * 60,
            "5m": 1000 * 60 * 5,
            "15m": 1000 * 60 * 15,
            "30m": 1000 * 60 * 30,
            "1h": 1000 * 60 * 60,
            "1d": 1000 * 60 * 60 * 24
        }
    }

    _transform(rawKline, encoding, callback) {

        this.push(rawKline);

        const { pairId, base, quote, interval, openTime, closeTime, timestamp, open, high, low, close, baseVolume, quoteVolume, isFinal, raw } = rawKline;

        if (typeof this.kline[pairId] === "undefined")
            this.kline[pairId] = {};

        for (let timeframe in this.timeframes) {
            if (timeframe === interval)
                continue;

            const millis = this.timeframes[timeframe];

            const resampledOpenTime = Math.floor(openTime / millis) * millis;
            const resampledCloseTime = Math.ceil(openTime / millis) * millis - 1;

            if (typeof this.kline[pairId][timeframe] === "undefined") {
                this.kline[pairId][timeframe] = {};
            }

            if (typeof this.kline[pairId][timeframe][resampledOpenTime] === "undefined") {
                const newKline = {
                    pairId: pairId,
                    base: base,
                    quote: quote,
                    interval: timeframe,
                    openTime: resampledOpenTime,
                    closeTime: resampledCloseTime,
                    timestamp: timestamp,
                    open: open,
                    high: high,
                    low: low,
                    close: close,
                    baseVolume: baseVolume,
                    quoteVolume: quoteVolume,
                    isFinal: false,
                    raw: null
                }

                this.kline[pairId][timeframe][resampledOpenTime] = {
                    kline: newKline,
                    prevBaseVol: 0,
                    prevQuoteVol: 0
                }
            }
            else {
                const { kline, prevBaseVol, prevQuoteVol } = this.kline[pairId][timeframe][resampledOpenTime];
                kline.high = high > kline.high ? high : kline.high;
                kline.low = low < kline.low ? low : kline.low;
                kline.close = close;
                kline.baseVolume = parseFloat((prevBaseVol + baseVolume).toFixed(PRECISION));
                kline.quoteVolume = parseFloat((prevQuoteVol + quoteVolume).toFixed(PRECISION));
            }

            if (isFinal) {
                const { prevBaseVol, prevQuoteVol } = this.kline[pairId][timeframe][resampledOpenTime];
                this.kline[pairId][timeframe][resampledOpenTime].prevBaseVol = parseFloat((prevBaseVol + baseVolume).toFixed(PRECISION));
                this.kline[pairId][timeframe][resampledOpenTime].prevQuoteVol = parseFloat((prevQuoteVol + quoteVolume).toFixed(PRECISION));
            }

            let keys = Object.keys(this.kline[pairId][timeframe]);

            if (keys.length > 1) {
                let previousTimestamp = keys.shift();
                this.kline[pairId][timeframe][previousTimestamp]['kline'].isFinal = true;
                this.push(this.kline[pairId][timeframe][previousTimestamp].kline);
                delete this.kline[pairId][timeframe][previousTimestamp];
                console.log(Object.keys(this.kline[pairId][timeframe]).length, Object.keys(this.kline[pairId]).length)
            }

            this.push(this.kline[pairId][timeframe][resampledOpenTime].kline);
        }

        callback()
    }
}

Я новичок в потоках, и некоторые идеи будут очень полезны, если я создам несколько экземпляров TransformStream, по одному для каждого таймфрейма

...