Spring WebFlux: HttpWebHandlerAdapter - установленное соединение было прервано - PullRequest
0 голосов
/ 05 сентября 2018

Я использую Spring WebFlux для публикации / получения отправленных событий сервера. На публикуемом коде, как показано ниже. Затем в тестовом случае я повторяю цикл 10 раз, чтобы вызвать «публикацию» для отправки сообщений.

public ValidationEventPublisher() {
    this.processor = DirectProcessor.<ExternalEvent>create().serialize();
    this.sink = processor.sink();
}

@RequestMapping(value = "/ssp/common/v1/eventstream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<ExternalEvent>> sse() {
    return processor.map(e -> ServerSentEvent.builder(e).id(String.valueOf(msgId.incrementAndGet())).build())
            .onBackpressureBuffer();
}

public void publish(ValidationEvent validationEvent)
{
    log.info("Published Validation event...");
    sink.next(validationEvent);
}

На принимающей стороне код:

final Flux<Object> stream = WebClient.builder()
            .baseUrl(sspUrl + "/ssp/common/v1/eventstream")
            .defaultHeader(HttpHeaders.AUTHORIZATION, "Basic " + Base64Utils.encodeToString((username + ":" + password).getBytes(UTF_8)))
            .build()
            .get()
            .retrieve()
            .bodyToFlux(ServerSentEvent.class)
            .filter(e -> e.id() != null)
            .flatMap(e -> Mono.just(e.data()))
            .doOnError(e -> handleServerDown(e))
            .repeat();

    stream.subscribe(e -> processEvent(e));

Итак, проблема в том, что я могу получить только первое сообщение, и после этого я всегда получаю исключение.

- Published Validation event...
- Received validation event: {"messageId":"867b5ced-...}
- Published Validation event...
- Published Validation event...
2018-09-05 11:07:43,596 ERROR [reactor-http-nio-6] [] [] 
[org.springframework.web.server.adapter.HttpWebHandlerAdapter:213] - 
Unhandled failure: An established connection was aborted by the software in 
your host machine, response already set (status=null)
2018-09-05 11:07:43,597  WARN [reactor-http-nio-6] [] [] 
[org.springframework.http.server.reactive.ReactorHttpHandlerAdapter:76] - 
Handling completed with error: An established connection was aborted by the 
software in your host machine
...

Я потратил довольно много времени на исследования, но не могу найти соответствующую информацию. Обратите внимание, что я выполняю обе стороны в одном тестовом примере, что означает, что они находятся в одном и том же процессе и JVM. Я не думаю, что это имеет значение, но на всякий случай.

Я обнаружил, что причиной является "отмена". Но почему? Например, если я добавлю «doOnCancel» на стороне получателя, я увижу, что он называется.

Ответы [ 2 ]

0 голосов
/ 07 августа 2019

Прошло много времени с тех пор, как вопрос был опубликован. Теперь я думаю, что проблема была вызвана некоторым исключением, когда я обрабатываю сообщение, например, NPE в процессеEvent. Это легко доказать, если вы добавите дикий try ... catch в processEvent.

0 голосов
/ 31 октября 2018

Использование spring-boot-starter-parent 2.1.0.RELEASE Я столкнулся с подобной проблемой. Попробуйте запустить следующую службу на порту 8080 и клиент на 8081

@SpringBootApplication
public class ServiceApplication {

    @Bean
    RouterFunction<ServerResponse> routes(Handler handler) {
        return route(GET("/there").and(accept(MediaType.TEXT_EVENT_STREAM)), handler::all);
    }

    public static void main(String[] args) {
        SpringApplication.run(ServiceApplication.class, args);
    }

    @Component
    class Handler {
        public Mono<ServerResponse> all(ServerRequest request) {
            Flux<String> aFlux = Flux.range(0, 1000).map(i -> UUID.randomUUID().toString());
            return ok().contentType(MediaType.TEXT_EVENT_STREAM).body(aFlux, String.class);
        }
    }   
}

@SpringBootApplication
public class ClientApplication {

    @Bean
    RouterFunction<ServerResponse> routes(Handler handler) {
        return route(GET("/here").and(accept(MediaType.TEXT_EVENT_STREAM)), handler::all);
    }

    public static void main(String[] args) {
        SpringApplication.run(ClientApplication.class, args);
    }

    @Component
    class Handler {
        public Mono<ServerResponse> all(ServerRequest request) {
            Flux<String> resp = WebClient.create("http://localhost:8080")
                                        .get()
                                        .uri("/there")
                                        .accept(MediaType.TEXT_EVENT_STREAM)
                                        .retrieve()
                                        .bodyToFlux(String.class);

            return ok().contentType(MediaType.TEXT_EVENT_STREAM).body(resp, String.class);
        }
    }   
}

Теперь выполните curl -H "Accept: text / event-stream" "http://localhost:8081/here" несколько раз, и служба иногда выдаст ошибку:

oswsadapter.HttpWebHandlerAdapter: [f903e4b6] Ошибка [java.io.IOException: установленное соединение было прервано программным обеспечением на вашем хост-компьютере] для HTTP GET "/ there", но ServerHttpResponse уже зафиксирован (200 OK)

...