Springboot RabbitMQ с одновременными потребителями - PullRequest
3 голосов
/ 04 марта 2020

У меня простая архитектура, в которой один из трех процессов (на самом деле dockerized контейнер с пружинной загрузкой) создает тысячи сообщений для обмена / очереди, и ВСЕ три процесса потребляют эти сообщения.

Я хочу потребитель должен быть многопоточным для достижения максимальной пропускной способности, но с моей текущей конфигурацией я не достигаю того, чего хочу.

Вот настройка:

Слушатель:

  @RabbitListener(
    bindings = @QueueBinding(
      value = @Queue(
                value = RabbitMQConfiguration.BORD_QUEUE, 
                durable = "true", 
                arguments = @Argument(name = "x-queue-mode", value = "lazy")),
      exchange = @Exchange(value = RabbitMQConfiguration.QUEUE_EXCHANGE),
      key = "B"), 
    concurrency = "8-8")

  public void listenBordero(Long bId, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
    log.info("Received message: {{}}", bId);
    processor.process(bId);
  }

Вот, возможно, уместная конфигурация springboot application.properties:

spring.rabbitmq.listener.type=simple
spring.rabbitmq.listener.simple.prefetch=1
spring.rabbitmq.listener.simple.max-concurrency=8
spring.rabbitmq.listener.simple.concurrency=8

Хотя я считаю, что не все из них необходимы из-за конфигурации @RabbitListener на основе аннотаций.

В менеджменте rabbitmq я вижу enter image description here с состоянием, в основном, бездействующим.

Из моих журналов прыгучей загрузки я вижу, что потоки едва ли происходят (имя потока только в журнале изменяется после нескольких десятков строк, тогда как я ожидаю, что переключаться будет гораздо чаще).

Очередь отображается в консоли управления:

enter image description here

Я ожидал бы увидеть статистику потребителя на странице канала, чтобы быть намного более «работающей».

Может кто-нибудь, пожалуйста, просветите меня?

Заранее спасибо!

РЕДАКТИРОВАТЬ: После активации отладки для spring-amqp я вижу, что несколько сообщений получаются, но потом потоки с именем «..Container # 0- n » обрабатывается только в блоке blo c. Я ожидал бы, что выходные данные нескольких запущенных потоков-контейнеров будут перепутаны. Может ли это быть проблемой ThreadPool?

Вот выдержка из журнала (с уменьшенным шумом):

2020-03-05 08:15:21.418  INFO 32960 --- [ntContainer#0-7] d.i.clearing.rabbit.RabbitMQListener     : Received message: {436}
2020-03-05 08:15:21.418  INFO 32960 --- [ntContainer#0-5] d.i.clearing.rabbit.RabbitMQListener     : Received message: {450}
2020-03-05 08:15:21.418  INFO 32960 --- [tContainer#0-12] d.i.clearing.rabbit.RabbitMQListener     : Received message: {414}
2020-03-05 08:15:21.418  INFO 32960 --- [tContainer#0-11] d.i.clearing.rabbit.RabbitMQListener     : Received message: {418}
2020-03-05 08:15:21.418  INFO 32960 --- [ntContainer#0-8] d.i.clearing.rabbit.RabbitMQListener     : Received message: {432}
2020-03-05 08:15:21.418  INFO 32960 --- [ntContainer#0-2] d.i.clearing.rabbit.RabbitMQListener     : Received message: {456}
2020-03-05 08:15:21.418  INFO 32960 --- [tContainer#0-15] d.i.clearing.rabbit.RabbitMQListener     : Received message: {469}
2020-03-05 08:15:21.419  INFO 32960 --- [tContainer#0-16] d.i.clearing.rabbit.RabbitMQListener     : Received message: {394}
2020-03-05 08:15:21.418  INFO 32960 --- [tContainer#0-13] d.i.clearing.rabbit.RabbitMQListener     : Received message: {409}
2020-03-05 08:15:21.419  INFO 32960 --- [ntContainer#0-4] d.i.clearing.rabbit.RabbitMQListener     : Received message: {452}
2020-03-05 08:15:21.418  INFO 32960 --- [ntContainer#0-9] d.i.clearing.rabbit.RabbitMQListener     : Received message: {431}
2020-03-05 08:15:21.419  INFO 32960 --- [tContainer#0-10] d.i.clearing.rabbit.RabbitMQListener     : Received message: {420}
2020-03-05 08:15:21.419  INFO 32960 --- [tContainer#0-14] d.i.clearing.rabbit.RabbitMQListener     : Received message: {350}
2020-03-05 08:15:21.419  INFO 32960 --- [ntContainer#0-1] d.i.clearing.rabbit.RabbitMQListener     : Received message: {400}
2020-03-05 08:15:21.419  INFO 32960 --- [ntContainer#0-3] d.i.clearing.rabbit.RabbitMQListener     : Received message: {453}
2020-03-05 08:15:21.419  INFO 32960 --- [ntContainer#0-6] d.i.clearing.rabbit.RabbitMQListener     : Received message: {449}
... Logging Noise removed
2020-03-05 08:15:26.409  INFO 32960 --- [ntContainer#0-3] de.idslogistik.clearing.api.p2.Clearing  : Calculating {7183042} - {8330}/{5500}  
2020-03-05 08:15:26.411  INFO 32960 --- [ntContainer#0-3] de.idslogistik.clearing.api.p2.Clearing  : ... Logging Noise removed
2020-03-05 08:15:26.412 DEBUG 32960 --- [ntContainer#0-3] de.idslogistik.clearing.api.p2.Clearing  : ... Logging Noise removed
2020-03-05 08:15:26.413  INFO 32960 --- [ntContainer#0-3] de.idslogistik.clearing.api.p2.Clearing  : ... Logging Noise removed
2020-03-05 08:15:26.507  INFO 32960 --- [ntContainer#0-3] de.idslogistik.clearing.api.p2.Clearing  : ... Logging Noise removed
2020-03-05 08:15:26.669  WARN 32960 --- [ntContainer#0-3] de.idslogistik.clearing.api.p2.Rddel     : ... Logging Noise removed
2020-03-05 08:15:26.669  INFO 32960 --- [ntContainer#0-3] de.idslogistik.clearing.api.p2.Rddel     : ... Logging Noise removed
2020-03-05 08:15:33.012 DEBUG 32960 --- [ntContainer#0-3] de.idslogistik.clearing.api.p2.Rddel     : ... Logging Noise removed
2020-03-05 08:15:33.013 DEBUG 32960 --- [ntContainer#0-3] d.i.i.util.clearing.VerSendungTools      : ... Logging Noise removed
2020-03-05 08:15:33.013 DEBUG 32960 --- [ntContainer#0-3] d.i.i.util.clearing.VerSendungTools      : ... Logging Noise removed
2020-03-05 08:15:33.013 DEBUG 32960 --- [ntContainer#0-3] d.i.i.util.clearing.VerSendungTools      : ... Logging Noise removed
2020-03-05 08:15:33.014 DEBUG 32960 --- [ntContainer#0-3] d.i.i.util.clearing.VerSendungTools      : ... Logging Noise removed

1 Ответ

1 голос
/ 19 марта 2020

prefetch = 1 означает, что максимальное количество неподтвержденных сообщений в текущей очереди не более 1. Когда это сообщение не подтверждено, следующее сообщение не может войти в очередь потребления.

Или вы можете рассмотреть возможность увеличения количества потребители ускорить обработку сообщений https://www.rabbitmq.com/tutorials/tutorial-two-java.html
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...