Как использовать обмен заголовками для отправки и получения Java-объектов в виде сообщений из очередей в rabbitmq - PullRequest
0 голосов
/ 05 мая 2019

Я пытаюсь использовать обмен заголовками в rabbitmq для создания и потребления сообщений. Когда я пытаюсь создать сообщение, мне нужно преобразовать сообщение в byte [], но на стороне потребителя я не могу десериализовать его должным образом. Я использовал classMapper в моей зависимости от Джексона, но он все еще не может декодировать класс. Я не уверен, что я что-то упустил или я делаю это неправильно.

ПроизводительКонфигурационный класс

@Configuration
public class RabbitMQConfig 
{

    @Value("${exchangeName}")
    String exchange;

    @Value("${queueName}")
    String queueName;

    @Value("${queueName1}")
    String queueName1;

    @Bean
    public List<Object> topicBindings() 
    {

        Queue headerQueue = new Queue(queueName);
        Queue headerQueue1 = new Queue(queueName1);

        Map<String,Object> mapForQueue = new HashMap<>();
        mapForQueue.put("key", "value");
        mapForQueue.put("key2", "value2");

        Map<String,Object> mapForQueue1 = new HashMap<>();
        mapForQueue.put("key", "value");
        mapForQueue1.put("key1", "value1");

        HeadersExchange headerExchange = new HeadersExchange(exchange);

        return Arrays.asList
                (
                        headerQueue,headerQueue1,headerExchange,
                        BindingBuilder.bind(headerQueue).to(headerExchange).whereAny(mapForQueue),
                        BindingBuilder.bind(headerQueue1).to(headerExchange).whereAll(mapForQueue1)
                );
    }

    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter() 
    {
        Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
        jsonConverter.setClassMapper(classMapper());
        return jsonConverter;
    }

    @Bean
    public DefaultClassMapper classMapper() 
    {
        DefaultClassMapper classMapper = new DefaultClassMapper();
        Map<String, Class<?>> idClassMapping = new HashMap<>();
        idClassMapping.put("String", String.class);
        idClassMapping.put("Employee", Employee.class);
        classMapper.setIdClassMapping(idClassMapping);
        return classMapper;
    }

}

SenderClass

@Service
public class RabbitMQSender 
{
    @Autowired
    private AmqpTemplate amqpTemplate;

    @Value("${exchangeName}")
    private String exchange;

    public void send(Employee employee) 
    { 
        Message message = MessageBuilder.withBody(SerializationUtils.serialize(employee))
                .setHeader("key", "value")
                .setHeader("key1", "value1")
                .build();

        amqpTemplate.convertAndSend(exchange,"",message);
    }

}

ConsumerConfiguration Class

@EnableRabbit
@Configuration
public class ConsumerConfiguration
{
    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter() 
    {
        Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
        jsonConverter.setClassMapper(classMapper());
        return jsonConverter;
    }

    @Bean
    public DefaultClassMapper classMapper() 
    {
        DefaultClassMapper classMapper = new DefaultClassMapper();
        Map<String, Class<?>> idClassMapping = new HashMap<>();
        idClassMapping.put("String", String.class);
        idClassMapping.put("Employee", Employee.class);
        classMapper.setIdClassMapping(idClassMapping);
        return classMapper;
    }

}

ConsumerApplication

@Component
@RabbitListener(queues = "#{'${combinedQueue}'.split(';')}")
public class RabbitMqListener 
{
    @RabbitHandler
    public void recievedMessage(@Header(AmqpHeaders.CONSUMER_QUEUE) String rk,@Payload Employee employee) 
    {
        System.out.println("*********************************************************************************************");
        System.out.println("Header recieved  = "+rk);
        System.out.println("Recieved Object Message From Important Queue " + employee);
        System.out.println("*********************************************************************************************");
    }

    @RabbitHandler
    public void recievedMessage1(@Header(AmqpHeaders.CONSUMER_QUEUE) String rk,@Payload byte[] object) 
    {
        System.out.println("*********************************************************************************************");
        System.out.println("Header recieved  = "+rk);

        Employee emp = (Employee)SerializationUtils.deserialize(object);

        System.out.println("Recieved String Message From Important Queue " + emp);
        System.out.println("*********************************************************************************************");
    }

}

класс сотрудников

public class Employee
{

