Не кеширование сигналов об ошибках в Mono.cache () - PullRequest
1 голос
/ 02 июля 2019

Здравствуйте, хорошие люди из реактора - я пытался написать какой-нибудь реактивный код (удивительно, а?) И наткнулся на небольшую загадку. Я думаю, что может быть ошибкой реактора, но я решил сначала спросить здесь перед публикацией ошибки.

Для контекста: у меня есть кеш Map<Key, Mono<Value>>. Клиент будет запрашивать данные - мы проверяем кеш и используем то, что по существу computeIfAbsent, чтобы поместить Mono с .cache() в кеш, если для этого ключа еще ничего не было кешировано. Затем клиент берет Mono и делает магию (здесь не актуально). Теперь выгода заключается в том, что в заполнении кеша могут возникать временные ошибки, поэтому мы не хотим кешировать ошибки - текущий запрос будет ошибочным, но «следующий» клиент при подписке должен запустить весь конвейер повторно.

Прочитав, например, этот закрытый вопрос , я остановился на Mono#cache(ttlForValue, ttlForError, ttlForEmpty).

Здесь все становится интереснее.

Поскольку я не хочу кэшировать error (или empty, но игнорирую это) сигналы, я нашел следующую документацию многообещающей

Если соответствующий генератор TTL сгенерирует какое-либо значение Exception, это исключение будет распространено на подписчика, который обнаружил пропадание кэша, но кэш будет немедленно очищен , поэтому другие подписчики могут повторно заполнить кеш на случай, если ошибка была временной. Если источник отправил ошибку, эта ошибка удаляется и добавляется как исключенное исключение. В случае, если источник испускает значение, это значение сбрасывается.

упорная мина

Поэтому я попробовал следующее (бесстыдно набросав пример в связанной проблеме GitHub)

public class TestBench {

   public static void main(String[] args) throws Exception {
       var sampleService = new SampleService();
       var producer = Mono.fromSupplier(sampleService::call).cache(
               __ -> Duration.ofHours(24),
               //don't cache errors
               e -> {throw Exceptions.propagate(e);},
               //meh
               () -> {throw new RuntimeException();});
       try {
           producer.block();
       } catch (RuntimeException e) {
           System.out.println("Caught exception : " + e);
       }
       sampleService.serverAvailable = true;
       var result = producer.block();
       System.out.println(result);
   }

   static final class SampleService {
       volatile boolean serverAvailable = false;

       String call() {
           System.out.println("Calling service with availability: " + serverAvailable);
           if (!serverAvailable) throw new RuntimeException("Error");
           return "Success";
       }
   }
}

выход

09:12:23.991 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
Calling service with availability: false
09:12:24.034 [main] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
java.lang.RuntimeException: Error
   at uk.co.borismorris.testbench.TestBench$SampleService.call(TestBench.java:40)
   at reactor.core.publisher.MonoSupplier.subscribe(MonoSupplier.java:56)
   at reactor.core.publisher.MonoCacheTime.subscribe(MonoCacheTime.java:123)
   at reactor.core.publisher.Mono.block(Mono.java:1474)
   at uk.co.borismorris..boris.testbench.TestBench.main(TestBench.java:26)
Caught exception : reactor.core.Exceptions$BubblingException: java.lang.RuntimeException: Error
Exception in thread "main" java.lang.RuntimeException: Error
   at uk.co.borismorris.testbench.TestBench$SampleService.call(TestBench.java:40)
   at reactor.core.publisher.MonoSupplier.subscribe(MonoSupplier.java:56)
   at reactor.core.publisher.MonoCacheTime.subscribe(MonoCacheTime.java:123)
   at reactor.core.publisher.Mono.block(Mono.java:1474)
   at uk.co.borismorris.testbench.TestBench.main(TestBench.java:26)
   Suppressed: java.lang.Exception: #block terminated with an error
       at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:93)
       at reactor.core.publisher.Mono.block(Mono.java:1475)
       at uk.co.borismorris.testbench.TestBench.main(TestBench.java:31)

Ну, это не сработало - ошибка кэшируется, и второй подписчик просто видит ту же ошибку.

Глядя на код причина очевидна

Duration ttl = null;
try {
   ttl = main.ttlGenerator.apply(signal);
}
catch (Throwable generatorError) {
   signalToPropagate = Signal.error(generatorError);
   STATE.set(main, signalToPropagate); //HERE
   if (signal.isOnError()) {
       //noinspection ThrowableNotThrown
       Exceptions.addSuppressed(generatorError, signal.getThrowable());
   }
}

STATE установлен на сигнал error, вообще не очищается. Но это не вся история, причина, по которой код не очищает кэш, находится ниже этого блока

if (ttl != null) {
   main.clock.schedule(main, ttl.toMillis(), TimeUnit.MILLISECONDS);
}
else {
   //error during TTL generation, signal != updatedSignal, aka dropped
   if (signal.isOnNext()) {
       Operators.onNextDropped(signal.get(), currentContext());
   }
   else if (signal.isOnError()) {
       Operators.onErrorDropped(signal.getThrowable(), currentContext());
   }
   //immediate cache clear
   main.run();
}

В этом случае ttl == null, потому что поколение ttl бросило Exception. signal - это error, так что ветвь вводится и Operators.onErrorDropped называется

