MessageListener.onMessage постоянно вызывается в RabbitMQ с Spring Boot - PullRequest
0 голосов
/ 24 января 2020

У меня есть MessageListener.onMessage с потоком сна. Я имитирую фактическое время обработки, которое метод onMessage займет в вышеупомянутом спящем потоке. Однако я заметил, что он вызывается несколько раз подряд для оставшихся сообщений, пока они не будут обработаны методом onMessage. Я вижу это как неэффективность.

Фактический счетчик сообщений в очереди: 1000

Вывод рабочего числа для попаданий

onMessage<<15656
onMessage<<15657
onMessage<<15658
onMessage<<15659
onMessage<<15660
onMessage<<15661
onMessage<<15662
onMessage<<15663

Блок кода

@Service
class ThreadPooledMessageListener implements MessageListener {
@Autowired
TaskExecutor threadPoolTaskExecutor;

AtomicInteger processedCount = new AtomicInteger();

@Override
public void onMessage(Message message) {
    System.out.println("onMessage<<" + processedCount.incrementAndGet());
    threadPoolTaskExecutor.execute(new MessageProcessor(message));

}
}

class MessageProcessor implements Runnable {
Message processingMessage;

public MessageProcessor(Message message) {
    this.processingMessage = message;
}

@Override
public void run() {
    System.out.println("================================"+ Thread.currentThread().getName());
    System.out.println(processingMessage);
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("================================");
}
}

Каковы возможные исправления для этого.


Как указывал @Gary Russell; Проблема заключалась в том, что в моем коде я использовал неуправляемый контейнер SimpleMessageListenerContainer. Исправлено это с помощью управляемого bean-компонента Spring и определенного там параллелизма. Работает как положено. Фиксированный сегмент кода

    @Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    container.setQueues(queue);
    container.setMessageListener(threadPooledMessageListener);
    container.setConcurrentConsumers(4);
    container.start();
    return container;
}

1 Ответ

1 голос
/ 24 января 2020

>I see this as an inefficiency.

Непонятно, что вы имеете в виду. Поскольку вы передаете обработку сообщения другому потоку, слушатель немедленно завершает работу и, конечно, доставляется следующее сообщение.

Это может привести к потере сообщения в случае сбоя.

Если вы пытаетесь достичь параллелизма; Лучше установить свойство контейнера concurrentConsumers, а не выполнять собственное управление потоками в слушателе. Контейнер будет управлять потребителями для вас.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...