Невозможно создать экземпляр `response.core.publisher.Flux` в приложении Spring Cloud Stream, развернутом в Spring Cloud Data Flow - PullRequest
0 голосов
/ 29 мая 2020

Я создал процессор Spring Cloud Stream с контроллером, показанным ниже:

    @StreamListener(INPUT)
    @SendTo(OUTPUT)
    public Flux<Document> receive(@Valid @RequestBody Flux<Document> document) {
        // Some operation on received Flux<Document> and return the Flux<Document>
    }

Я развернул его в Spring Cloud Data Flow v2.5.1 с приложениями Source и Sink. Сборка проходит нормально, и поток успешно развертывается. Но когда я передаю данные, я получаю ошибку ниже:

ERROR 5206 — [ wer.http.wer-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binding.StreamListenerMessageHandler@39ce27f2]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot parse payload ; nested exception is com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `reactor.core.publisher.Flux` (no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
at [Source: (byte[])... 

Я использую springBootVersion = "2.2.6.RELEASE"

и зависимости Gradle, которые я используются:

dependencies {
        implementation(platform("org.springframework.cloud:spring-cloud-dependencies:Hoxton.SR3"))
        implementation(platform("org.springframework.cloud:spring-cloud-stream-dependencies:Horsham.SR3"))

        implementation("org.springframework.boot:spring-boot-starter-jetty")
        implementation("org.springframework.boot:spring-boot-starter-actuator")
        implementation("org.springframework.cloud:spring-cloud-starter-stream-rabbit")
        implementation("org.springframework.boot:spring-boot-starter-webflux") {
            exclude group: "org.springframework.boot", module: "spring-boot-starter-tomcat"
        }

        implementation("com.fasterxml.jackson.core:jackson-annotations:2.9.8")
        implementation("javax.validation:validation-api:2.0.1.Final")**
    }

Он отлично работает, когда я использую springBootVersion = "2.1.9.RELEASE" и зависимости Gradle:

dependencies {
        implementation("org.springframework.cloud:spring-cloud-stream-reactive:2.2.0.RELEASE")

        implementation("org.springframework.boot:spring-boot-starter-jetty")
        implementation("org.springframework.boot:spring-boot-starter-actuator")
        implementation("org.springframework.cloud:spring-cloud-starter-stream-rabbit:2.2.0.RELEASE")
        implementation("org.springframework.boot:spring-boot-starter-webflux") {
            exclude group: "org.springframework.boot", module: "spring-boot-starter-tomcat"
        }

        implementation("com.fasterxml.jackson.core:jackson-annotations:2.9.8")
        implementation("javax.validation:validation-api:2.0.1.Final")**
    }

Но я хочу использовать версию Spring загрузки 2.2.x и выглядит так, как будто зависимость spring-cloud-stream-reactive устарела для версии облачного потока Spring 2.2 и не может использоваться с версией 2.2.x загрузки Spring.

...