Допустим, у меня есть источник данных, генерирующий события, который я хочу преобразовать в реактивный поток.Источник данных связан ресурсом (например, сокетом, который периодически отправляет обновленное состояние), поэтому я хотел бы поделиться одной подпиской на этот ресурс.Использование единственной наблюдаемой с replay
(для новых подписчиков, чтобы немедленно получить текущее значение) и refCount
операторов, кажется, хорошо подходит для этого.Например, вот как выглядит его MyDataProvider
singleton:
private final Observable<MyData> myDataObservable = Observable.<MyData>create(emitter -> {
// Open my resource here and emit data into observable
})
.doOnDispose(() -> {
// Close my resource here
})
.replay(1)
.refCount();
public Observable<MyData> getMyDataObservable() {
return myDataObservable;
}
Однако, теперь давайте предположим, что у меня есть другой источник данных, которому нужен результат первого источника данных для вычисления его собственного значения:
private final Observable<AnotherData> anotherDataObservable = getMyDataProvider().getMyDataObservable()
.flatMap(myData -> {
// Call another data source and return the result here
})
public Observable<AnotherData> getAnotherDataObservable() {
return anotherDataObservable;
}
Здесь моя установка начинает разваливаться.Многоадресная рассылка первой наблюдаемой работает только до оператора refCount
.После этого все снова одноадресно.Это будет означать, что, если будут сделаны две отдельные подписки на anotherDataProvider
, оператор flatMap
будет вызван дважды.Я вижу два обходных пути для этого, но мне не нравятся оба:
1.Преобразование сначала можно наблюдать, прежде чем произойдет многоадресная рассылка
Похоже, для меня проще всего сохранить где-то одноадресный вариант myDataObservable
до выполнения многоадресной операции, а затем выполнить эту операцию многоадресной рассылки в anotherDataObservable
Однако, если эти две наблюдаемыеРасположенный в разных модулях, этот обходной путь сделает код очень не элегантным, требуя MyDataProvider
для отображения двух разных наблюдаемых, которые, по-видимому, возвращают одни и те же данные.
2.Просто используйте дублирующие многоадресные операторы
Второй обходной путь, по-видимому, состоит в том, чтобы просто применить эти replay
и refCount
операторы снова в anotherDataObservable
.Но это создает неэффективность, поскольку первый оператор многоадресной рассылки в myDataObservable
уже применяется, но теперь ничего не делает, кроме пустой траты памяти и циклов ЦП.
Оба обходных пути также включают в себя соединение AnotherDataProvider
с MyDataProvider
.Если в будущем MyDataProvider
изменения и многоадресная рассылка больше не требуются, мне также придется обновить AnotherDataProvider
, чтобы удалить оттуда операторы многоадресной рассылки.
Что было бы более элегантным способом решения этой проблемы?Могу ли я понять, что лучше вообще избежать этой проблемы?