Обмен сообщениями между источником, процессором и приемником (java.util.HashMap) - PullRequest
0 голосов
/ 10 июня 2018

Поддержка отправки HashMaps между Source / Processor / Sink через RabbitBinder, похоже, изменена в новом SpringBoot (2.0.2).

общий родительский pom.xml для всех модулей выглядит следующим образом:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <groupId>demo.stream</groupId>
   <artifactId>demo-stream-parent</artifactId>
   <version>0.0.1-SNAPSHOT</version>
   <modules>
      <module>source-module</module>
      <module>processor-module</module>
      <module>sink-module</module>
   </modules>
   <packaging>pom</packaging>
   <name>demo-stream</name>
   <description>Demo project for Spring Boot</description>
   <parent>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-parent</artifactId>
      <version>2.0.2.RELEASE</version>
      <relativePath />
      <!-- lookup parent from repository -->
   </parent>
   <properties>
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
      <java.version>1.8</java.version>
      <spring-cloud.version>Finchley.RC2</spring-cloud.version>
   </properties>
   <dependencies>
      <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-amqp</artifactId>
      </dependency>
      <dependency>
         <groupId>org.springframework.cloud</groupId>
         <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
      </dependency>
      <dependency>
         <groupId>org.springframework.cloud</groupId>
         <artifactId>spring-cloud-starter-config</artifactId>
      </dependency>
      <dependency>
         <groupId>org.springframework.cloud</groupId>
         <artifactId>spring-cloud-stream</artifactId>
      </dependency>
      <dependency>
         <groupId>org.springframework.cloud</groupId>
         <artifactId>spring-cloud-stream-reactive</artifactId>
      </dependency>
      <dependency>
         <groupId>org.springframework.cloud</groupId>
         <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
      </dependency>
   </dependencies>
   <dependencyManagement>
      <dependencies>
         <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
         </dependency>
      </dependencies>
   </dependencyManagement>
   <build>
      <plugins>
         <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
         </plugin>
      </plugins>
   </build>
   <repositories>
      <repository>
         <id>spring-milestones</id>
         <name>Spring Milestones</name>
         <url>https://repo.spring.io/milestone</url>
         <snapshots>
            <enabled>false</enabled>
         </snapshots>
      </repository>
   </repositories>
</project>

Четко указано следующее:

Если на исходящем канале не установлено свойство типа контента, Spring Cloud Stream будет сериализовать полезную нагрузку с использованием сериализатора на основена платформе сериализации Kryo.Для десериализации сообщений в месте назначения требуется, чтобы класс полезной нагрузки присутствовал в пути к классам получателя.

Учитывая правила, ожидается, что он будет работать со стандартными типами Java, такими как "HashMap", без необходимости создания настраиваемого сообщения.конвертер.

Не удалось заставить этот простой код правильно работать между источником и процессором:

Исходная конфигурация и код:

spring:
  application:
    name: test
  cloud:
    config:
      uri: http://blade1:8888
      name: scdf-tester
    stream:
      bindings:
        output:
          #content-type: 'application/x-java-serialized-object'
          #content-type: 'application/json'
          content-type:
          destination: demo-stream-source-output



@EnableBinding(Source.class)
@EnableAutoConfiguration
@EnableScheduling
@Component
public class DemoSource {

    @Autowired
    private Source channels;

    //@InboundChannelAdapter(Source.OUTPUT)
    @Scheduled(fixedRate = 2000)
    public MessageSource<Map<String, Object>> timerMessageSource() {

            Map<String, Object> mapa = new HashMap<>();
            mapa.put("string", "string");
            mapa.put("string", "string");
            mapa.put("long", 1212121L);
            mapa.put("integer", 1212121);
            Map<String, Object> mapaInner = new HashMap<>();
            mapaInner.put("string", "string");
            mapaInner.put("string", "string");
            mapaInner.put("long", 1212121L);
            mapaInner.put("integer", 1212121);

            mapa.put("innerMapa", mapaInner);

            channels.output().send(MessageBuilder.withPayload(mapa).build());
        }

}

ОтВ отладчике ясно видно, что полезная нагрузка преобразуется в строку JSON (ApplicationJsonMessageMarshallingConverter), а заголовок типа контента устанавливается в «application / json».

