Что я делаю не так?
Пожалуйста, проверьте подписи операторов, чтобы использовать правильные типы: https://github.com/ReactiveX/RxJava#base-class-vs-base-type
JavaDoc :
public final Single<T> retryWhen(
Function<? super Flowable<Throwable>,? extends Publisher<?>> handler)
Поскольку этот учебник использует Observable.timer без проблем.
Этот учебник предшествует RxJava 2. На самом деле, Javadoc, связанный выше, содержитНапример, с Flowable.timer()
:
Single.timer(1, TimeUnit.SECONDS)
.doOnSubscribe(s -> System.out.println("subscribing"))
.map(v -> { throw new RuntimeException(); })
.retryWhen(errors -> {
AtomicInteger counter = new AtomicInteger();
return errors
.takeWhile(e -> counter.getAndIncrement() != 3)
.flatMap(e -> {
System.out.println("delay retry by " + counter.get() + " second(s)");
// vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
return Flowable.timer(counter.get(), TimeUnit.SECONDS);
// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
});
})
.blockingGet();
это что-то, связанное только с Single
retryWhen
и repeatWhen
, использующим Publisher
в качестве сигнала повтораспроектировать так, чтобы мы могли использовать противодавление, чтобы запрашивать только один такой сигнал повтора за раз.С Observable
есть вероятность, что обработчик просто сбросит много сигналов одновременно, и операторы могут вести себя неожиданно.