Я использую Spring WebFlux для публикации / получения отправленных событий сервера. На публикуемом коде, как показано ниже. Затем в тестовом случае я повторяю цикл 10 раз, чтобы вызвать «публикацию» для отправки сообщений.
public ValidationEventPublisher() {
this.processor = DirectProcessor.<ExternalEvent>create().serialize();
this.sink = processor.sink();
}
@RequestMapping(value = "/ssp/common/v1/eventstream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<ExternalEvent>> sse() {
return processor.map(e -> ServerSentEvent.builder(e).id(String.valueOf(msgId.incrementAndGet())).build())
.onBackpressureBuffer();
}
public void publish(ValidationEvent validationEvent)
{
log.info("Published Validation event...");
sink.next(validationEvent);
}
На принимающей стороне код:
final Flux<Object> stream = WebClient.builder()
.baseUrl(sspUrl + "/ssp/common/v1/eventstream")
.defaultHeader(HttpHeaders.AUTHORIZATION, "Basic " + Base64Utils.encodeToString((username + ":" + password).getBytes(UTF_8)))
.build()
.get()
.retrieve()
.bodyToFlux(ServerSentEvent.class)
.filter(e -> e.id() != null)
.flatMap(e -> Mono.just(e.data()))
.doOnError(e -> handleServerDown(e))
.repeat();
stream.subscribe(e -> processEvent(e));
Итак, проблема в том, что я могу получить только первое сообщение, и после этого я всегда получаю исключение.
- Published Validation event...
- Received validation event: {"messageId":"867b5ced-...}
- Published Validation event...
- Published Validation event...
2018-09-05 11:07:43,596 ERROR [reactor-http-nio-6] [] []
[org.springframework.web.server.adapter.HttpWebHandlerAdapter:213] -
Unhandled failure: An established connection was aborted by the software in
your host machine, response already set (status=null)
2018-09-05 11:07:43,597 WARN [reactor-http-nio-6] [] []
[org.springframework.http.server.reactive.ReactorHttpHandlerAdapter:76] -
Handling completed with error: An established connection was aborted by the
software in your host machine
...
Я потратил довольно много времени на исследования, но не могу найти соответствующую информацию.
Обратите внимание, что я выполняю обе стороны в одном тестовом примере, что означает, что они находятся в одном и том же процессе и JVM. Я не думаю, что это имеет значение, но на всякий случай.
Я обнаружил, что причиной является "отмена". Но почему? Например, если я добавлю «doOnCancel» на стороне получателя, я увижу, что он называется.