Здесь сначала среда:
- Служба, публикующая большое количество сообщений в очереди. Это загрузочное приложение Spring, работающее в контейнере, и у меня есть к нему доступ (значит, это можно изменить)
- Другая служба, использующая эту очередь, обрабатывает сообщения для создания отчета xml и отправляет этот отчет в конечную точку остальных API. Это также загрузочное приложение Spring, работающее в контейнере, и у меня есть к нему доступ.
- Конечная точка иногда реагирует очень медленно. У меня нет знаний о внутренней системе конечной точки, и у меня нет доступа к ней.
Мой потребитель был чем-то вроде этого:
@Component
public class MyConsumer {
private MyReportCreator reportCreator;
private MyReportSender reportSender;
@Autowired
public MyConsumer(MyReportCreator reportCreator, MyReportSender reportSender) {
this.reportCreator = reportCreator;
this.reportSender = reportSender;
}
public void consume(MyMessage message) {
try {
String report = reportCreator.create(message);
reportSender.send(report);
} catch(Exception e) {
logger.error("Error occured while reporting", e);
}
}
}
Проблема в том, потребитель потребляет очередь один за другим, но издатель отправляет тысячи сообщений каждую секунду, поэтому очередь складывается очень быстро. Чтобы решить эту проблему, я добавил аннотацию @EnableAsync
к приложению и аннотацию @Aysnc
к методу MyReportSender.send
и настроил TaskExecuter следующим образом:
@Component
public class MyReportSender {
private RestTemplate restTemplate;
@Autowired
public MyReportSender(RestTemplate restTemplate) {
this.restTemplate = restTemplate;
}
@Async
public void send(String report) {
\\ sends the report using restTemplate.exchange, ignores result
}
}
@SpringBootApplication
@EnableAsync
public class MyApp {
public static void main(String[] args) {
SpringApplication.run(MyApp.class, args);
}
@Bean
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(100);
executor.setMaxPoolSize(100);
executor.setQueueCapacity(200);
executor.initialize();
return executor;
}
}
Как только я выпустил это изменение, очередь сразу растаяла, но через некоторое время я получил это исключение:
Platform exception message: Executor [java.util.concurrent.ThreadPoolExecutor@1a9718ae[Running, pool size = 100, active threads = 100, queued tasks = 200, completed tasks = 3705]] did not accept task: org.springframework.aop.interceptor.AsyncExecutionInterceptor$$Lambda$584/29229306@7dd254ed
Platform exception trace: org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@1a9718ae[Running, pool size = 100, active threads = 100, queued tasks = 200, completed tasks = 3705]] did not accept task: org.springframework.aop.interceptor.AsyncExecutionInterceptor$$Lambda$584/29229306@7dd254ed
at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.submit(ThreadPoolTaskExecutor.java:344)
at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.doSubmit(AsyncExecutionAspectSupport.java:290)
at org.springframework.aop.interceptor.AsyncExecutionInterceptor.invoke(AsyncExecutionInterceptor.java:129)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688)
at com.personal.MyReportSender$$EnhancerBySpringCGLIB$$6f1f0df4.send(<generated>)
at com.personal.MyReportSender.sendReport(MyReportSender.java:95)
at sun.reflect.GeneratedMethodAccessor65.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:246)
at org.springframework.cloud.context.scope.GenericScope$LockedScopedProxyFactoryBean.invoke(GenericScope.java:494)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688)
at com.personal.MyReportSender$$EnhancerBySpringCGLIB$$604ed475.send(<generated>)
at com.personal.MyConsumer.consume(MyConsumer.java:50)
at com.personal.CommonMessageProcessor.process(CommonMessageProcessor.java:44)
at com.personal.CommonMessageReceiver.receiveMessage(CommonMessageReceiver.java:52)
at com.personal.CommonMessageReceiver.receiveMessage(CommonMessageReceiver.java:43)
at sun.reflect.GeneratedMethodAccessor63.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.util.MethodInvoker.invoke(MethodInvoker.java:280)
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:363)
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:292)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1547)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1473)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1461)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1456)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1405)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:870)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:854)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:78)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1137)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1043)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@3cc54c9a rejected from java.util.concurrent.ThreadPoolExecutor@1a9718ae[Running, pool size = 100, active threads = 100, queued tasks = 200, completed tasks = 3705]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:134)
at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.submit(ThreadPoolTaskExecutor.java:341)
... 37 more
Таким образом, исполнитель задачи отклоняет сообщение, когда он обрабатывает 100 сообщений и 200 сообщений в своей внутренней очереди, и получает новое сообщение. Нехорошо.
То, что я хочу, это в основном использовать эту очередь как 100 сообщений за раз, это не обязательно должно быть это @Async
решение. Любые идеи, комментарии, помощь приветствуется ...