RxJava Merge Текучий и Завершаемый - PullRequest
0 голосов
/ 15 января 2019

У меня есть Flowable, который непрерывно испускает предметы и никогда не звонит onError или onComplete. Теперь у меня есть Completable, который я хотел бы объединить с этим Flowable, чтобы, когда Completable завершает Flowable, звонит onComplete. Я не могу напрямую изменить данный мне объект Flowable.

Одна проблема, с которой я столкнулся, заключается в том, что я бы использовал takeUntil на Flowable, однако Flowable может прекратить испускать предметы в любой точке, и я все же хотел бы, чтобы Completable мог звонить onComplete.

Обновление: Так как мы можем сделать Completable.toFlowable(), мы можем объединить два Flowables. Проблема в том, что я до сих пор не могу найти способ выполнить оба варианта после завершения.

Ответы [ 2 ]

0 голосов
/ 16 января 2019

Вот обобщенное решение. Как отмечается в ответе Максима, merge / mergeWith не выполняет мои функции, потому что для завершения требуются и Flowable, и Completable. Это решение также правильно утилизирует оба.

    Flowable<Integer> f = Flowable.fromArray(1, 2, 3);
    Completable c = Completable.complete();

    final PublishSubject<Integer> subject = PublishSubject.create();
    final CompositeDisposable cd = new CompositeDisposable();
    Flowable result = subject
            .doOnSubscribe(__ -> {
                cd.add(f.subscribe(subject::onNext));
                cd.add(c.subscribe(subject::onComplete));
            })
            .doOnDispose(cd::dispose)
            .toFlowable(BackpressureStrategy.LATEST);
0 голосов
/ 15 января 2019

Как вы указали, вы не можете изменить сам оригинал Flowable, поэтому он не будет излучать onComplete. Однако вы можете заставить полученный Flowable испустить его, выполнив следующее (псевдокод):

val f: Flowable = ...
val c: Completable = ...
val r: Flowable = f.materialize().mergeWith(c.materialize()).dematerialize()

...