Поддержка отправки HashMaps между Source / Processor / Sink через RabbitBinder, похоже, изменена в новом SpringBoot (2.0.2).
общий родительский pom.xml для всех модулей выглядит следующим образом:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>demo.stream</groupId>
<artifactId>demo-stream-parent</artifactId>
<version>0.0.1-SNAPSHOT</version>
<modules>
<module>source-module</module>
<module>processor-module</module>
<module>sink-module</module>
</modules>
<packaging>pom</packaging>
<name>demo-stream</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.2.RELEASE</version>
<relativePath />
<!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<spring-cloud.version>Finchley.RC2</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-reactive</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
</project>
Четко указано следующее:
Если на исходящем канале не установлено свойство типа контента, Spring Cloud Stream будет сериализовать полезную нагрузку с использованием сериализатора на основена платформе сериализации Kryo.Для десериализации сообщений в месте назначения требуется, чтобы класс полезной нагрузки присутствовал в пути к классам получателя.
Учитывая правила, ожидается, что он будет работать со стандартными типами Java, такими как "HashMap", без необходимости создания настраиваемого сообщения.конвертер.
Не удалось заставить этот простой код правильно работать между источником и процессором:
Исходная конфигурация и код:
spring:
application:
name: test
cloud:
config:
uri: http://blade1:8888
name: scdf-tester
stream:
bindings:
output:
#content-type: 'application/x-java-serialized-object'
#content-type: 'application/json'
content-type:
destination: demo-stream-source-output
@EnableBinding(Source.class)
@EnableAutoConfiguration
@EnableScheduling
@Component
public class DemoSource {
@Autowired
private Source channels;
//@InboundChannelAdapter(Source.OUTPUT)
@Scheduled(fixedRate = 2000)
public MessageSource<Map<String, Object>> timerMessageSource() {
Map<String, Object> mapa = new HashMap<>();
mapa.put("string", "string");
mapa.put("string", "string");
mapa.put("long", 1212121L);
mapa.put("integer", 1212121);
Map<String, Object> mapaInner = new HashMap<>();
mapaInner.put("string", "string");
mapaInner.put("string", "string");
mapaInner.put("long", 1212121L);
mapaInner.put("integer", 1212121);
mapa.put("innerMapa", mapaInner);
channels.output().send(MessageBuilder.withPayload(mapa).build());
}
}
ОтВ отладчике ясно видно, что полезная нагрузка преобразуется в строку JSON (ApplicationJsonMessageMarshallingConverter), а заголовок типа контента устанавливается в «application / json».
Это не то, что ожидается, хотя это приемлемо и законно в некоторых обстоятельствах.Ожидается, что kryo сериализовал хэш-карту в виде байтового массива.
Вывод отладчика выглядит следующим образом:
this = {AbstractMessageChannel$ChannelInterceptorList@7877}
logger = {LogFactory$Log4jLog@7884}
interceptors = {CopyOnWriteArrayList@8890} size = 1
0 = {MessageConverterConfigurer$OutboundContentTypeConvertingInterceptor@8991}
messageConverter = {CompositeMessageConverter@9043} "CompositeMessageConverter[converters=[org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter@2863f519, org.springframework.cloud.stream.converter.TupleJsonMessageConverter@e9dffa1, org.springframework.messaging.converter.ByteArrayMessageConverter@cc64ad, org.springframework.cloud.stream.converter.ObjectStringMessageConverter@97df9ef, org.springframework.cloud.stream.converter.JavaSerializationMessageConverter@3e011ee4, org.springframework.cloud.stream.converter.KryoMessageConverter@5f808e72, org.springframework.cloud.stream.converter.JsonUnmarshallingConverter@3c0a7023]]"
this$0 = {MessageConverterConfigurer@9044}
mimeType = null
MessageConverterConfigurer$AbstractContentTypeInterceptor.this$0 = {MessageConverterConfigurer@9044}
size = 1
message = {GenericMessage@9169} "GenericMessage [payload=byte[117], headers={contentType=application/json;charset=UTF-8, id=e1b9b79e-5501-0ebe-178f-59ef5538192c, timestamp=1528647360089}]"
payload = {byte[117]@9373} {"string":"string","integer":1212121,"long":1212121,"innerMapa":{"string":"string","integer":1212121,"long":1212121}}
headers = {MessageHeaders@9374} size = 3
0 = {Collections$UnmodifiableMap$UnmodifiableEntrySet$UnmodifiableEntry@12201} "contentType" -> "application/json;charset=UTF-8"
1 = {Collections$UnmodifiableMap$UnmodifiableEntrySet$UnmodifiableEntry@12202} "id" -> "e1b9b79e-5501-0ebe-178f-59ef5538192c"
2 = {Collections$UnmodifiableMap$UnmodifiableEntrySet$UnmodifiableEntry@12203} "timestamp" -> "1528647360089"
channel = {DirectChannel@6940} "output"
interceptorStack = {ArrayDeque@8833} size = 1
Конфигурация процессора и код:
spring:
application:
name: test
cloud:
config:
uri: http://blade1:8888
name: scdf-tester
stream:
bindings:
output:
#content-type: 'application/x-java-object'
destination: demo-stream-processor-output
input:
content-type: 'application/json;type=java.util.Map'
destination: demo-stream-source-output
PROCESSOR CASE1: В «StreamListener», когда Map используется как параметр, логика «лучшее усилие» выдает заголовки без полезной нагрузки
сообщение recv: class org.springframework.messaging.MessageHeaders
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Map handle(final Map message) {
LOG.info("recv message: {} {}", message.getClass(),message);
//do some transformations...
message.put("transformer_says","hello simon..");
return message;
}
Вывод журнала выглядит следующим образом:
2018-06-10 18:55:51 [demo-stream-source-output.anonymous.0EthCzZpTneSPp48retFQg-1] INFO demostream.modules.DemoProcessor - recv message: class org.springframework.messaging.MessageHeaders {amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=demo-stream-source-output, amqp_receivedExchange=demo-stream-source-output, amqp_deliveryTag=1, deliveryAttempt=1, amqp_consumerQueue=demo-stream-source-output.anonymous.0EthCzZpTneSPp48retFQg, amqp_redelivered=false, id=671bc2d8-aaba-5ddb-b02b-f92a9dba3e0f, amqp_consumerTag=amq.ctag-ZnNk0O3vNs0yhOPKMTM3Fg, contentType=application/json;charset=UTF-8, timestamp=1528649751852}
Конечно, здесь ожидается исключение:
Вызывается: java.lang.UnsupportedOperationException: MessageHeaders является неизменным в org.springframework.messaging.MessageHeaders.put (MessageHeaders.java:249)
PROCESSOR CASE2: в «StreamListener», когда сообщение используется в качестве параметра »логика «лучшее из возможного» дает JSON представление карты, отличной от Original HashMap
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Object handle(Message<?> message) {
LOG.info("recv message: {}",message);
String jsonMap = new String((byte[]) message.getPayload());
LOG.info("jsonMap : {}", jsonMap);
return jsonMap;
}
Вывод журнала выглядит следующим образом:
2018-06-10 18:52:43 [demo-stream-source-output.anonymous.lt21i1ZXQvKZkOdB6hQwUQ-1] INFO demostream.modules.DemoProcessor - recv message: GenericMessage [payload=byte[117], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=demo-stream-source-output, amqp_receivedExchange=demo-stream-source-output, amqp_deliveryTag=38, deliveryAttempt=1, amqp_consumerQueue=demo-stream-source-output.anonymous.lt21i1ZXQvKZkOdB6hQwUQ, amqp_redelivered=false, id=271ad47e-7b36-c01c-0c19-b3201297ddd7, amqp_consumerTag=amq.ctag-QIVzqanCxiNF-HXUTUVn7Q, contentType=application/json;charset=UTF-8, timestamp=1528649563755}]
2018-06-10 18:52:43 [demo-stream-source-output.anonymous.lt21i1ZXQvKZkOdB6hQwUQ-1] INFO demostream.modules.DemoProcessor - jsonMap : {"string":"string","integer":1212121,"long":1212121,"innerMapa":{"string":"string","integer":1212121,"long":1212121}}
Предыдущая версия (1.5.9) и связанные с ними потоки облачных зависимостейработать без проблем.
<parent>
<artifactId>spring-boot-starter-parent</artifactId>
<groupId>org.springframework.boot</groupId>
<version>1.5.9.RELEASE</version>
<relativePath></relativePath>
</parent>
Пожалуйста, вы можете дать несколько советов относительно этого поведения.
Если предположить, что я делаю это неправильно, простой вопрос будет:
Как отправить полезную нагрузку HashMap с помощью Kryo ser / der, что, на мой взгляд, лучше, чем сериализация JSON (меньше межпроцессных издержек)
Спасибо!
С уважением, Иван