Асинхронная аннотация и параллельная обработка - PullRequest
0 голосов
/ 18 июня 2019

Здесь сначала среда:

  • Служба, публикующая большое количество сообщений в очереди. Это загрузочное приложение Spring, работающее в контейнере, и у меня есть к нему доступ (значит, это можно изменить)
  • Другая служба, использующая эту очередь, обрабатывает сообщения для создания отчета xml и отправляет этот отчет в конечную точку остальных API. Это также загрузочное приложение Spring, работающее в контейнере, и у меня есть к нему доступ.
  • Конечная точка иногда реагирует очень медленно. У меня нет знаний о внутренней системе конечной точки, и у меня нет доступа к ней.

Мой потребитель был чем-то вроде этого:

@Component
public class MyConsumer {

   private MyReportCreator reportCreator;
   private MyReportSender reportSender;

   @Autowired
   public MyConsumer(MyReportCreator reportCreator, MyReportSender reportSender) {
      this.reportCreator = reportCreator;
      this.reportSender = reportSender;
   }

   public void consume(MyMessage message) {
      try {
         String report = reportCreator.create(message);
         reportSender.send(report);
      } catch(Exception e) {
         logger.error("Error occured while reporting", e);
      }
   } 
}

Проблема в том, потребитель потребляет очередь один за другим, но издатель отправляет тысячи сообщений каждую секунду, поэтому очередь складывается очень быстро. Чтобы решить эту проблему, я добавил аннотацию @EnableAsync к приложению и аннотацию @Aysnc к методу MyReportSender.send и настроил TaskExecuter следующим образом:

@Component
public class MyReportSender {

   private RestTemplate restTemplate;

   @Autowired
   public MyReportSender(RestTemplate restTemplate) {
      this.restTemplate = restTemplate;
   }

   @Async
   public void send(String report) {
      \\ sends the report using restTemplate.exchange, ignores result
   }

}
@SpringBootApplication
@EnableAsync
public class MyApp {

    public static void main(String[] args) {
        SpringApplication.run(MyApp.class, args);
    }

    @Bean   
    public Executor taskExecutor() {    
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); 
        executor.setCorePoolSize(100);  
        executor.setMaxPoolSize(100);   
        executor.setQueueCapacity(200); 
        executor.initialize();  
        return executor;    
    }
}

Как только я выпустил это изменение, очередь сразу растаяла, но через некоторое время я получил это исключение:

Platform exception message: Executor [java.util.concurrent.ThreadPoolExecutor@1a9718ae[Running, pool size = 100, active threads = 100, queued tasks = 200, completed tasks = 3705]] did not accept task: org.springframework.aop.interceptor.AsyncExecutionInterceptor$$Lambda$584/29229306@7dd254ed

Platform exception trace: org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@1a9718ae[Running, pool size = 100, active threads = 100, queued tasks = 200, completed tasks = 3705]] did not accept task: org.springframework.aop.interceptor.AsyncExecutionInterceptor$$Lambda$584/29229306@7dd254ed
    at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.submit(ThreadPoolTaskExecutor.java:344)
    at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.doSubmit(AsyncExecutionAspectSupport.java:290)
    at org.springframework.aop.interceptor.AsyncExecutionInterceptor.invoke(AsyncExecutionInterceptor.java:129)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688)
    at com.personal.MyReportSender$$EnhancerBySpringCGLIB$$6f1f0df4.send(<generated>)
    at com.personal.MyReportSender.sendReport(MyReportSender.java:95)
    at sun.reflect.GeneratedMethodAccessor65.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:246)
    at org.springframework.cloud.context.scope.GenericScope$LockedScopedProxyFactoryBean.invoke(GenericScope.java:494)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688)
    at com.personal.MyReportSender$$EnhancerBySpringCGLIB$$604ed475.send(<generated>)
    at com.personal.MyConsumer.consume(MyConsumer.java:50)
    at com.personal.CommonMessageProcessor.process(CommonMessageProcessor.java:44)
    at com.personal.CommonMessageReceiver.receiveMessage(CommonMessageReceiver.java:52)
    at com.personal.CommonMessageReceiver.receiveMessage(CommonMessageReceiver.java:43)
    at sun.reflect.GeneratedMethodAccessor63.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.util.MethodInvoker.invoke(MethodInvoker.java:280)
    at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:363)
    at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:292)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1547)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1473)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1461)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1456)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1405)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:870)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:854)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:78)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1137)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1043)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@3cc54c9a rejected from java.util.concurrent.ThreadPoolExecutor@1a9718ae[Running, pool size = 100, active threads = 100, queued tasks = 200, completed tasks = 3705]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:134)
    at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.submit(ThreadPoolTaskExecutor.java:341)
    ... 37 more

Таким образом, исполнитель задачи отклоняет сообщение, когда он обрабатывает 100 сообщений и 200 сообщений в своей внутренней очереди, и получает новое сообщение. Нехорошо.

То, что я хочу, это в основном использовать эту очередь как 100 сообщений за раз, это не обязательно должно быть это @Async решение. Любые идеи, комментарии, помощь приветствуется ...

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...