Как правильно преобразовать многоадресные наблюдаемые в RxJava - PullRequest
0 голосов
/ 18 мая 2018

Допустим, у меня есть источник данных, генерирующий события, который я хочу преобразовать в реактивный поток.Источник данных связан ресурсом (например, сокетом, который периодически отправляет обновленное состояние), поэтому я хотел бы поделиться одной подпиской на этот ресурс.Использование единственной наблюдаемой с replay (для новых подписчиков, чтобы немедленно получить текущее значение) и refCount операторов, кажется, хорошо подходит для этого.Например, вот как выглядит его MyDataProvider singleton:

private final Observable<MyData> myDataObservable = Observable.<MyData>create(emitter -> {
    // Open my resource here and emit data into observable
})
    .doOnDispose(() -> {
        // Close my resource here
    })
    .replay(1)
    .refCount();

public Observable<MyData> getMyDataObservable() {
    return myDataObservable;
}

Однако, теперь давайте предположим, что у меня есть другой источник данных, которому нужен результат первого источника данных для вычисления его собственного значения:

private final Observable<AnotherData> anotherDataObservable = getMyDataProvider().getMyDataObservable()
    .flatMap(myData -> {
        // Call another data source and return the result here
    })

public Observable<AnotherData> getAnotherDataObservable() {
    return anotherDataObservable;
}

Здесь моя установка начинает разваливаться.Многоадресная рассылка первой наблюдаемой работает только до оператора refCount.После этого все снова одноадресно.Это будет означать, что, если будут сделаны две отдельные подписки на anotherDataProvider, оператор flatMap будет вызван дважды.Я вижу два обходных пути для этого, но мне не нравятся оба:

1.Преобразование сначала можно наблюдать, прежде чем произойдет многоадресная рассылка

Похоже, для меня проще всего сохранить где-то одноадресный вариант myDataObservable до выполнения многоадресной операции, а затем выполнить эту операцию многоадресной рассылки в anotherDataObservable Однако, если эти две наблюдаемыеРасположенный в разных модулях, этот обходной путь сделает код очень не элегантным, требуя MyDataProvider для отображения двух разных наблюдаемых, которые, по-видимому, возвращают одни и те же данные.

2.Просто используйте дублирующие многоадресные операторы

Второй обходной путь, по-видимому, состоит в том, чтобы просто применить эти replay и refCount операторы снова в anotherDataObservable.Но это создает неэффективность, поскольку первый оператор многоадресной рассылки в myDataObservable уже применяется, но теперь ничего не делает, кроме пустой траты памяти и циклов ЦП.

Оба обходных пути также включают в себя соединение AnotherDataProvider с MyDataProvider.Если в будущем MyDataProvider изменения и многоадресная рассылка больше не требуются, мне также придется обновить AnotherDataProvider, чтобы удалить оттуда операторы многоадресной рассылки.

Что было бы более элегантным способом решения этой проблемы?Могу ли я понять, что лучше вообще избежать этой проблемы?

Ответы [ 3 ]

0 голосов
/ 01 июня 2018

Что касается вашего первого подхода, в текущей настройке ваш anotherDataObservable использует myDataObservable, и, насколько я понимаю, они логически связаны, поскольку используют один и тот же источник.Таким образом, вам нужно иметь некоторую базовую общую логику для них.Я бы распаковал его в общий модуль, который представит одноадресную версию наблюдаемой, а затем заставит myDataObservable и anotherDataObservable использовать ее в разных модулях, добавляя логику многоадресной рассылки.

Другой вариант - иметь класс, который будет отслеживать ваш ресурс, подписавшись на него, как в myDataObservable, выполнив обработку в onNext и опубликовав сопоставленный результат с Subject т. е. BehavioralSubject , если вы хотите всегда иметь доступ к последнему опубликованному значению и необработанному результату с другим субъектом.Клиенты будут подписываться на эти темы и получат сопоставленные или необработанные значения, которые были рассчитаны только один раз в классе мониторинга.

PS не забудьте добавить стратегию противодавления к вашему субъекту, прежде чем подписаться на него.

Если эти варианты вам не подходят, подумайте, действительно ли важно избегать многократного вызова flatMap?Ваш код довольно прост, и это важный показатель.Если flatMap не тяжелый, вы можете просто запустить его несколько раз.

0 голосов
/ 05 июня 2018

Вы можете разделить одноадресные и многоадресные потоки, но это избыточно.Я думаю, что второй подход лучше, и, кстати, операторы replay и refcount на самом деле делают вещи и не являются пустой тратой.

Вы конвертируете Observable из myDataObservable в ConenctableObservable при вызове replay(1), включающем многоадресную рассылку.
Затем вы подписываетесь на него внутренне, используя refcount(), что также обеспечиваетединая точка для последующих подписок;после этого все снова становится одноадресным.

То, чего вы действительно хотите достичь в anotherDataObservable, такое же, как и в myDataObservable.

0 голосов
/ 19 мая 2018

Вы можете использовать тандем «publish (). RefCount ()», чтобы разрешить совместное использование одного подписчика.Поскольку они используются очень часто, они имеют псевдоним share ().

Вы также можете использовать ConnectableObservable.Но будьте осторожны при использовании воспроизведения с ConnectableObservables.

Если вы примените оператор Replay к Observable до того, как преобразуете его в подключаемую Observable, результирующая подключаемая Observable всегда будет отправлять одну и ту же полную последовательность в любое будущеенаблюдатели, даже те наблюдатели, которые подписываются после того, как подключаемая Наблюдаемая начинает передавать объекты другим подписанным наблюдателям.В документах говорится:

...