У меня есть следующее приложение (это же приложение с Gradle + Spring Boot находится здесь https://www.dropbox.com/s/vizr5joyhixmdca/demo.zip?dl=0):
Writer.java
содержит некоторый код, который запускается асинхронно с помощью @Async
аннотация. Один метод возвращает void
, а другой возвращает Future
. Оба варианта разрешены согласно документации.
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.util.concurrent.Future;
@Component
@Async("customExecutor")
public class Writer {
public void write() {
System.out.println("Writing something");
throw new RuntimeException("Writer exception");
}
public Future<Void> writeFuture() {
System.out.println("Writing something with future");
throw new RuntimeException("Writer exception with future");
}
}
ErrorHandlingThreadPoolExecutor.java
- пользовательский исполнитель. Единственное отличие от ThreadPoolExecutor
- это обработка ошибок. Реализация afterExecute
точно такая же, как предложенная в методе javado c. Таким образом, идея здесь состоит в том, чтобы печатать "[ERROR] " + ex
, когда происходит исключение.
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Component("customExecutor")
public class ErrorHandlingThreadPoolExecutor extends ThreadPoolExecutor {
public ErrorHandlingThreadPoolExecutor() {
super(1, 1, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t == null && r instanceof Future<?>) {
try {
((Future<?>) r).get();
} catch (CancellationException ce) {
t = ce;
} catch (ExecutionException ee) {
t = ee.getCause();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
if (t != null) {
handleError(t);
}
}
private void handleError(Throwable ex) {
System.out.println("[ERROR] " + ex);
}
}
Config.java
включает асинхронную c обработку + планирование. Он также вызывает writer.write
по расписанию.
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
@Configuration
@EnableScheduling
@EnableAsync
public class Config {
private final Writer writer;
public Config(Writer writer) {
this.writer = writer;
}
@Scheduled(fixedRate = 1000)
public void writeBySchedule() {
writer.write();
// writer.writeFuture();
}
}
Когда я запускаю это приложение, я вижу следующий результат:
Writing something
2020-07-14 21:16:33.791 ERROR 19860 --- [pool-1-thread-1] .a.i.SimpleAsyncUncaughtExceptionHandler : Unexpected exception occurred invoking async method: public void com.example.demo.Writer.write()
java.lang.RuntimeException: Writer exception
at com.example.demo.Writer.write(Writer.java:14) ~[main/:na]
at com.example.demo.Writer$$FastClassBySpringCGLIB$$cd00988d.invoke(<generated>) ~[main/:na]
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) ~[spring-core-5.2.7.RELEASE.jar:5.2.7.RELEASE]
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771) ~[spring-aop-5.2.7.RELEASE.jar:5.2.7.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.2.7.RELEASE.jar:5.2.7.RELEASE]
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749) ~[spring-aop-5.2.7.RELEASE.jar:5.2.7.RELEASE]
at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:115) ~[spring-aop-5.2.7.RELEASE.jar:5.2.7.RELEASE]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_242]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_242]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_242]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_242]
...
В то же время, если я прокомментирую writer.write()
и раскомментируйте writer.writeFuture()
, я получаю следующее:
Writing something with future
[ERROR] java.lang.RuntimeException: Writer exception with future
...
Последнее - то, чего я пытаюсь достичь с помощью ErrorHandlingThreadPoolExecutor
. Однако я бы хотел, чтобы мои методы возвращали void
. Я обнаружил, что причина, по которой мое исключение не достигает пользовательского метода ErrorHandlingThreadPoolExecutor.handleError()
, находится здесь: https://github.com/spring-projects/spring-framework/blob/master/spring-aop/src/main/java/org/springframework/aop/interceptor/AsyncExecutionAspectSupport.java#L308. Этот метод выполняется до моего пользовательского, и, похоже, нет возможности повторно создать исключение для методов void
. Я знаю о классе AsyncConfigurerSupport
, который позволяет настраивать обработку исключений, но исключение по-прежнему не ускользнет из AsyncExecutionAspectSupport.handleError()
.
Подводя итог, есть ли способ распространения моих исключений из асинхронно выполняемые методы до ErrorHandlingThreadPoolExecutor.handleError()
, если они объявляют void
как возвращаемый тип? На данный момент кажется, что я мог бы использовать исполнителей напрямую без @Async
, но возможно ли это с @Async
? Если нет, то какое исправление может быть менее «инвазивным» (меньше кода на изменение и поддержку)? У меня довольно много асинхронных c методов, возвращающих void
.
ОБНОВЛЕНИЕ : Основываясь на принятом ответе, я придумал следующий аспект:
import org.apache.commons.lang3.StringUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
import java.util.Map;
@Component
@Aspect
public class ErrorHandlingAspect implements ApplicationListener<ContextRefreshedEvent> {
public static final String DEFAULT_EXECUTOR_BEAN_NAME = "defaultExecutor";
private Map<String, ErrorHandlingThreadPoolExecutor> errorHandlingExecutors;
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
// initializing here because not all beans come if initialized in constructor
this.errorHandlingExecutors = event.getApplicationContext()
.getBeansOfType(ErrorHandlingThreadPoolExecutor.class);
}
@Pointcut(
// where @Async is on class level
"@within(org.springframework.scheduling.annotation.Async)"
// where @Async is on method level
+ " || @annotation(org.springframework.scheduling.annotation.Async)")
public void asyncMethods() {
}
@Around("asyncMethods()")
public Object runWithErrorHandling(ProceedingJoinPoint joinPoint) throws Throwable {
Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
Async annotation = method.getAnnotation(Async.class);
if (annotation == null) {
annotation = method.getDeclaringClass().getAnnotation(Async.class);
}
if (annotation == null) {
// shouldn't happen because of pointcut configuration, just for safety
return joinPoint.proceed();
}
String asyncExecutorName = annotation.value();
if (StringUtils.isEmpty(asyncExecutorName)) {
asyncExecutorName = DEFAULT_EXECUTOR_BEAN_NAME;
}
ErrorHandlingThreadPoolExecutor asyncExecutor = errorHandlingExecutors.get(asyncExecutorName);
if (asyncExecutor == null) {
// can happen if the declared executor isn't extending ErrorHandlingThreadPoolExecutor
// or if @Async uses the default executor which is either not registered as a bean at all
// or not named DEFAULT_EXECUTOR_BEAN_NAME
return joinPoint.proceed();
}
try {
return joinPoint.proceed();
} catch (Throwable throwable) {
asyncExecutor.handleError(throwable);
return null;
}
}
}
Плюсы:
- Позволяет обрабатывать ошибки в асинхронно выполняемом коде без работы с потоками.
- Может иметь различную обработку ошибок в зависимости от исполнителя.
- Может методы wrap, возвращающие как
void
, так и Future<>
.
Минусы:
- Невозможно обрабатывать ошибки в вызывающем потоке (только в вызываемом).
- Требуется зарегистрировать исполнитель по умолчанию как bean-компонент и дать ему конкретное c имя.
- Работает только с аннотациями
@Async
, а не с кодом asyn c, переданным непосредственно исполнителю с помощью submit()
.