У меня есть наблюдаемое, которое передает ответы на все запросы. Я хочу создать фильтр этого наблюдаемого при выполнении запроса, чтобы я мог поделиться результатами с несколькими подписчиками. Ниже приведен пример кода.
PublishSubject<String> publishSubject = PublishSubject.create();
Observable<String> fooObservable = publishSubject.filter(value -> value.startsWith("foo"))
.doOnSubscribe(disposable -> {
publishSubject.onNext("foobar");
})
.replay(1)
.refCount();
fooObservable.subscribe(val -> log.info("A val : <{}>", val));
Я использую PublishSubject в качестве фиктивного сервиса, потому что иногда сервис немедленно возвращает ответ.
Я обнаружил, что, потому что там нет текущей подписки, когда есть немедленный результат, мой fooObservable не заполняется. т.е. я не получаю вывод журнала, когда хочу увидеть:
A val : <foobar>
Обратите внимание, что я получаю тот же результат с этим кодом:
PublishSubject<String> publishSubject = PublishSubject.create();
Observable<String> fooObservable = publishSubject.filter(value -> value.startsWith("foo"))
.replay(1)
.refCount();
publishSubject.onNext("foobar");
fooObservable.subscribe(val -> log.info("A val : <{}>", val));
Так что проблема в том, что fooObservable не подписаться на PublishSubject до тех пор, пока он не будет подписан,
Есть ли способ запустить код сразу после первой подписки на fooObservable?
Редактировать: Я думал о чем-то вроде:
PublishSubject<String> publishSubject = PublishSubject.create();
BehaviorSubject<String> fooObservable = BehaviorSubject.create();
publishSubject.filter(value -> value.startsWith("foo")).subscribe(fooObservable);
publishSubject.onNext("foobar");
fooObservable.subscribe(val -> log.info("A val : <{}>", val));
Но тогда у меня есть 2 подписки, и я не уверен, как выполнить очистку, так как подписка после того, как фильтр не возвращает одноразовые.
Редактировать 2: Описание фоновое задание.
У меня есть сторонняя служба, на которую мой код должен подписаться. Этот сервис вызывает метод onResponse в моем коде с параметром, содержащим мой исходный запрос и ответ. Ответ может быть обновлен новым вызовом onResponse в любое время.
Я хочу обернуть, чтобы создать оболочку для этой службы, которая предоставляет метод:
public Observable<Response> getObservable(Request req);
Если запрос соответствует тот, который уже подписан, тогда наблюдаемое должно предоставить самое последнее совпадающее значение сразу при подписке.
Когда нет подписчиков, мне нужно отписаться от службы, которую я упаковываю.