Я не могу обернуть голову вокруг правильного синтаксиса, чтобы выполнить следующее:
- Создать Hot Observable, который выполняется только один раз и никогда не вызывает onComplete
- Имеют разные Observables слушать егои реагировать на данные, передаваемые из горячей наблюдаемой, не заставляя горячую наблюдаемую вызвать onComplete.
Ниже приведен упрощенный пример:
// Create an observable that never calls onComplete and keeps spitting out data
Observable<Foo> hotObservable = Observable.create(emitter -> {
while (true) {
Foo someData = listenToInputStream();
emitter.onNext(someData);
}
}
// Ensure observable is fired off immediately
ConnectableObservable cObs = hotObservable
.subscribeOn(Schedulers.io())
.publish();
cObs.connect();
Single<Foo> obs1 = cObs
.subscribeOn(Schedulers.io())
.doOnSubscribe(x -> triggerObs1EventToBeSentToInputStream)
.filter(someFilter)
.singleOrError();
obs1.subscribe(someConsumer);
Single<Bar> obs2 = cObs
.subscribeOn(Schedulers.io())
.doOnSubscribe(x -> triggerObs2EventToBeSentToInputStream)
.filter(someOtherFilter)
.map(fooToBar)
.singleOrError();
obs2.subscribe(someOtherConsumer);
Я вижу, что firstOrError ()работает, но не singleOrError () / lastOrError () /. takeLast (1). Есть ли способ получить последнюю версию, которая соответствует критериям фильтра, без блокировки / зависания?
FWIW, если я делаю .take (1) .singleOrError (), он проходит, но я предполагаю, что это то же самое, что firstOrError (),Я ищу самые последние отправленные данные, которые соответствуют этому фильтру наблюдателей.
У меня также есть другие наблюдатели, которые прослушивают любое количество / типы данных, генерируемых из горячей наблюдаемой, поэтому я вызываю doOnSubscribe дляэти конкретные наблюдатели вместо того, чтобы интегрировать входной поток непосредственно в самих наблюдателей.