public static void onErrorDropped(Throwable e, Context context) {
   Consumer<? super Throwable> hook = context.getOrDefault(Hooks.KEY_ON_ERROR_DROPPED,null);
   if (hook == null) {
       hook = Hooks.onErrorDroppedHook;
   }
   if (hook == null) {
       log.error("Operator called default onErrorDropped", e);
       throw Exceptions.bubble(e);
   }
   hook.accept(e);
}

Итак, здесь мы можем видеть, что если в контексте нет хука onError и нет набора по умолчанию, то вызывается throw Exceptions.bubble(e) и код в MonoCacheTime возвращается рано, не вызывая main.run(). Следовательно, ошибка остается кэшированной неопределенно , поскольку TTL не существует!

Следующий код решает эту проблему

public class TestBench {

    private static final Logger logger = LoggerFactory.getLogger(TestBench.class);
    private static final Consumer<Throwable> onErrorDropped = e -> logger.error("Dropped", e);

    static {
        //add default hook
        Hooks.onErrorDropped(onErrorDropped);
    }

    public static void main(String[] args) throws Exception {
        var sampleService = new SampleService();
        var producer = Mono.fromSupplier(sampleService::call).cache(
                __ -> Duration.ofHours(24),
                //don't cache errors
                e -> {throw Exceptions.propagate(e);},
                //meh
                () -> {throw new RuntimeException();});
        try {
            producer.block();
        } catch (RuntimeException e) {
            System.out.println("Caught exception : " + e);
        }
        sampleService.serverAvailable = true;
        var result = producer.block();
        System.out.println(result);
    }

    static final class SampleService {
        volatile boolean serverAvailable = false;

        String call() {
            System.out.println("Calling service with availability: " + serverAvailable);
            if (!serverAvailable) throw new RuntimeException("Error");
            return "Success";
        }
    }

}

Но добавляет глобальный хук, который не идеален. Код намекает на возможность добавления хуков для каждого конвейера, но я не могу понять, как это сделать. Следующие работы, но, очевидно, взломать

.subscriberContext(ctx -> ctx.put("reactor.onErrorDropped.local", onErrorDropped))

Вопросы

  1. Является ли приведенная выше ошибка ошибкой, если отсутствие хука onErrorDropped приводит к бесконечному кешированию ошибок?
  2. Есть ли способ установить хук onErrorDropped в subscriberContext, а не глобально?

Последующие действия

из кода; кажется, что возврат null из функции генератора TTL поддерживается и имеет то же поведение, когда сигнал немедленно очищается. В случае, если это не так, абонент видит исходную ошибку, а не ошибку генератора TTL, и скрытую ошибку, которая, возможно, выглядит точнее

   public static void main(String[] args) throws Exception {
   var sampleService = new SampleService();
   var producer = Mono.fromSupplier(sampleService::call).cache(
           __ -> Duration.ofHours(24),
           //don't cache errors
           e -> null,
           //meh
           () -> null);
   try {
       producer.block();
   } catch (RuntimeException e) {
       System.out.println("Caught exception : " + e);
   }
   sampleService.serverAvailable = true;
   var result = producer.block();
   System.out.println(result);
}

Поддерживается ли это поведение? Должно ли это быть документировано?

1 Ответ

1 голос
/ 03 июля 2019

Вы действительно нашли ошибку! И я думаю, что документация также может быть улучшена для этого варианта cache:

  1. Внимание к тому, как оно работает с исключениями внутри TTL Function, вероятно, вводит в заблуждение
  2. Должен существовать документированный простой способ «игнорирования» категории сигналов в источнике (на ваш случай: вы хотите, чтобы последующие подписчики «повторяли» при возникновении ошибки источника).
  3. Поведение содержит ошибки из-за использования onErrorDropped (по умолчанию выбрасывается исключенное исключение, что предотвращает сброс состояния main.run()).

К сожалению, в тестах используется StepVerifier#verifyThenAssertThat(), который устанавливает хук onErrorDropped, так что последняя ошибка никогда не была идентифицирована.

Возвращение null в функции TTL работает не лучше, потому что происходит та же ошибка, но на этот раз с исключением / всплытием исходного исключения источника.

Но существует идеальная семантика для передачи ошибки первому подписчику, и пусть второй подписчик повторяет попытку: , чтобы вернуть Duration.ZERO в ttl Function. Это недокументировано, но работает прямо сейчас:

IllegalStateException exception = new IllegalStateException("boom");
AtomicInteger count = new AtomicInteger();

Mono<Integer> source = Mono.fromCallable(() -> {
    int c = count.incrementAndGet();
    if (c == 1) throw exception;
    return c;
});

Mono<Integer> cache = source.cache(v -> Duration.ofSeconds(10),
    e -> Duration.ZERO,
    () -> Duration.ofSeconds(10));

assertThat(cache.retry().block()).isEqualTo(2);

Я открою проблему, чтобы исправить ошибку сброса состояния и сосредоточу внимание на javadoc на приведенном выше решении, в то же время перенеся бит, связанный с выбрасыванием функций TTL, в отдельный короткий параграф в конце.

редактировать: https://github.com/reactor/reactor-core/issues/1783

...