Вы не можете остановить flatMap
, предоставив ему пустой источник.Также каждая ошибка будет продолжать подписывать все больше и больше наблюдателей на эту тему, что вызывает утечку памяти.
Используйте takeUntil
, чтобы остановить последовательность с помощью другого источника:
PublishProcessor<Throwable> stopProcessor = PublishProcessor.create();
source.retryWhen(errors ->
errors.takeUntil(
stopProcessor
)
.flatMap(error -> Flowable.timer(1, TimeUnit.SECONDS))
)
stopProcessor.onComplete();
Редактировать Если вы хотите повторно использовать ту же тему, вы можете подавить элементы на пути остановки:
PublishProcessor<Integer> stopProcessor = PublishProcessor.create();
source.retryWhen(errors ->
errors.takeUntil(
stopProcessor.ignoreElements().toFlowable()
)
.flatMap(error -> stopProcessor)
)
// retry
stopProcessor.onNext(1);
// stop
stopProcessor.onComplete();