Здравствуйте, хорошие люди из реактора - я пытался написать какой-нибудь реактивный код (удивительно, а?) И наткнулся на небольшую загадку. Я думаю, что может быть ошибкой реактора, но я решил сначала спросить здесь перед публикацией ошибки.
Для контекста: у меня есть кеш Map<Key, Mono<Value>>
. Клиент будет запрашивать данные - мы проверяем кеш и используем то, что по существу computeIfAbsent
, чтобы поместить Mono
с .cache()
в кеш, если для этого ключа еще ничего не было кешировано. Затем клиент берет Mono
и делает магию (здесь не актуально). Теперь выгода заключается в том, что в заполнении кеша могут возникать временные ошибки, поэтому мы не хотим кешировать ошибки - текущий запрос будет ошибочным, но «следующий» клиент при подписке должен запустить весь конвейер повторно.
Прочитав, например, этот закрытый вопрос , я остановился на Mono#cache(ttlForValue, ttlForError, ttlForEmpty)
.
Здесь все становится интереснее.
Поскольку я не хочу кэшировать error
(или empty
, но игнорирую это) сигналы, я нашел следующую документацию многообещающей
Если соответствующий генератор TTL сгенерирует какое-либо значение Exception
, это исключение будет распространено на подписчика, который обнаружил пропадание кэша, но кэш будет немедленно очищен , поэтому другие подписчики могут повторно заполнить кеш на случай, если ошибка была временной. Если источник отправил ошибку, эта ошибка удаляется и добавляется как исключенное исключение. В случае, если источник испускает значение, это значение сбрасывается.
упорная мина
Поэтому я попробовал следующее (бесстыдно набросав пример в связанной проблеме GitHub)
public class TestBench {
public static void main(String[] args) throws Exception {
var sampleService = new SampleService();
var producer = Mono.fromSupplier(sampleService::call).cache(
__ -> Duration.ofHours(24),
//don't cache errors
e -> {throw Exceptions.propagate(e);},
//meh
() -> {throw new RuntimeException();});
try {
producer.block();
} catch (RuntimeException e) {
System.out.println("Caught exception : " + e);
}
sampleService.serverAvailable = true;
var result = producer.block();
System.out.println(result);
}
static final class SampleService {
volatile boolean serverAvailable = false;
String call() {
System.out.println("Calling service with availability: " + serverAvailable);
if (!serverAvailable) throw new RuntimeException("Error");
return "Success";
}
}
}
выход
09:12:23.991 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
Calling service with availability: false
09:12:24.034 [main] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
java.lang.RuntimeException: Error
at uk.co.borismorris.testbench.TestBench$SampleService.call(TestBench.java:40)
at reactor.core.publisher.MonoSupplier.subscribe(MonoSupplier.java:56)
at reactor.core.publisher.MonoCacheTime.subscribe(MonoCacheTime.java:123)
at reactor.core.publisher.Mono.block(Mono.java:1474)
at uk.co.borismorris..boris.testbench.TestBench.main(TestBench.java:26)
Caught exception : reactor.core.Exceptions$BubblingException: java.lang.RuntimeException: Error
Exception in thread "main" java.lang.RuntimeException: Error
at uk.co.borismorris.testbench.TestBench$SampleService.call(TestBench.java:40)
at reactor.core.publisher.MonoSupplier.subscribe(MonoSupplier.java:56)
at reactor.core.publisher.MonoCacheTime.subscribe(MonoCacheTime.java:123)
at reactor.core.publisher.Mono.block(Mono.java:1474)
at uk.co.borismorris.testbench.TestBench.main(TestBench.java:26)
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:93)
at reactor.core.publisher.Mono.block(Mono.java:1475)
at uk.co.borismorris.testbench.TestBench.main(TestBench.java:31)
Ну, это не сработало - ошибка кэшируется, и второй подписчик просто видит ту же ошибку.
Глядя на код причина очевидна
Duration ttl = null;
try {
ttl = main.ttlGenerator.apply(signal);
}
catch (Throwable generatorError) {
signalToPropagate = Signal.error(generatorError);
STATE.set(main, signalToPropagate); //HERE
if (signal.isOnError()) {
//noinspection ThrowableNotThrown
Exceptions.addSuppressed(generatorError, signal.getThrowable());
}
}
STATE
установлен на сигнал error
, вообще не очищается. Но это не вся история,
причина, по которой код не очищает кэш, находится ниже этого блока
if (ttl != null) {
main.clock.schedule(main, ttl.toMillis(), TimeUnit.MILLISECONDS);
}
else {
//error during TTL generation, signal != updatedSignal, aka dropped
if (signal.isOnNext()) {
Operators.onNextDropped(signal.get(), currentContext());
}
else if (signal.isOnError()) {
Operators.onErrorDropped(signal.getThrowable(), currentContext());
}
//immediate cache clear
main.run();
}
В этом случае ttl == null
, потому что поколение ttl
бросило Exception
. signal
- это error
, так что ветвь вводится и Operators.onErrorDropped
называется
public static void onErrorDropped(Throwable e, Context context) {
Consumer<? super Throwable> hook = context.getOrDefault(Hooks.KEY_ON_ERROR_DROPPED,null);
if (hook == null) {
hook = Hooks.onErrorDroppedHook;
}
if (hook == null) {
log.error("Operator called default onErrorDropped", e);
throw Exceptions.bubble(e);
}
hook.accept(e);
}
Итак, здесь мы можем видеть, что если в контексте нет хука onError
и нет набора по умолчанию, то вызывается throw Exceptions.bubble(e)
и код в MonoCacheTime
возвращается рано, не вызывая main.run()
. Следовательно, ошибка остается кэшированной неопределенно , поскольку TTL не существует!
Следующий код решает эту проблему
public class TestBench {
private static final Logger logger = LoggerFactory.getLogger(TestBench.class);
private static final Consumer<Throwable> onErrorDropped = e -> logger.error("Dropped", e);
static {
//add default hook
Hooks.onErrorDropped(onErrorDropped);
}
public static void main(String[] args) throws Exception {
var sampleService = new SampleService();
var producer = Mono.fromSupplier(sampleService::call).cache(
__ -> Duration.ofHours(24),
//don't cache errors
e -> {throw Exceptions.propagate(e);},
//meh
() -> {throw new RuntimeException();});
try {
producer.block();
} catch (RuntimeException e) {
System.out.println("Caught exception : " + e);
}
sampleService.serverAvailable = true;
var result = producer.block();
System.out.println(result);
}
static final class SampleService {
volatile boolean serverAvailable = false;
String call() {
System.out.println("Calling service with availability: " + serverAvailable);
if (!serverAvailable) throw new RuntimeException("Error");
return "Success";
}
}
}
Но добавляет глобальный хук, который не идеален. Код намекает на возможность добавления хуков для каждого конвейера, но я не могу понять, как это сделать. Следующие работы, но, очевидно, взломать
.subscriberContext(ctx -> ctx.put("reactor.onErrorDropped.local", onErrorDropped))
Вопросы
- Является ли приведенная выше ошибка ошибкой, если отсутствие хука
onErrorDropped
приводит к бесконечному кешированию ошибок?
- Есть ли способ установить хук
onErrorDropped
в subscriberContext
, а не глобально?
Последующие действия
из кода; кажется, что возврат null
из функции генератора TTL поддерживается и имеет то же поведение, когда сигнал немедленно очищается. В случае, если это не так, абонент видит исходную ошибку, а не ошибку генератора TTL, и скрытую ошибку, которая, возможно, выглядит точнее
public static void main(String[] args) throws Exception {
var sampleService = new SampleService();
var producer = Mono.fromSupplier(sampleService::call).cache(
__ -> Duration.ofHours(24),
//don't cache errors
e -> null,
//meh
() -> null);
try {
producer.block();
} catch (RuntimeException e) {
System.out.println("Caught exception : " + e);
}
sampleService.serverAvailable = true;
var result = producer.block();
System.out.println(result);
}
Поддерживается ли это поведение? Должно ли это быть документировано?