Rx Java retryWhen (экспоненциальный откат) не работает - PullRequest
4 голосов
/ 13 февраля 2020

Итак, я знаю, что об этом уже много раз спрашивали, но я пробовал много вещей, и, похоже, ничего не работает.

Давайте начнем с этих блогов / статей / кода:

и многие другие.

В двух словах все они описывают, как вы можете использовать Повторите попытку реализации экспоненциального отката. Примерно так:

source
.retryWhen(
  errors -> {
    return errors
    .zipWith(Observable.range(1, 3), (n, i) -> i)
    .flatMap(
       retryCount -> {
       System.out.println("retry count " + retryCount);
       return Observable.timer((long) Math.pow(1, retryCount), SECONDS);
     });
 })

Даже документация в библиотеке с этим согласна: https://github.com/ReactiveX/RxJava/blob/3.x/src/main/java/io/reactivex/rxjava3/core/Observable.java#L11919.

Тем не менее, я попробовал это и некоторые довольно похожие варианты, не заслуживающие описания здесь, и, похоже, ничего не работает. Есть способ, которым примеры работают и используют блокировку подписчиков, но я хочу избежать блокирования потоков.

Так что если к предыдущему наблюдаемому объекту мы применяем блокирующего подписчика, как это:

.blockingForEach(System.out::println);

Работает как положено. Но так как это не идея. Если мы попытаемся:

.subscribe(
x -> System.out.println("onNext: " + x),
Throwable::printStackTrace,
() -> System.out.println("onComplete"));

Поток запускается только один раз, и это не то, чего я хочу достичь.

Означает ли это, что его нельзя использовать так, как я пытаюсь? Судя по документации, это не проблема для выполнения sh моего требования.

Есть идеи, что мне не хватает?

TIA.

Редактировать: Я тестирую это двумя способами:

Метод тестирования (с использованием testng):

Observable<Integer> source =
    Observable.just("test")
        .map(
            x -> {
              System.out.println("trying again");
              return Integer.parseInt(x);
            });
source
    .retryWhen(
        errors -> {
          return errors
              .zipWith(Observable.range(1, 3), (n, i) -> i)
              .flatMap(
                  retryCount -> {
                    return Observable.timer((long) Math.pow(1, retryCount), SECONDS);
                  });
        })
    .subscribe(...);

От пользователя Kafka (используя загрузку Spring):

Это это только подписка для наблюдателя, но logics c - это то, что я описал ранее в посте.

@KafkaListener(topics = "${kafka.config.topic}")
      public void receive(String payload) {
        log.info("received payload='{}'", payload);
        service
            .updateMessage(payload)
            .subscribe(...)
            .dispose();
      }

1 Ответ

0 голосов
/ 17 апреля 2020

Основная проблема вашего кода заключается в том, что Observable.timer по умолчанию работает в планировщике вычислений . Это добавляет дополнительные усилия при попытке проверить поведение в тесте.

Вот некоторый код модульного тестирования, который проверяет, что ваш код повторной попытки действительно повторяется.

  • Он добавляет счетчик, просто так мы можем легко проверить, сколько вызовов произошло.
  • Он использует TestScheduler вместо планировщика вычислений, так что мы можем делать вид, что движемся во времени через advanceTimeBy .

    TestScheduler testScheduler = new TestScheduler();
    AtomicInteger counter = new AtomicInteger();
    
    Observable<Integer> source =
        Observable.just("test")
            .map(
                x -> {
                    System.out.println("trying again");
                    counter.getAndIncrement();
                    return Integer.parseInt(x);
                });
    TestObserver<Integer> testObserver = source
        .retryWhen(
            errors -> {
                return errors
                    .zipWith(Observable.range(1, 3), (n, i) -> i)
                    .flatMap(
                        retryCount -> {
                            return Observable.timer((long) Math.pow(1, retryCount), SECONDS, testScheduler);
                        });
            })
        .test();
    
    assertEquals(1, counter.get());
    
    testScheduler.advanceTimeBy(1, SECONDS);
    assertEquals(2, counter.get());
    
    testScheduler.advanceTimeBy(1, SECONDS);
    assertEquals(3, counter.get());
    
    testScheduler.advanceTimeBy(1, SECONDS);
    assertEquals(4, counter.get());
    
    testObserver.assertComplete();
    
...