Обрабатывайте большие сообщения с помощью Apache Camel и AMQ Artemis - PullRequest
0 голосов
/ 05 июня 2019

Когда я получаю большое сообщение (100 КБ +) в очереди AMQ Artemis и пытаюсь направить это сообщение на другой AMQ, и это сообщение имеет свойство _AMQ_LARGE_SIZE, я получил следующую ошибку:

14:38:56.250 [Camel (CamelTestRoute) thread #1 - JmsConsumer[QUEUE.TEST]] WARN  o.a.c.c.jms.EndpointMessageListener - Execution of JMS message listener failed. Caused by: [org.apache.camel.RuntimeCamelException - javax.jms.JMSRuntimeException: Invalid address QUEUE.TEST]
org.apache.camel.RuntimeCamelException: javax.jms.JMSRuntimeException: Invalid address QUEUE.TEST

Я знаю, если я установлю свойство minLargeMessageSize в фабрике соединений, которая отправляет сообщение в AMQ, эта проблема не возникает.

Проблема в том, что я не могу контролировать коды, которые создают соединениеФабрики, а иногда они не устанавливают свойство «Большой размер сообщения».

Есть ли способ, как я могу справиться с этим в Camel с моей фабрикой соединений?

* EDIT

16:33:03.836 [Camel (CamelTestRoute) thread #1 - JmsConsumer[QUEUE.TEST]] WARN  o.a.c.c.jms.EndpointMessageListener - Execution of JMS message listener failed. Caused by: [org.apache.camel.RuntimeCamelException - javax.jms.JMSRuntimeException: Invalid address QUEUE.TEST]
org.apache.camel.RuntimeCamelException: javax.jms.JMSRuntimeException: Invalid address QUEUE.TEST
        at org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException(ObjectHelper.java:1830)
        at org.apache.camel.component.jms.EndpointMessageListener$EndpointMessageListenerAsyncCallback.done(EndpointMessageListener.java:196)
        at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:117)
        at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:719)
        at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:679)
        at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:649)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:317)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:255)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1168)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1160)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1057)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: javax.jms.JMSRuntimeException: Invalid address QUEUE.TEST
        at org.apache.activemq.artemis.jms.client.ActiveMQDestination.fromAddress(ActiveMQDestination.java:119)
        at org.apache.activemq.artemis.jms.client.ActiveMQMessage.getJMSDestination(ActiveMQMessage.java:386)
        at org.apache.camel.component.jms.JmsBinding.extractHeadersFromJms(JmsBinding.java:187)
        at org.apache.camel.component.jms.JmsMessage.populateInitialHeaders(JmsMessage.java:229)
        at org.apache.camel.impl.DefaultMessage.createHeaders(DefaultMessage.java:257)
        at org.apache.camel.component.jms.JmsMessage.ensureInitialHeaders(JmsMessage.java:214)
        at org.apache.camel.component.jms.JmsMessage.getHeader(JmsMessage.java:164)
        at org.apache.camel.impl.DefaultMessage.getHeader(DefaultMessage.java:93)
        at org.apache.camel.impl.DefaultUnitOfWork.<init>(DefaultUnitOfWork.java:115)
        at org.apache.camel.impl.MDCUnitOfWork.<init>(MDCUnitOfWork.java:54)
        at org.apache.camel.impl.DefaultUnitOfWorkFactory.createUnitOfWork(DefaultUnitOfWorkFactory.java:32)
        at org.apache.camel.processor.CamelInternalProcessor$UnitOfWorkProcessorAdvice.createUnitOfWork(CamelInternalProcessor.java:695)
        at org.apache.camel.processor.CamelInternalProcessor$UnitOfWorkProcessorAdvice.before(CamelInternalProcessor.java:663)
        at org.apache.camel.processor.CamelInternalProcessor$UnitOfWorkProcessorAdvice.before(CamelInternalProcessor.java:634)
        at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:149)
        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
        at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:113)
        ... 11 common frames omitted

1 Ответ

1 голос
/ 06 июня 2019

Если вы используете клиент Artemis 1.x против брокера Artemis 2.x, вам необходимо настроить акцептор, к которому подключается клиент, с соответствующими anycastPrefix и multicastPrefix, например:

<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
...