Spring-Integration: Как создать Spring Reactor Flux из потока интеграции WebFlux? - PullRequest
0 голосов
/ 05 мая 2018

В Как создать поток Spring Reactor Flux из потока интеграции Http? artem-bilan упоминается в комментарии о том, что в будущем можно будет использовать интеграцию webflux.

Со времени написания комментария интеграция WebFlux была учтена в spring -gration-webflux . Я попробовал следующее для репликации интеграции http-> flux на основе MVC с основанной на WebFlux заменой обработчика Http.inboundChannelAdapter и @GetRequest версии MVC на WebFlux.inboundChannelAdapter и WebFlux.inboundGateway:

@SpringBootApplication
public class WebfluxApplication {

  public static void main(String[] args) {
    SpringApplication.run(WebfluxApplication.class, args);
  }


  @Bean
  public Publisher<Message<String>> reactiveSource() {
    return IntegrationFlows.
            from(WebFlux.inboundChannelAdapter("/message/{id}")
                    .requestMapping(r -> r
                            .methods(HttpMethod.POST)
                    )
                    .payloadExpression("#pathVariables.id")
            )
            .log()
            .channel(MessageChannels.flux())
            .toReactivePublisher();
  }


  @Bean
  public IntegrationFlow eventMessages() {
    return IntegrationFlows
            .from(WebFlux.inboundGateway("/events")
                    .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
            .handle((p, h) -> reactiveSource())                
            .get();
}

}

Похоже, что поток в издателе reactiveSource() не получает никаких сообщений, по крайней мере, ничего не зарегистрировано для моего оператора .log().

Когда я заменяю reactiveSource() издателя в потоке eventMessages

.handle((p, h) -> reactiveSource()) 

фальшивым издателем

.handle((p, h) -> Flux.just("foo", "bar"))

Я получаю ответы SSE от

curl localhost:8080/events

Журнал трассировки показывает, что обработчик reactiveSource() POST сопоставлен и вызывается метод WebFluxInboundEndpoint.handle:

2018-05-05 16:50:58.788  INFO 6552 --- [           main] xIntegrationRequestMappingHandlerMapping : Mapped "{[/message/{id}],methods=[POST]}" onto public abstract reactor.core.publisher.Mono<java.lang.Void> org.springframework.web.server.WebHandler.handle(org.springframework.web.server.ServerWebExchange)
2018-05-05 16:50:58.789  INFO 6552 --- [           main] xIntegrationRequestMappingHandlerMapping : Mapped "{[/events],methods=[GET || POST],produces=[text/event-stream]}" onto public abstract reactor.core.publisher.Mono<java.lang.Void> org.springframework.web.server.WebHandler.handle(org.springframework.web.server.ServerWebExchange)
2018-05-05 16:50:59.191  INFO 6552 --- [ctor-http-nio-1] r.ipc.netty.tcp.BlockingNettyContext     : Started HttpServer on /0:0:0:0:0:0:0:0:8080
2018-05-05 16:50:59.192  INFO 6552 --- [           main] o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port(s): 8080
2018-05-05 16:50:59.196  INFO 6552 --- [           main] d.e.sample.webflux.WebfluxApplication    : Started WebfluxApplication in 2.608 seconds (JVM running for 3.419)
2018-05-05 16:51:06.918 DEBUG 6552 --- [ctor-http-nio-2] o.s.web.reactive.DispatcherHandler       : Processing POST request for [http://localhost:8080/message/4]
2018-05-05 16:51:06.932 DEBUG 6552 --- [ctor-http-nio-2] s.w.r.r.m.a.RequestMappingHandlerMapping : Looking up handler method for path /message/4
2018-05-05 16:51:06.933 DEBUG 6552 --- [ctor-http-nio-2] s.w.r.r.m.a.RequestMappingHandlerMapping : Did not find handler method for [/message/4]
2018-05-05 16:51:06.967 TRACE 6552 --- [ctor-http-nio-2] o.s.w.r.r.method.InvocableHandlerMethod  : Invoking 'org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint.handle' with arguments [org.springframework.web.server.adapter.DefaultServerWebExchange@775cdb20]
2018-05-05 16:51:06.967 TRACE 6552 --- [ctor-http-nio-2] o.s.w.r.r.method.InvocableHandlerMethod  : Method [org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint.handle] returned [MonoDefer]
2018-05-05 16:51:06.967 DEBUG 6552 --- [ctor-http-nio-2] o.s.w.r.r.method.InvocableHandlerMethod  : Response fully handled in controller method
2018-05-05 16:51:11.363 DEBUG 6552 --- [ctor-http-nio-3] o.s.web.reactive.DispatcherHandler       : Processing POST request for [http://localhost:8080/message/4]
2018-05-05 16:51:11.363 DEBUG 6552 --- [ctor-http-nio-3] s.w.r.r.m.a.RequestMappingHandlerMapping : Looking up handler method for path /message/4
2018-05-05 16:51:11.363 DEBUG 6552 --- [ctor-http-nio-3] s.w.r.r.m.a.RequestMappingHandlerMapping : Did not find handler method for [/message/4]
2018-05-05 16:51:11.364 TRACE 6552 --- [ctor-http-nio-3] o.s.w.r.r.method.InvocableHandlerMethod  : Invoking 'org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint.handle' with arguments [org.springframework.web.server.adapter.DefaultServerWebExchange@71f648a3]
2018-05-05 16:51:11.364 TRACE 6552 --- [ctor-http-nio-3] o.s.w.r.r.method.InvocableHandlerMethod  : Method [org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint.handle] returned [MonoDefer]
2018-05-05 16:51:11.364 DEBUG 6552 --- [ctor-http-nio-3] o.s.w.r.r.method.InvocableHandlerMethod  : Response fully handled in controller method

Почему это?

1 Ответ

0 голосов
/ 05 мая 2018

Причина в том, что WebFluxInboundEndpoint прекращает обработку запросов POST без тела в doHandle(), строка

.map(body -> new HttpEntity<>(...)) 

никогда не выполняется, если нет содержимого тела запроса:

private Mono<Void> doHandle(ServerWebExchange exchange) {
    return extractRequestBody(exchange)
            .doOnSubscribe(s -> this.activeCount.incrementAndGet())
            .map(body -> new HttpEntity<>(body, exchange.getRequest().getHeaders()))
            .map(entity -> buildMessage(entity, exchange))
            .flatMap(requestMessage -> {
                if (this.expectReply) {
                    return sendAndReceiveMessageReactive(requestMessage)
                            .flatMap(replyMessage -> populateResponse(exchange, replyMessage));
                }
                else {
                    send(requestMessage);
                    return setStatusCode(exchange);
                }
            })
            .doOnTerminate(this.activeCount::decrementAndGet);

}

Обходной путь: вызывающий должен отправить любое непустое тело запроса, чтобы оно заработало, например, достаточно одной кавычки, переданной с -d:

curl -d ' http://localhost:8080/message/4

При таком запросе мой журнал содержит ожидаемый входящий GenericMessage, и ресурс / events начинает генерировать SSE.

2018-05-05 17:25:24.777 TRACE 40436 --- [ctor-http-nio-8] o.s.w.r.r.method.InvocableHandlerMethod  : Method [org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint.handle] returned [MonoDefer]
2018-05-05 17:25:24.777 DEBUG 40436 --- [ctor-http-nio-8] o.s.w.r.r.method.InvocableHandlerMethod  : Response fully handled in controller method
2018-05-05 17:25:24.778  INFO 40436 --- [ctor-http-nio-8] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=4, headers={http_requestMethod=POST, Accept=*/*, User-Agent=curl/7.49.1, http_requestUrl=http://localhost:8080/message/4, Host=localhost:8080, id=9a09294d-280a-af3b-0894-23597cf1cb5f, Content-Length=1, contentType=application/x-www-form-urlencoded, timestamp=1525533924778}]
...