Как использовать Приоритетную Очередь с Весенней загрузкой AsyncConfigurer - PullRequest
0 голосов
/ 27 сентября 2018

У меня есть приложение, в котором у меня есть несколько потоков, читающих сообщения из места назначения jms.Поток слушателя читает сообщение, вносит в него некоторые изменения и вызывает несколько других методов различных классов.Эти методы снабжены примечанием @Async о том, что все методы выполняются параллельно с использованием пользовательского ThreadPoolTaskExecutor.

@Override
public Executor getAsyncExecutor() {        
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(corePoolSize);
    executor.setMaxPoolSize(maxPoolSize);
    executor.setQueueCapacity(queueCapacity);
    executor.setKeepAliveSeconds(keepAliveSeconds);
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.setTaskDecorator(new LoggingTaskDecorator());
    executor.initialize();
    return executor;
}

До сих пор все сообщения считались одинаковыми по приоритету, все было нормально, так каквсе сообщения поступали в LinkedBlockingQueue, если ни один из потоков Executor не оставался доступным.

Теперь возникает требование, когда определенному типу сообщения, считываемого из очереди, как ожидается, будет присвоен более высокий приоритет, чем любомудругое сообщение, прочитанное из очереди.

В настоящее время я использую «org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor», который не предоставляет какого-либо метода, в котором я мог бы установить Приоритетную очередь в качестве своей реализации очереди блокировки.

Не могли бы вы помочь мне решить этот сценарий?Или это существующий дизайн системы не может вместить это изменение?Или что может быть лучшим решением для таких сценариев?

Спасибо!

1 Ответ

0 голосов
/ 27 сентября 2018

Просто переопределив метод createQueue.Также вы должны использовать метод @Bean для создания экземпляра bean-компонента, таким образом Spring может правильно управлять жизненным циклом, небольшая, но важная вещь (иначе отключение не будет работать должным образом) ..

@Override
public Executor getAsyncExecutor() {
  return taskExecutor();
}        

@Bean    
public ThreadPoolTaskExecutor taskExecutor() {        
  ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor() {
    protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
      return new PriorityBlockingQueue<>(queueCapacity);
    } 
  };
  executor.setCorePoolSize(corePoolSize);
  executor.setMaxPoolSize(maxPoolSize);
  executor.setQueueCapacity(queueCapacity);
  executor.setKeepAliveSeconds(keepAliveSeconds);
  executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
  executor.setTaskDecorator(new LoggingTaskDecorator());  
  return executor;
}

Нечто подобное должно работать.Метод createQueue теперь создает PriorityBlockingQueue вместо значения по умолчанию LinkedBlockingQueue.

...