У меня вопрос о странном исключении.
Я получил несколько исключений RejectedExecutionException при производстве (около 200 в день).Reactor запускается в пользовательском планировщике с использованием Schedulers.fromExecutorService ().
Итак, сначала я проверил размер очереди или что-то из ExcutorService
, но это все нормально.нет полной очерединет выключения
Это мой код, вызвавший исключение.
return reactionRepository
.getPage(context, scanQuery)
.buffer(100)
.concatMap(Flux::fromIterable)
.flatMapSequential(likeSn -> findOne(context, parentId, likeSn)
.transform(ReactiveHelpers.defaultIfNotFoundOrError(Optional.empty())))
.filter(Optional::isPresent)
.map(Optional::get)
.doOnError(e -> log.error("Failed to find likes", e));
getPage () возвращает объект Flux.Следующие коды являются основным кодом для чтения информации из кластеров Redis.
...
return bucketList.publishOn(redisScheduler)
.filter(val -> val.getScore() >= 0)
.map(Value::getValue)
И это журнал исключений, который я получил в своем файле журнала.этот журнал ошибок был записан в строке выше .doOnError(e -> log.error("Failed to find likes", e));
.
reactor.core.Exceptions$ReactorRejectedExecutionException: Scheduler unavailable
at reactor.core.Exceptions.failWithRejected(Exceptions.java:249)
at reactor.core.publisher.Operators.onRejectedExecution(Operators.java:412)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.trySchedule(FluxPublishOn.java:293)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.request(FluxPublishOn.java:261)
at reactor.core.publisher.FluxBuffer$BufferExactSubscriber.request(FluxBuffer.java:111)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:227)
at reactor.core.publisher.FluxBuffer$BufferExactSubscriber.onSubscribe(FluxBuffer.java:125)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.onSubscribe(FluxPublishOn.java:209)
at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:78)
at reactor.core.publisher.FluxPublishOn.subscribe(FluxPublishOn.java:108)
at reactor.core.publisher.FluxBuffer.subscribe(FluxBuffer.java:72)
at reactor.core.publisher.FluxConcatMap.subscribe(FluxConcatMap.java:121)
at reactor.core.publisher.Flux.subscribe(Flux.java:6877)
at reactor.core.publisher.FluxMergeSequential.subscribe(FluxMergeSequential.java:99)
at reactor.core.publisher.FluxFilter.subscribe(FluxFilter.java:52)
at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:62)
at reactor.core.publisher.FluxPeek.subscribe(FluxPeek.java:83)
at reactor.core.publisher.MonoCollectList.subscribe(MonoCollectList.java:59)
at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59)
at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44)
at reactor.core.publisher.Mono.subscribe(Mono.java:3080)
at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:128)
at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60)
at reactor.core.publisher.Mono.subscribe(Mono.java:3080)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:372)
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554)
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630)
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.subscribe(FluxGroupBy.java:696)
at reactor.core.publisher.FluxFlatMap.subscribe(FluxFlatMap.java:97)
at reactor.core.publisher.Flux.subscribe(Flux.java:6877)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:372)
at reactor.core.publisher.FluxGroupBy$GroupByMain.drainLoop(FluxGroupBy.java:380)
at reactor.core.publisher.FluxGroupBy$GroupByMain.drain(FluxGroupBy.java:316)
at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:201)
at reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(FluxIterable.java:244)
at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:202)
at reactor.core.publisher.FluxGroupBy$GroupByMain.onSubscribe(FluxGroupBy.java:165)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:140)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:64)
at reactor.core.publisher.FluxGroupBy.subscribe(FluxGroupBy.java:82)
at reactor.core.publisher.FluxFlatMap.subscribe(FluxFlatMap.java:97)
at reactor.core.publisher.MonoCollect.subscribe(MonoCollect.java:66)
at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59)
at reactor.core.publisher.MonoOnAssembly.subscribe(MonoOnAssembly.java:76)
at reactor.core.publisher.Mono.subscribe(Mono.java:3080)
at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.run(MonoSubscribeOn.java:123)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at io.micrometer.core.instrument.AbstractTimer.recordCallable(AbstractTimer.java:143)
at io.micrometer.core.instrument.Timer.lambda$wrap$1(Timer.java:137)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.RejectedExecutionException: Scheduler unavailable
at reactor.core.Exceptions.<clinit>(Exceptions.java:502)
at reactor.core.publisher.Operators.onOperatorError(Operators.java:345)
at reactor.core.publisher.Operators.onOperatorError(Operators.java:323)
at reactor.core.publisher.Operators.onOperatorError(Operators.java:305)
at reactor.core.publisher.MonoError.subscribe(MonoError.java:53)
at reactor.core.publisher.Mono.subscribe(Mono.java:3080)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:75)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:78)
at reactor.core.publisher.Operators.complete(Operators.java:128)
at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45)
at reactor.core.publisher.Mono.subscribe(Mono.java:3080)
Я не знаю, почему возникает это исключение.
В любом случае, при отладке этой проблемы я видел случай, когда исключение выдается только один раз.(Я не уверен, что это реальная причина.)
В ExecutorSchedulerWorker::schedule
, !tasks.add(r)
выражение оценивается как истинное, поэтому возникло исключение.
ExecutorTrackedRunnable r = new ExecutorTrackedRunnable(task, this, true);
if (!tasks.add(r)) {
throw Exceptions.failWithRejected();
}
ЭтоПодсказка, что у меня есть только сейчас.
Кто-нибудь знает эту проблему?Любые предложения могут помочь мне.
Редактировать 1 .Добавьте упомянутый код.Это вспомогательный код для обработки моего пользовательского исключения
public static <T> Function<Mono<T>, Publisher<T>> defaultIfNotFoundOrError(T defaultValue) {
return source -> source.onErrorResume(ReactionStorageException.class,
e -> {
if (e.getErrorCode() == ReactionStorageErrorCode.NOT_FOUND) {
return Mono.just(defaultValue);
} else {
return Mono.error(e);
}
});
}
И findOne ()
public Mono<Optional<Like>> findOne(final RequesterContext context,
final String parentId,
final int sn,
final boolean handleFaulted) {
Preconditions.checkArgument(!Strings.isNullOrEmpty(parentId),
"parentId must not be an empty value");
Preconditions.checkArgument(sn >= StorageConstants.BASE_SN,
"Serial number must be greater than BASE_SN value");
final String likeInfoKey = RedisKeys.reactionInfo(reactionType, parentId, sn);
return cmds.hgetall(likeInfoKey)
.publishOn(redisScheduler)
.flatMap(m -> Mono.justOrEmpty(LikeRedisMapper.from(m)))
.switchIfEmpty(ReactiveHelpers.mapOrEmpty(handleFaulted,
requestFaultedLike(context, parentId, sn)))
.switchIfEmpty(ExceptionUtils.generate(ReactionStorageErrorCode.NOT_FOUND,
"Like(%s, %d) cannot be found",
parentId, sn))
.map(Optional::ofNullable)
.doOnError(e -> log.trace("Failed to find a like", e));
}
Обновление 2 .После глубокой отладки возникла исключительная ситуация из-за отмененного источника.Эта отмена вызвана Mono.zip (A, B, C ...).Над источником находится B. Если A является пустым источником, B следует отменить.но иногда запрос B обрабатывается после получения сигнала отмены.