Могу ли я использовать retryWhen и вернуть наблюдаемое при достижении предела? - PullRequest
1 голос
/ 30 июня 2019

Я пытаюсь повторить попытку с Java Rx (версия 1).

Я хочу сделать retryWhen вместо простого retry(), потому что я хочу, чтобы при достижении лимита возвращалось наблюдаемое с определенным значением, а не просто создавалось исключение.

Итак, проверяя это https://blog.danlew.net/2016/01/25/rxjavas-repeatwhen-and-retrywhen-explained/ и это Ошибка перехвата, если retryWhen: s повторных попыток истекает Мне удалось создать что-то, что поможет моей цели.

// this is only to simulate the real method that will possibly throw an exception
public static Observable<String> test() {
    Observable<String> var = Observable.error(new IOException());
    return var;
}


Observable<String> test = test().retryWhen(attempts -> {
    return attempts.zipWith(Observable.range(1, 3), (throwable, attempt) -> {
        if (attempt == 3) {
            LOG.info("attempting");
            return Observable.just("completed with error");
        } else {
            return attempt;
        }
    });
});



test.doOnError(x -> System.out.println("do on error message")).subscribe(s -> {
    System.out.println(s);
});

когда я запускаю это локально, я вижу журнал попыток 3 раза (как и ожидалось).

Я не вижу println "do on error message" (как и ожидалось)

Но я не вижу completed with error, который я ожидал, заставляет меня сомневаться, что я действительно возвращаю наблюдаемое, хочу я или нет, что я делаю неправильно?

Я также не понимаю, почему это позволяет мне возвращать наблюдаемое и целое число внутри zipWith . Есть идеи?

и можно ли исключить исключение / ошибку из моего собственного наблюдаемого определения? как то так:

Observable<String> test = test().retry(3).map(value -> {
// some logic to define what to do
Observable.error(new Exception("error");
});

1 Ответ

1 голос
/ 01 июля 2019

Во-первых,

Я также не понимаю, почему это позволяет мне возвращать наблюдаемое и целое число внутри zipWith.

Подпись лямбды в zipWith - это (Throwable, Integer) -> Object, означающее, что что-либо является верным возвращением, поскольку оно является потомком Object.Это так, поскольку эта функция определяет, как объединить два объекта (в данном случае Throwable и Integer, и любая Object является допустимой комбинацией (или ее отсутствием).

Назадк вашей основной проблеме. Важно помнить, что на самом деле делает retryWhen. Это немного сложно (по крайней мере для меня) понять, но в основном всякий раз, когда наблюдатель в теле retryWhen испускает, что вызывает восходящий потокObservable подлежит повторной подписке. Это не контролирует эмиссию в нисходящем направлении.

Примеры из документов (фрагмент кода RxJava 2, но настроение все еще должно применяться) показывают это:

  Observable.create((ObservableEmitter<? super String> s) -> {
      System.out.println("subscribing");
      s.onError(new RuntimeException("always fails"));
  }).retryWhen(attempts -> {
      return attempts.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> {
          System.out.println("delay retry by " + i + " second(s)");
          return Observable.timer(i, TimeUnit.SECONDS);
      });
  }).blockingForEach(System.out::println);

В этом примере возврат в блоке retryWhen контролирует повторную подписку на исходный источник. В этом случае мы говорим, что хотим отложить повторную подписку на i секунд.

Имея это в виду, retryWhen может быть не тем решением, которое вы изначально искали. Другим решением может быть использование retry с тем, сколько раз вы захотите попробовать свою подпрограмму.или (retryWhen, если вы хотите что-то с более настраиваемой повторной подпиской), а затем используйте onErrorResumeNext.См. Также this .

В качестве примера:

Observable.create((ObservableEmitter<String> s) -> s.onError(new RuntimeException("always fails")))
        .retry(3)
        .onErrorResumeNext(throwable -> {
            return Observable.just("hi");
        })
        .subscribe(System.out::println, System.out::println);

В результате получается hi.Ключевым моментом здесь является то, что onErrorResumeNext позволяет нам превратить наши генерируемые исключения во что-то еще.Почти как map для исключений.

...