Это не то, что ожидается, хотя это приемлемо и законно в некоторых обстоятельствах.Ожидается, что kryo сериализовал хэш-карту в виде байтового массива.

Вывод отладчика выглядит следующим образом:

this = {AbstractMessageChannel$ChannelInterceptorList@7877} 
 logger = {LogFactory$Log4jLog@7884} 
 interceptors = {CopyOnWriteArrayList@8890}  size = 1
  0 = {MessageConverterConfigurer$OutboundContentTypeConvertingInterceptor@8991} 
   messageConverter = {CompositeMessageConverter@9043} "CompositeMessageConverter[converters=[org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter@2863f519, org.springframework.cloud.stream.converter.TupleJsonMessageConverter@e9dffa1, org.springframework.messaging.converter.ByteArrayMessageConverter@cc64ad, org.springframework.cloud.stream.converter.ObjectStringMessageConverter@97df9ef, org.springframework.cloud.stream.converter.JavaSerializationMessageConverter@3e011ee4, org.springframework.cloud.stream.converter.KryoMessageConverter@5f808e72, org.springframework.cloud.stream.converter.JsonUnmarshallingConverter@3c0a7023]]"
   this$0 = {MessageConverterConfigurer@9044} 
   mimeType = null
   MessageConverterConfigurer$AbstractContentTypeInterceptor.this$0 = {MessageConverterConfigurer@9044} 
 size = 1
message = {GenericMessage@9169} "GenericMessage [payload=byte[117], headers={contentType=application/json;charset=UTF-8, id=e1b9b79e-5501-0ebe-178f-59ef5538192c, timestamp=1528647360089}]"
 payload = {byte[117]@9373} {"string":"string","integer":1212121,"long":1212121,"innerMapa":{"string":"string","integer":1212121,"long":1212121}}
 headers = {MessageHeaders@9374}  size = 3
  0 = {Collections$UnmodifiableMap$UnmodifiableEntrySet$UnmodifiableEntry@12201} "contentType" -> "application/json;charset=UTF-8"
  1 = {Collections$UnmodifiableMap$UnmodifiableEntrySet$UnmodifiableEntry@12202} "id" -> "e1b9b79e-5501-0ebe-178f-59ef5538192c"
  2 = {Collections$UnmodifiableMap$UnmodifiableEntrySet$UnmodifiableEntry@12203} "timestamp" -> "1528647360089"
channel = {DirectChannel@6940} "output"
interceptorStack = {ArrayDeque@8833}  size = 1

Конфигурация процессора и код:

spring:
  application:
    name: test
  cloud:
    config:
      uri: http://blade1:8888
      name: scdf-tester
    stream:
      bindings:
        output:
          #content-type: 'application/x-java-object'
          destination: demo-stream-processor-output
        input:
          content-type: 'application/json;type=java.util.Map'
          destination: demo-stream-source-output

PROCESSOR CASE1: В «StreamListener», когда Map используется как параметр, логика «лучшее усилие» выдает заголовки без полезной нагрузки

сообщение recv: class org.springframework.messaging.MessageHeaders

@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Map handle(final Map message) {
    LOG.info("recv message: {} {}", message.getClass(),message);
    //do some transformations...
    message.put("transformer_says","hello simon..");
    return message;
}

Вывод журнала выглядит следующим образом:

  2018-06-10 18:55:51 [demo-stream-source-output.anonymous.0EthCzZpTneSPp48retFQg-1] INFO  demostream.modules.DemoProcessor - recv message: class org.springframework.messaging.MessageHeaders {amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=demo-stream-source-output, amqp_receivedExchange=demo-stream-source-output, amqp_deliveryTag=1, deliveryAttempt=1, amqp_consumerQueue=demo-stream-source-output.anonymous.0EthCzZpTneSPp48retFQg, amqp_redelivered=false, id=671bc2d8-aaba-5ddb-b02b-f92a9dba3e0f, amqp_consumerTag=amq.ctag-ZnNk0O3vNs0yhOPKMTM3Fg, contentType=application/json;charset=UTF-8, timestamp=1528649751852}

Конечно, здесь ожидается исключение:

Вызывается: java.lang.UnsupportedOperationException: MessageHeaders является неизменным в org.springframework.messaging.MessageHeaders.put (MessageHeaders.java:249)

PROCESSOR CASE2: в «StreamListener», когда сообщение используется в качестве параметра »логика «лучшее из возможного» дает JSON представление карты, отличной от Original HashMap

