Как объединить массив Observables с RxJS 6.x и Node.js? - PullRequest
1 голос
/ 22 апреля 2019

В целях обучения я создаю приложение Node, которое должно взять x наблюдаемых RxJS из массива и объединить их в единый поток событий.Я хочу знать, когда события происходят в любом наблюдаемом, в любом порядке (не в какой-либо последовательности или полном завершении).Я чувствую, что это должно быть в едином объединенном потоке событий.По сути, первое событие, которое приходит из любой наблюдаемой, завершится.

Для этого я чувствовал, что слияние () добьется цели.Поскольку слияние не принимает массивы непосредственно в качестве параметра, я пока использую Reduce, чтобы помочь слиянию.

Однако конечный результат - это не наблюдаемая, а функция.Я не могу подписаться на это тоже.Упрощенную версию кода можно увидеть ниже.

Как я могу изменить этот узел 10.14.2, код RxJS 6.4.x, чтобы он возвращал наблюдаемую, а не "[функцию]", к которой я могу прикрепить .subscribe ()?

const { Observable } = require('rxjs');
const { merge } = require('rxjs/operators');

const observables = [
    Observable.create(observer => observer.next('Hello')),
    Observable.create(observer => observer.next('Hello')),
    Observable.create(observer => observer.next('Hello'))
];

const mergedObservables = observables.reduce((merged, observable) => {
    console.log(observable);
    return merge(merged, observable);
});

// outputs:
// Observable { _isScalar: false, _subscribe: [Function] }
// Observable { _isScalar: false, _subscribe: [Function] }

console.log(mergedObservables);

// outputs:
// [Function]

mergedObservables.subscribe();
// error:
// TypeError: mergedObservables.subscribe is not a function

1 Ответ

1 голос
/ 22 апреля 2019

EDIT: вы импортируете оператор merge вместо статической функции merge.Первый оперирует событиями из наблюдаемого источника, а второй создает новую наблюдаемую из одной или нескольких наблюдаемых источников.Хотя мысль о синтаксисе распространения ниже упрощает ваш код, это не было вашей реальной проблемой.


Похоже, Node.js> = 5.0 поддерживает оператор распространения для массивов ввызовы функций (вы не указываете, какую версию Node.js вы используете).Следующее должно работать, если вы используете современную версию Node.js:

const { Observable, merge } = require('rxjs')

const observables = [
    Observable.create(observer => observer.next('Hello')),
    Observable.create(observer => observer.next('Hello')),
    Observable.create(observer => observer.next('Hello'))
]

const mergedObservables = merge(...observables)
mergedObservables.subscribe(event => { console.log(event) })
...