Итак, я знаю, что об этом уже много раз спрашивали, но я пробовал много вещей, и, похоже, ничего не работает.
Давайте начнем с этих блогов / статей / кода:
и многие другие.
В двух словах все они описывают, как вы можете использовать Повторите попытку реализации экспоненциального отката. Примерно так:
source
.retryWhen(
errors -> {
return errors
.zipWith(Observable.range(1, 3), (n, i) -> i)
.flatMap(
retryCount -> {
System.out.println("retry count " + retryCount);
return Observable.timer((long) Math.pow(1, retryCount), SECONDS);
});
})
Даже документация в библиотеке с этим согласна: https://github.com/ReactiveX/RxJava/blob/3.x/src/main/java/io/reactivex/rxjava3/core/Observable.java#L11919.
Тем не менее, я попробовал это и некоторые довольно похожие варианты, не заслуживающие описания здесь, и, похоже, ничего не работает. Есть способ, которым примеры работают и используют блокировку подписчиков, но я хочу избежать блокирования потоков.
Так что если к предыдущему наблюдаемому объекту мы применяем блокирующего подписчика, как это:
.blockingForEach(System.out::println);
Работает как положено. Но так как это не идея. Если мы попытаемся:
.subscribe(
x -> System.out.println("onNext: " + x),
Throwable::printStackTrace,
() -> System.out.println("onComplete"));
Поток запускается только один раз, и это не то, чего я хочу достичь.
Означает ли это, что его нельзя использовать так, как я пытаюсь? Судя по документации, это не проблема для выполнения sh моего требования.
Есть идеи, что мне не хватает?
TIA.
Редактировать: Я тестирую это двумя способами:
Метод тестирования (с использованием testng):
Observable<Integer> source =
Observable.just("test")
.map(
x -> {
System.out.println("trying again");
return Integer.parseInt(x);
});
source
.retryWhen(
errors -> {
return errors
.zipWith(Observable.range(1, 3), (n, i) -> i)
.flatMap(
retryCount -> {
return Observable.timer((long) Math.pow(1, retryCount), SECONDS);
});
})
.subscribe(...);
От пользователя Kafka (используя загрузку Spring):
Это это только подписка для наблюдателя, но logics c - это то, что я описал ранее в посте.
@KafkaListener(topics = "${kafka.config.topic}")
public void receive(String payload) {
log.info("received payload='{}'", payload);
service
.updateMessage(payload)
.subscribe(...)
.dispose();
}