rxjava завершено после повторной попытки при завершении - PullRequest
0 голосов
/ 25 февраля 2019

Я использую оператор retryWhen в Completeable, есть ли способ сказать ему, чтобы завершить из Flowry retry?как то так -

PublishSubject<?> retrySubject = PublishSubject.create();
public void someFunction() {
    someCompletable.retryWhen(new Function<Flowable<Throwable>, Publisher<?>>() {
        @Override
        public Publisher<?> apply(Flowable<Throwable> throwableFlowable) throws Exception {
            return throwableFlowable.flatMap(throwable -> retrySubject.toFlowable(BackpressureStrategy.MISSING));
        }
    }).subscribe();
}

public void ignoreError(){
    retrySubject.onComplete();
}

1 Ответ

0 голосов
/ 25 февраля 2019

Вы не можете остановить flatMap, предоставив ему пустой источник.Также каждая ошибка будет продолжать подписывать все больше и больше наблюдателей на эту тему, что вызывает утечку памяти.

Используйте takeUntil, чтобы остановить последовательность с помощью другого источника:

PublishProcessor<Throwable> stopProcessor = PublishProcessor.create();

source.retryWhen(errors -> 
    errors.takeUntil(
        stopProcessor
    )
    .flatMap(error -> Flowable.timer(1, TimeUnit.SECONDS))
)

stopProcessor.onComplete();

Редактировать Если вы хотите повторно использовать ту же тему, вы можете подавить элементы на пути остановки:

PublishProcessor<Integer> stopProcessor = PublishProcessor.create();

source.retryWhen(errors -> 
    errors.takeUntil(
        stopProcessor.ignoreElements().toFlowable()
    )
    .flatMap(error -> stopProcessor)
)

// retry
stopProcessor.onNext(1);

// stop
stopProcessor.onComplete();
...