Я создал процессор Spring Cloud Stream с контроллером, показанным ниже:
@StreamListener(INPUT)
@SendTo(OUTPUT)
public Flux<Document> receive(@Valid @RequestBody Flux<Document> document) {
// Some operation on received Flux<Document> and return the Flux<Document>
}
Я развернул его в Spring Cloud Data Flow v2.5.1 с приложениями Source и Sink. Сборка проходит нормально, и поток успешно развертывается. Но когда я передаю данные, я получаю ошибку ниже:
ERROR 5206 — [ wer.http.wer-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binding.StreamListenerMessageHandler@39ce27f2]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot parse payload ; nested exception is com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `reactor.core.publisher.Flux` (no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
at [Source: (byte[])...
Я использую springBootVersion = "2.2.6.RELEASE"
и зависимости Gradle, которые я используются:
dependencies {
implementation(platform("org.springframework.cloud:spring-cloud-dependencies:Hoxton.SR3"))
implementation(platform("org.springframework.cloud:spring-cloud-stream-dependencies:Horsham.SR3"))
implementation("org.springframework.boot:spring-boot-starter-jetty")
implementation("org.springframework.boot:spring-boot-starter-actuator")
implementation("org.springframework.cloud:spring-cloud-starter-stream-rabbit")
implementation("org.springframework.boot:spring-boot-starter-webflux") {
exclude group: "org.springframework.boot", module: "spring-boot-starter-tomcat"
}
implementation("com.fasterxml.jackson.core:jackson-annotations:2.9.8")
implementation("javax.validation:validation-api:2.0.1.Final")**
}
Он отлично работает, когда я использую springBootVersion = "2.1.9.RELEASE" и зависимости Gradle:
dependencies {
implementation("org.springframework.cloud:spring-cloud-stream-reactive:2.2.0.RELEASE")
implementation("org.springframework.boot:spring-boot-starter-jetty")
implementation("org.springframework.boot:spring-boot-starter-actuator")
implementation("org.springframework.cloud:spring-cloud-starter-stream-rabbit:2.2.0.RELEASE")
implementation("org.springframework.boot:spring-boot-starter-webflux") {
exclude group: "org.springframework.boot", module: "spring-boot-starter-tomcat"
}
implementation("com.fasterxml.jackson.core:jackson-annotations:2.9.8")
implementation("javax.validation:validation-api:2.0.1.Final")**
}
Но я хочу использовать версию Spring загрузки 2.2.x и выглядит так, как будто зависимость spring-cloud-stream-reactive устарела для версии облачного потока Spring 2.2 и не может использоваться с версией 2.2.x загрузки Spring.