У меня есть MessageListener.onMessage с потоком сна. Я имитирую фактическое время обработки, которое метод onMessage займет в вышеупомянутом спящем потоке. Однако я заметил, что он вызывается несколько раз подряд для оставшихся сообщений, пока они не будут обработаны методом onMessage. Я вижу это как неэффективность.
Фактический счетчик сообщений в очереди: 1000
Вывод рабочего числа для попаданий
onMessage<<15656
onMessage<<15657
onMessage<<15658
onMessage<<15659
onMessage<<15660
onMessage<<15661
onMessage<<15662
onMessage<<15663
Блок кода
@Service
class ThreadPooledMessageListener implements MessageListener {
@Autowired
TaskExecutor threadPoolTaskExecutor;
AtomicInteger processedCount = new AtomicInteger();
@Override
public void onMessage(Message message) {
System.out.println("onMessage<<" + processedCount.incrementAndGet());
threadPoolTaskExecutor.execute(new MessageProcessor(message));
}
}
class MessageProcessor implements Runnable {
Message processingMessage;
public MessageProcessor(Message message) {
this.processingMessage = message;
}
@Override
public void run() {
System.out.println("================================"+ Thread.currentThread().getName());
System.out.println(processingMessage);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("================================");
}
}
Каковы возможные исправления для этого.
Как указывал @Gary Russell; Проблема заключалась в том, что в моем коде я использовал неуправляемый контейнер SimpleMessageListenerContainer. Исправлено это с помощью управляемого bean-компонента Spring и определенного там параллелизма. Работает как положено. Фиксированный сегмент кода
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue);
container.setMessageListener(threadPooledMessageListener);
container.setConcurrentConsumers(4);
container.start();
return container;
}