Подписка на Hot Observable без дополнений - PullRequest
1 голос
/ 31 октября 2019

Я не могу обернуть голову вокруг правильного синтаксиса, чтобы выполнить следующее:

  1. Создать Hot Observable, который выполняется только один раз и никогда не вызывает onComplete
  2. Имеют разные Observables слушать егои реагировать на данные, передаваемые из горячей наблюдаемой, не заставляя горячую наблюдаемую вызвать onComplete.

Ниже приведен упрощенный пример:

// Create an observable that never calls onComplete and keeps spitting out data
Observable<Foo> hotObservable = Observable.create(emitter -> {
   while (true) {
     Foo someData = listenToInputStream();
     emitter.onNext(someData);
   }
}

// Ensure observable is fired off immediately
ConnectableObservable cObs = hotObservable
  .subscribeOn(Schedulers.io())
  .publish();
cObs.connect();

Single<Foo> obs1 = cObs
 .subscribeOn(Schedulers.io())
 .doOnSubscribe(x -> triggerObs1EventToBeSentToInputStream)
 .filter(someFilter)
 .singleOrError();
obs1.subscribe(someConsumer);

Single<Bar> obs2 = cObs
 .subscribeOn(Schedulers.io())
 .doOnSubscribe(x -> triggerObs2EventToBeSentToInputStream)
 .filter(someOtherFilter)
 .map(fooToBar)
 .singleOrError();
obs2.subscribe(someOtherConsumer);

Я вижу, что firstOrError ()работает, но не singleOrError () / lastOrError () /. takeLast (1). Есть ли способ получить последнюю версию, которая соответствует критериям фильтра, без блокировки / зависания?

FWIW, если я делаю .take (1) .singleOrError (), он проходит, но я предполагаю, что это то же самое, что firstOrError (),Я ищу самые последние отправленные данные, которые соответствуют этому фильтру наблюдателей.

У меня также есть другие наблюдатели, которые прослушивают любое количество / типы данных, генерируемых из горячей наблюдаемой, поэтому я вызываю doOnSubscribe дляэти конкретные наблюдатели вместо того, чтобы интегрировать входной поток непосредственно в самих наблюдателей.

1 Ответ

0 голосов
/ 01 ноября 2019

Исходя из идеи субъекта в скайнетах, я думаю, что это работает, если я использую PublishSubject в качестве посредника. В настоящее время это работает как take (), но я думаю, я мог бы расширить это, чтобы сделать его более гибким и вернуть 1 к N элементам.

Пример:

PublishSubject<Foo> pSubj = PublishSubject.create();
cObjs
.filter(getCorrectData)
.doOnSubscribe(x -> triggerEventToBeSentToInputStream)
.subscribe(x -> {
  pSubj.onNext(x);
  pSubj.complete();
});

Single<Foo> obs1 = pSubj
 .subscribeOn(Schedulers.io())
 //etc 
...