Apache Camel - специальные символы в теле сообщения - PullRequest
0 голосов
/ 23 марта 2020

Я использую Camel версии 3.1 и пытаюсь отправить сообщение с одного сервера ActiveMQ на другой, используя AMQP Component с загрузкой Spring. После отправки сообщения в консоли назначения ActiveMQ подробности сообщения имеют следующий контент:

Sp�ASr�)�x-opt-jms-destQ�x-opt-jms-msg-typeQSs�^
�/ID:53e1ce3a-4drf-4f8a-9ff9-845fe0d0006e:3:1:1-1@�queue://testcamelwithamqp@@@@@@�qd�Sw�1:"test message"

Мое фактическое сообщение - 1:"test message", но каким-то образом заголовки JMS помещаются в тело сообщения в виде специальных символов. Любая помощь для решения?

Ниже приведен пример кода загрузки весной

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@EnableAutoConfiguration
@SpringBootApplication
public class CamelMQApplication {

    public static void main(String[] args) {
        SpringApplication.run(CamelMQApplication.class, args);
    }
}
import org.apache.camel.CamelContext;
import org.apache.camel.component.amqp.AMQPComponent;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AmqpConfigInternal {

    @Value("${INTERNAL_SERVICE_USERNAME}")
    private String userName;
    @Value("${INTERNAL_SERVICE_PASSWORD}")
    private String pass;
    @Value("${INTERNAL_REMOTE_URI}")
    private String remoteUri;

    @Autowired
    private CamelContext camelInternal;

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getPass() {
        return pass;
    }

    public void setPass(String pass) {
        this.pass = pass;
    }

    public String getRemoteUri() {
        return remoteUri;
    }

    public void setRemoteUri(String remoteUri) {
        this.remoteUri = remoteUri;
    }

    private JmsConnectionFactory internalConnectionFactory() throws Exception {
        JmsConnectionFactory jmsConnectionFactory = new JmsConnectionFactory();
        jmsConnectionFactory.setRemoteURI(remoteUri);
        jmsConnectionFactory.setUsername(userName);
        jmsConnectionFactory.setPassword(pass);
        return jmsConnectionFactory;
    }

    @Bean
    public AMQPComponent internalAmqpConnection() throws Exception {
        AMQPComponent amqp = new AMQPComponent();
        amqp.setConnectionFactory(internalConnectionFactory());
        camelInternal.addComponent("amqpInternal", amqp);
        return amqp;
    }

}

import org.apache.camel.ExchangePattern;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;

@Component
public class SampleAutowiredAmqpRouteTest extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("{{route1.from}}")
        .convertBodyTo(String.class, "UTF-8")
        .removeHeaders("*")
        .log("From ActiveMQ: ${body}")
        .to("{{route1.to}}")
        .convertBodyTo(String.class, "UTF-8")
        .removeHeaders("*");

    }

}
import org.apache.camel.CamelContext;
import org.apache.camel.component.amqp.AMQPComponent;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AmqpConfigRouteTest {

    @Value("${ACTIVEMQ_SERVICE_USERNAME}")
    private String userName;
    @Value("${ACTIVEMQ_SERVICE_PASSWORD}")
    private String pass;
    @Value("${ACTIVEMQ_REMOTE_URI}")
    private String remoteUri;

    @Autowired private CamelContext camelRouteTest;

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getPass() {
        return pass;
    }

    public void setPass(String pass) {
        this.pass = pass;
    }

    public String getRemoteUri() {
        return remoteUri;
    }

    public void setRemoteUri(String remoteUri) {
        this.remoteUri = remoteUri;
    }

    private JmsConnectionFactory amqp1ConnectionFactory() throws Exception {
        JmsConnectionFactory jmsConnectionFactory = new JmsConnectionFactory();
        jmsConnectionFactory.setRemoteURI(remoteUri);
        jmsConnectionFactory.setUsername(userName);
        jmsConnectionFactory.setPassword(pass);
        return jmsConnectionFactory;
    }

    @Bean
    public AMQPComponent amqp1Connection() throws Exception {
        AMQPComponent amqp = new AMQPComponent();
        amqp.setConnectionFactory(amqp1ConnectionFactory());
        camelRouteTest.addComponent("amqpRouteTest", amqp);
        return amqp;
    }
}
application.properties

server.port=8071
camel.springboot.name = CamelTest
camel.springboot.main-run-controller = true

INTERNAL_REMOTE_URI=amqp://actimqserver1:12345
INTERNAL_SERVICE_USERNAME=admin
INTERNAL_SERVICE_PASSWORD=admin

ACTIVEMQ_REMOTE_URI=amqp://actimqserver2:12345
ACTIVEMQ_SERVICE_USERNAME=admin
ACTIVEMQ_SERVICE_PASSWORD=admin

route1.from = amqpInternal:test4camelamqpSrcQ
route1.to = amqpRouteTest:test4camelamqpTgtQ


Ответы [ 2 ]

0 голосов
/ 30 марта 2020

Решение: нам нужно добавить transport.transformer="jms" в файл конфигурации ActiveMQ, как показано ниже, и перезапустить Active MQ

<transportConnector name="amqp" uri="amqp://localhost:5672?**transport.transformer=jms**"/>

Это позволит ActiveMQ правильно отобразить заголовки входящих сообщений JMS. Значение по умолчанию 'transport.transformer=native' оборачивает сообщение AMQP в JMS BytesMessage, так как оно не отображается должным образом в консоли или когда сообщение фактически используется клиентом.

0 голосов
/ 26 марта 2020

Вы используете ActiveMQ с протоколом AMQP . Вы также настроили брокера для этого протокола?

Например, транспортный соединитель:

<transportConnector name="amqp" uri="amqp://0.0.0.0:5672"/>

Если честно Я не знаю, что происходит, когда вы используете простой брокер ActiveMQ (протокол по умолчанию OpenWire) и отправляете AMQP сообщения на него .

В вашем случае Camel, похоже, может правильно генерировать и потреблять сообщения в / из очереди. Это неудивительно, поскольку и производитель, и потребитель настроены для AMQP . Брокер просто хранит сообщение.

С другой стороны, вы пишете, что Springs JmsListener портит сообщение о потреблении. Возможно, потому что не ожидает сообщения AMQP , и вы пытаетесь получить сообщение JMS (TextMessage, ByteMessage et c).

Вы можете просто попытаться получить Message<?>, который является Spring-абстракцией для нескольких транспортов .

То, что сообщение испорчено в консоли ActiveMQ, невелико сюрприз. Я предполагаю, что сообщение AMQP имеет определенный формат c, и консоль не может правильно его отобразить, поскольку оно отличается от сообщения JMS.

...