Опрос байтовых / больших сообщений от ActiveMQ Artemis с использованием Apache Camel ConsumerTemplate - PullRequest
0 голосов
/ 08 января 2020

У меня проблема с приложением на основе Apache Camel при подключении к ActiveMQ Artemis через JMS. В конце одного из маршрутов Camel сообщения хранятся в очереди Artemis JMS. Устаревший компонент, работающий в том же приложении, периодически выбирает их оттуда с помощью ConsumerTemplate.

. Это прекрасно работает для сообщений Camel с простыми текстовыми телами, но вызывает ошибки при использовании тел байтовых массивов: кажется, что Артемида обрабатывает любое сообщение с байтовым телом в виде «большого сообщения», которое передается вместо того, чтобы храниться в памяти . Прием через ConsumerTemplate работает, но как только к телу или заголовкам обращаются, возникает следующее исключение:

org.apache.camel.RuntimeCamelException: Failed to extract body due to: javax.jms.IllegalStateException: AMQ119023: The large message lost connection with its session, either because of a rollback or a closed session. Message: ActiveMQMessage[ID:90c4d1d5-3233-11ea-b0cc-44032c68a56f]:PERSISTENT/ClientLargeMessageImpl[messageID=2974, durable=true, address=mytest,userID=90c4d1d5-3233-11ea-b0cc-44032c68a56f,properties=TypedProperties[firedTime=Wed Jan 08 17:26:03 CET 2020,__AMQ_CID=90b4f34e-3233-11ea-b0cc-44032c68a56f,breadcrumbId=ID-NB045-evolit-co-at-1578500762151-0-1,_AMQ_ROUTING_TYPE=1,_AMQ_LARGE_SIZE=3]]
        at org.apache.camel.component.jms.JmsBinding.extractBodyFromJms(JmsBinding.java:172) ~[camel-jms-2.22.1.jar:2.22.1]
        at org.apache.camel.component.jms.JmsMessage.createBody(JmsMessage.java:221) ~[camel-jms-2.22.1.jar:2.22.1]
        at org.apache.camel.impl.MessageSupport.getBody(MessageSupport.java:54) ~[camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.example.cdi.JmsPoller.someMethod(JmsPoller.java:36) ~[classes/:?]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_171]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_171]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_171]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_171]
        at org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:481) ~[camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.component.bean.MethodInfo$1.doProceed(MethodInfo.java:300) ~[camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:273) ~[camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.component.bean.AbstractBeanProcessor.process(AbstractBeanProcessor.java:188) ~[camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:53) ~[camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.component.bean.BeanProducer.process(BeanProducer.java:41) ~[camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:148) ~[camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548) [camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201) [camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201) [camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.component.timer.TimerConsumer.sendTimerExchange(TimerConsumer.java:197) [camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.component.timer.TimerConsumer$1.run(TimerConsumer.java:79) [camel-core-2.22.1.jar:2.22.1]
        at java.util.TimerThread.mainLoop(Timer.java:555) [?:1.8.0_171]
        at java.util.TimerThread.run(Timer.java:505) [?:1.8.0_171]
Caused by: javax.jms.IllegalStateException: AMQ119023: The large message lost connection with its session, either because of a rollback or a closed session
        at org.apache.activemq.artemis.core.client.impl.LargeMessageControllerImpl.saveBuffer(LargeMessageControllerImpl.java:273) ~[artemis-core-client-2.6.2.jar:2.6.2]
        at org.apache.activemq.artemis.core.client.impl.ClientLargeMessageImpl.saveToOutputStream(ClientLargeMessageImpl.java:115) ~[artemis-core-client-2.6.2.jar:2.6.2]
        at org.apache.activemq.artemis.jms.client.ActiveMQMessage.saveToOutputStream(ActiveMQMessage.java:853) ~[artemis-jms-client-2.6.2.jar:2.6.2]
        at org.apache.activemq.artemis.jms.client.ActiveMQMessage.setObjectProperty(ActiveMQMessage.java:693) ~[artemis-jms-client-2.6.2.jar:2.6.2]
        at org.apache.camel.component.jms.JmsBinding.createByteArrayFromBytesMessage(JmsBinding.java:251) ~[camel-jms-2.22.1.jar:2.22.1]
        at org.apache.camel.component.jms.JmsBinding.extractBodyFromJms(JmsBinding.java:163) ~[camel-jms-2.22.1.jar:2.22.1]
        ... 21 more
Caused by: org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException: AMQ119023: The large message lost connection with its session, either because of a rollback or a closed session
        at org.apache.activemq.artemis.core.client.impl.LargeMessageControllerImpl.saveBuffer(LargeMessageControllerImpl.java:273) ~[artemis-core-client-2.6.2.jar:2.6.2]
        at org.apache.activemq.artemis.core.client.impl.ClientLargeMessageImpl.saveToOutputStream(ClientLargeMessageImpl.java:115) ~[artemis-core-client-2.6.2.jar:2.6.2]
        at org.apache.activemq.artemis.jms.client.ActiveMQMessage.saveToOutputStream(ActiveMQMessage.java:853) ~[artemis-jms-client-2.6.2.jar:2.6.2]
        at org.apache.activemq.artemis.jms.client.ActiveMQMessage.setObjectProperty(ActiveMQMessage.java:693) ~[artemis-jms-client-2.6.2.jar:2.6.2]
        at org.apache.camel.component.jms.JmsBinding.createByteArrayFromBytesMessage(JmsBinding.java:251) ~[camel-jms-2.22.1.jar:2.22.1]
        at org.apache.camel.component.jms.JmsBinding.extractBodyFromJms(JmsBinding.java:163) ~[camel-jms-2.22.1.jar:2.22.1]
        ... 21 more

Проблема также возникает для сообщений, которые не превышают minLargeMessageSize Артемида, в тестовой программе даже на 3 байта.

По совпадению, та же проблема возникла в автономном приложении, используемом для тестирования приложения. Там я смог решить проблему, оставив сеанс и приемник JMS открытыми, пока тело сообщения и заголовки JMS не будут полностью прочитаны. Что касается Camel, в Spring JmsTemplate, на котором основан Camel, это отвлечено.


Я сверился с пользовательской документацией компонента Camel JMS , чтобы найти параметры конфигурации, которые могут помочь меня. Я пробовал следующее:

  • eagerLoadingOfProperties=true на стороне потребителя: не влияет, кажется, влияет только на MessageListenerContainer. Документация гласит:

Он использует [...] Spring JmsTemplate для отправки и MessageListenerContainer для потребления.

Однако при отладке казалось, что MessageListenerContainer используется только при использовании сообщений от конечной точки JMS на маршруте Camel. Использование ConsumerTemplate, как в моем случае, использует JmsTemplate для потребления.

  • messageConverter и mapJmsMessage на стороне потребителя: никакого эффекта, они выполняются, когда сеанс уже закрыт
  • alwaysCopyMessage на стороне производителя: я подумал, что, возможно, копирование предотвращает использование больших потоковых сообщений, без эффекта
  • streamMessageTypeEnabled=false на стороне производителя: без эффекта
  • jmsMessageType=Bytes как на стороне производителя, так и на стороне потребителя: не влияет
  • transferExchange=true как на сторону производителя, так и на потребителя: похоже, это решает мой конкретный случай c, но это похоже на обходной путь. Документация рекомендует использовать эту опцию с осторожностью.

Так что сейчас, transferExchange, кажется, моя лучшая ставка, если предположить, что она действительно решает мою проблему во всех тестовых случаях. Тем не менее, я был бы рад получить более полное представление о проблеме или других решениях:

  1. Почему Artemis в любом случае рассматривает сообщения с маленьким байтовым массивом как большие сообщения?
  2. Поддерживает ли Camel ConsumerTemplate большие потоковые сообщения вообще?

Мои версии - Camel 2.22.1 и Artemis 2.10.1.


Мне удалось воспроизвести мою проблему, изменив Camel Пример camel-example-cdi из пакета выпуска Camel, чтобы иметь минимальные классы, показанные ниже. Кроме того, я добавил camel-jms и зависимости Артемиды и запустил Артемис локально, как описано в примере camel-example-artemis-large-messages.

public class MyRoutes extends RouteBuilder {

    @Override
    public void configure() {
        setupJmsComponent();

        from("timer:writeTimer?period=6000")
                .log("writing to JMS")
                .setBody(() -> new byte[]{0,1,2})
                .to(JmsPoller.ENDPOINT);

        from("timer:pollTimer?period=3000")
            .to("bean:jmsPoller");
    }

    private void setupJmsComponent() {
        ActiveMQJMSConnectionFactory connectionFactory = new ActiveMQJMSConnectionFactory("tcp://localhost:61616");
        JmsComponent jmsComponent = new JmsComponent();
        jmsComponent.setConnectionFactory(connectionFactory);
        getContext().addComponent("jms", jmsComponent);
    }

}
@Singleton
@Named("jmsPoller")
public class JmsPoller {
    static final String ENDPOINT = "jms:queue:mytest";

    @Inject
    private ConsumerTemplate consumerTemplate;

    public void someMethod(String body) {
        Exchange exchange = consumerTemplate.receive(ENDPOINT, 1000L);
        System.out.println("Received " + (exchange == null ? null : exchange.getIn().getBody()));
    }

}

1 Ответ

0 голосов
/ 08 января 2020

ActiveMQ Артемида не обрабатывает просто любое сообщение с байтовым телом как "большое" сообщение. Стоит отметить, что брокер в конечном итоге обрабатывает все тела сообщений как массив байтов, потому что это именно то, что они есть. Однако, чтобы считаться «большим», сообщение должно превышать определенный размер. Документация гласит:

Любое сообщение, превышающее определенный размер, считается большим сообщением. Большие сообщения будут разбиты и отправлены фрагментами. Это определяется параметром URL minLargeMessageSize.

Примечание :

Apache Сообщения ActiveMQ Artemis кодируются с использованием 2 байтов на символ, поэтому, если данные сообщения заполняются символами ASCII (которые составляют 1 байт), размер полученного сообщения Apache ActiveMQ Artemis примерно удвоится. Это важно при расчете размера «большого» сообщения, так как оно может быть меньше, чем minLargeMessageSize перед отправкой, но затем оно превращается в «большое» сообщение после его кодирования.

Значение по умолчанию - 100 КБ.

Похоже, что вариант использования приложения просто не соответствует семантике поддержки больших сообщений в ActiveMQ Artemis с момента сеанса, из которого пришло сообщение. закрывается до того, как тело сообщения будет полностью получено.

Поэтому я рекомендую либо оставить сеанс открытым, пока тело не будет прочитано, либо увеличить minLargeMessageSize на URL-адресе приложения, равном отправив сообщение, чтобы никакие сообщения никогда не считались "большими". Последний вариант может привести к большему использованию памяти в брокере, поскольку все тело сообщения будет одновременно храниться в памяти.

...