Как избирательно испускать горячие (общие) наблюдаемые в RxJS? - PullRequest
0 голосов
/ 19 сентября 2018

В моем коде у меня есть горячая наблюдаемая (холодная наблюдаемая по каналу через оператор share ()), генерирующая значения с идентификаторами (т.е. тип полезной нагрузки содержит строковое свойство "id").Затем группа наблюдателей отфильтровывает излучаемые значения на основе указанного идентификатора.

Теперь я думаю, что мой код сильно пострадал от производительности, поскольку каждый наблюдатель должен фильтровать каждое полученное значение.Или, скорее, новая отфильтрованная наблюдаемая создается каждый раз, когда мне нужен новый наблюдатель, чтобы подписаться на общую наблюдаемую.Вид кода выглядит следующим образом:

const source = new Observable();
const sharedObservable = source.pipe(share());
sharedObservable
    .pipe(filter(payload => payload.id === "id1"))
    .subscribe(observerOne);
sharedObservable
    .pipe(filter(payload => payload.id === "id2"))
    .subscribe(observerTwo);
// etc.

Мое первоначальное решение - создать собственный тип наблюдаемой в горячем / общем виде, который обрабатывает фильтрацию на основе внутренней карты от идентификаторов до списков наблюдателей, которые должны получитьПолезная нагрузка.Это уменьшило бы стоимость запуска некоторой функции фильтра предикатов для каждого наблюдателя, который подписывается.Под наблюдаемым будет что-то похожее (или неопределенно похожее) на это:

class FilteredHotObservable {
    constructor(sourceObservable) {
        this.observerMap = new Map();
        this.source = sourceObservable;
        this.source.subscribe(payload => {
            const observers = this.observerMap.get(payload.id);
            observers.forEach(observer => observer.next(payload));
        });
    }

    subscribe(observer, desiredIdToFilterBy) {
        const observers = this.observerMap.get(desiredIdToFilterBy);
        observers.push(observer);
    }
}

Прежде чем я пойду и реализую это, мне было интересно, есть ли уже правильный способ обработки этого сценария.Другими словами, есть ли в RxJS механизм (например, специальный субъект, причудливый оператор) для облегчения такого рода масштабируемых «фильтруемых и общих» наблюдаемых?

ОТКАЗ ОТ ОТВЕТСТВЕННОСТИ: Имена и синтаксисфрагменты кода выше не соответствуют тому, что я на самом деле делаю.Ради этого поста они всего лишь примеры.

Редактировать: Вот фактические цифры, касающиеся задержки обработки выбросов при использовании оператора filter.Для «PostFilterLatency» испускаемые полезные нагрузки фильтруются по идентификаторам, которые соответствуют отдельным наблюдателям в общей наблюдаемой.Для «PreFilterLatency» выбросы фильтруются вручную через карту id-> Observer, затем впоследствии вызывается Observer.next () с соответствующей полезной нагрузкой.Числа - это время, которое берется с момента, когда полезные данные были отправлены, когда они обрабатывались соответствующими наблюдателями (время в миллисекундах).

ObserverCount  PostFilterLatency  PreFilterLatency
-------------  -----------------  ----------------
1              0                  0
10             0                  0
100            0.03               0
1000           0.039              0.001
10000          1.1849             0.0005
100000         19.90902           0.002

Редактировать: вот ссылка накод, который я использовал для запуска этих тестов.

...