@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Object handle(Message<?> message) {
    LOG.info("recv message: {}",message);
    String jsonMap = new String((byte[]) message.getPayload());
    LOG.info("jsonMap : {}", jsonMap);
    return jsonMap;
}

Вывод журнала выглядит следующим образом:

2018-06-10 18:52:43 [demo-stream-source-output.anonymous.lt21i1ZXQvKZkOdB6hQwUQ-1] INFO  demostream.modules.DemoProcessor - recv message: GenericMessage [payload=byte[117], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=demo-stream-source-output, amqp_receivedExchange=demo-stream-source-output, amqp_deliveryTag=38, deliveryAttempt=1, amqp_consumerQueue=demo-stream-source-output.anonymous.lt21i1ZXQvKZkOdB6hQwUQ, amqp_redelivered=false, id=271ad47e-7b36-c01c-0c19-b3201297ddd7, amqp_consumerTag=amq.ctag-QIVzqanCxiNF-HXUTUVn7Q, contentType=application/json;charset=UTF-8, timestamp=1528649563755}]
2018-06-10 18:52:43 [demo-stream-source-output.anonymous.lt21i1ZXQvKZkOdB6hQwUQ-1] INFO  demostream.modules.DemoProcessor - jsonMap : {"string":"string","integer":1212121,"long":1212121,"innerMapa":{"string":"string","integer":1212121,"long":1212121}}

Предыдущая версия (1.5.9) и связанные с ними потоки облачных зависимостейработать без проблем.

  <parent>
        <artifactId>spring-boot-starter-parent</artifactId>
        <groupId>org.springframework.boot</groupId>
        <version>1.5.9.RELEASE</version>
        <relativePath></relativePath>
    </parent>

Пожалуйста, вы можете дать несколько советов относительно этого поведения.

Если предположить, что я делаю это неправильно, простой вопрос будет:

Как отправить полезную нагрузку HashMap с помощью Kryo ser / der, что, на мой взгляд, лучше, чем сериализация JSON (меньше межпроцессных издержек)

Спасибо!

С уважением, Иван

1 Ответ

0 голосов
/ 12 июня 2018

Решением для выполнения двоичной сериализации / десериализации крио между источником / процессором является использование следующей конфигурации:

Эта директива:

output.content-type: application/x-java-object

запускает двоичную сериализацию крио (в противном случае JSON используется по умолчанию).

Источник (конф. И код)

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: source-output
          content-type: application/x-java-object



   @EnableBinding(Source.class)
    public class SourceTester {
        @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "5000"))
        public Message<HashMap<String,Object>> source() {
            HashMap<String,Object> mapa = new HashMap<>();
            mapa.put("foo","bar");
            mapa.put("bar",1337);
            return MessageBuilder.withPayload(mapa).build();
        }
    }

Или другой подход:

@EnableBinding(Source.class)
@EnableAutoConfiguration
@EnableScheduling
@Component
public class DemoSource {

    @Autowired
    private Source channels;

    @Scheduled(fixedRate = 5000)
    public MessageSource<Map<String, Object>> timerMessageSource() { 
            HashMap<String,Object> mapa = new HashMap<>();
            mapa.put("foo","bar");
            mapa.put("bar",1337);    
        channels.output().send(MessageBuilder.withPayload(mapa).build());
        }

}

Процессор (конф. И код)

spring:
      cloud:
        stream:
          bindings:
            input:
              destination: source-output
            output:
              destination: processor-output
              content-type: application/x-java-object



   @EnableBinding(Processor.class)
    @EnableAutoConfiguration
    public class ProcessorTester {
        @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
        protected Message<HashMap> process(Message<HashMap> input){
            input.getPayload().put("Processor", "was here");
            return input;
        }
    }

Вывод:

Обработчик процессора (преобразователь) с «параметром типа HashMap» и Message необходим для запуска криосериализации байта [] в HashMap автоматически:

protected Message<HashMap> process(Message<HashMap> input)

Если директива типа выходного содержимого отключена (Исходный модуль или любой другой производитель)

 spring:
      cloud:
        stream:
          bindings:
            output:
              destination: source-output
              ###content-type: application/x-java-object

Сериализация по умолчанию - HashMap в строку JSON.

С уважением,

Иван

...