Потребитель JMS блокирует другие группы JMSX - PullRequest
0 голосов
/ 08 января 2019

Я пытаюсь выяснить, могу ли я в любом случае повлиять на порядок обработки сообщений потребителя, когда сообщение откатывается в очередь. У меня есть простой код ниже, который помогает мне воспроизвести проблему. Я просто помещаю сообщения в очередь с разными JMSXGroupIds в определенном порядке:

  1. "A1" (JMSXGroupId: 1)
  2. "B1" (JMSXGroupId: 2)
  3. "A2" (JMSXGroupId: 1)
  4. "C1" (JMSXGroupId: 3)
  5. "B2" (JMSXGroupId: 2)

Код выполняет откат А1 (первоначально он повторяет сообщение 3 раза) и с задержкой возвращается в очередь. Однако потребитель затем ждет, пока он не сможет снова поднять А1 (после ожидания отсроченного времени), то есть группы B1 и С1 заблокированы за А1 и никогда не обрабатываются.

В идеале я надеялся, что, когда A1 вернется в очередь и получит команду ждать, потребитель получит B1 и C1 ... что я в конечном итоге пытаюсь сделать, это остановить одну JMSXGroup, блокирующую другие на потребителя. Кроме того, вероятно, стоит добавить, что мне нужно сохранить порядок последовательности сообщений для A (A1, A2, A3 ...) и надеялся сделать это, оставив их в очереди, вместо того, чтобы создавать какое-то решение для управления исключениями.

onException(Exception.class)
            .log("Exception Caught !! ")
            .redeliveryDelay("1000")
            .maximumRedeliveries(3)
            .handled(false)
            .markRollbackOnly()
            .log("log:output");

    from("amq:queue:mailbox?concurrentConsumers=1")
            .to(logEndpoint)
            .process(exchange -> {
                if(exchange.getIn().getBody(String.class).contains("A")) {
                    throw new Exception("Found A");
                }
            });

Я использую микросервис Apache Camel на основе Java с транзакционными маршрутами. Ничего особенного, но я могу предоставить больше деталей / деталей конфигурации, если это необходимо.

Заранее спасибо

Ответы [ 3 ]

0 голосов
/ 08 января 2019

Вы используете повторную доставку Camel, которая будет только повторной доставкой, вызывая метод процессора (например, в случае сбоя), а не весь маршрут.

Возможно, вы захотите взглянуть на использование транзакций JMS и разрешить откат сообщения вплоть до JMS-брокера, а также настроить параметры доставки в брокере сообщений.

Если у вас есть копия книги «Верблюд в действии», я бы посоветовал вам прочитать главу о транзакциях и главу по обработке ошибок.

0 голосов
/ 09 января 2019

Ваша проблема из-за concurrentConsumers=1 на конечной точке потребителя. Если у вас есть только один потребитель, невозможно обрабатывать группы JMS параллельно .

С этим ограничением у вас фактически есть Исключительный потребитель и заголовок группы JMS не действуют , поскольку в любом случае есть только один потребитель для обработки сообщений.

Заголовок JMSXGroupId обеспечивает, чтобы все сообщения с одинаковым идентификатором группы обрабатывались одним и тем же потребителем . Таким образом, если у вас будет 3 потребителя, 3 группы в вашем примере могут обрабатываться параллельно, даже если сообщение A1 «блокирует» потребителя группы А. на некоторое время.

Однако, когда у вас на меньше потребителей, чем у групп , это, конечно, тот случай, когда одно сообщение может блокировать другие группы просто потому, что один потребитель обрабатывает несколько групп.

0 голосов
/ 08 января 2019

Строгое упорядочение в очереди неизбежно приведет к таким проблемам, поскольку очередь должна придерживаться своей фундаментальной семантики «первым пришел-первым вышел» (то есть FIFO). Даже если B1 был получен сразу после сбоя A1, вам придется подождать, чтобы использовать A2, пока A1 не будет израсходован, чтобы сохранить порядок, и поскольку очередь должна быть израсходована FIFO, которая блокирует потребление C1 и любых других сообщений. за этим.

...