    private String empName;
    private String empId;

    public String getEmpName() {
        return empName;
    }

    public void setEmpName(String empName) {
        this.empName = empName;
    }

    public String getEmpId() {
        return empId;
    }

    public void setEmpId(String empId) {
        this.empId = empId;
    }

    @Override
    public String toString() {
        return "Employee [empName=" + empName + ", empId=" + empId + "]";
    }

}

ErrorLog

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method 'public void com.rabbitmq.consumer.service.RabbitMqListener.recievedMessage1(java.lang.String,byte[])' threw exception
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:198) ~[spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:127) ~[spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1552) ~[spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1478) ~[spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1466) ~[spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1461) ~[spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1410) ~[spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:870) [spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:854) [spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:78) [spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1137) [spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1043) [spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_191]
Caused by: java.lang.IllegalStateException: Could not deserialize object type
    at org.springframework.amqp.utils.SerializationUtils.deserialize(SerializationUtils.java:98) ~[spring-amqp-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.utils.SerializationUtils.deserialize(SerializationUtils.java:75) ~[spring-amqp-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at com.rabbitmq.consumer.service.RabbitMqListener.recievedMessage1(RabbitMqListener.java:32) ~[classes/:na]
    at sun.reflect.GeneratedMethodAccessor42.invoke(Unknown Source) ~[na:na]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_191]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_191]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:170) ~[spring-messaging-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) ~[spring-messaging-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.invoke(DelegatingInvocableHandler.java:130) ~[spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:60) ~[spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:190) ~[spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    ... 12 common frames omitted
Caused by: java.lang.ClassNotFoundException: com.poc.springbootrabbitmq.model.Employee
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[na:1.8.0_191]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_191]
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) ~[na:1.8.0_191]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_191]
    at java.lang.Class.forName0(Native Method) ~[na:1.8.0_191]
    at java.lang.Class.forName(Class.java:348) ~[na:1.8.0_191]
    at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:686) ~[na:1.8.0_191]
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868) ~[na:1.8.0_191]
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751) ~[na:1.8.0_191]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042) ~[na:1.8.0_191]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) ~[na:1.8.0_191]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[na:1.8.0_191]
    at org.springframework.amqp.utils.SerializationUtils.deserialize(SerializationUtils.java:92) ~[spring-amqp-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    ... 22 common frames omitted

2019-05-06 00:39:35.801  WARN 2780 --- [ntContainer#0-1] o.s.a.s.c.Jackson2JsonMessageConverter   : Could not convert incoming message with content-type [application/octet-stream], 'json' keyword missing.

Кроме того, могу ли я создать сообщение в любом формате, кроме байта [] ???

Ответы [ 2 ]

0 голосов
/ 06 мая 2019

2019-05-06 00: 39: 35.801 ПРЕДУПРЕЖДЕНИЕ 2780 --- [ntContainer # 0-1] osascJackson2JsonMessageConverter: не удалось преобразовать входящее сообщение с типом содержимого [application / octet-stream], отсутствует ключевое слово json .

Преобразователь Джексона будет конвертировать, только если свойство content_type содержит json (например, application/json).

0 голосов
/ 05 мая 2019

Кроме того, могу ли я создать сообщение в любом формате, кроме байта [] ???

  • Да, это возможно.

Джексон (как правило) используется для преобразования объекта Java в строку Json (т.е. символьные данные). Похоже, вы пытаетесь это сделать, но вы также пытаетесь отправить класс в виде двоичных данных (byte []). Вам нужно выбрать один.

Stacktrace пытается помочь:

  • java.lang.ClassNotFoundException: com.poc.springbootrabbitmq.model.Employee. Класс, который вы пытаетесь сериализовать, не найден, поэтому даже хотя у него есть сообщение, он не знает, что с ним делать.
  • osascJackson2JsonMessageConverter: не удалось преобразовать входящее сообщение с типом содержимого [application / octet-stream], ключевое слово 'json отсутствует. Это говорит о том, что вы отправляете двоичный контент (а Джексон сбит с толку, потому что он ожидает Json, то есть символьные данные).

Возможно, это ближе к тому, чего вы пытаетесь достичь? https://thepracticaldeveloper.com/2016/10/23/produce-and-consume-json-messages-with-spring-boot-amqp/

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...