Я использую реактивное программирование, когда клиент получает поток потоков событий на стороне сервера, после чего эти события потребляются. С точки зрения функциональности это работает. У меня проблема, когда я пытаюсь зарегистрировать общее количество записей в потоке потока. Ниже приведены фрагменты кода.
Давайте создадим экземпляр, подключенный к серверу
final WebClient client = WebClient
.builder()
.baseUrl(url)
.build();
А затем мы запустим подключение, подписавшись на его topi c
final Flux<ServerSentEvent<SomeEvent>> eventStream = client.get()
.uri("/bus/sse?id=" + subscription)
.accept(MediaType.TEXT_EVENT_STREAM)
.exchange()
.flatMapMany(response -> response.bodyToFlux(type))
.repeat();
log(eventStream, "connectSSE");
eventStream
.doOnError(throwable -> this.onError(throwable, eventStream))
.doOnComplete(() -> this.onComplete(eventStream));
subscribe = eventStream.subscribe(someServerSideEvent -> this.onEvent(someServerSideEvent , eventStream));
Метод ниже обрабатывает событие
private void onEvent(final ServerSentEvent<SomeEvent> content, Flux<ServerSentEvent<SomeEvent>> eventStream) {
log(eventStream, "onEvent");
//Code for handling event
}
У меня проблема с приведенным ниже фрагментом кода. На самом деле я хочу зарегистрировать количество записей в потоке, и я ожидал, что он напечатает некоторые числа, но он напечатает что-то вроде ниже. Нужно какое-то решение без использования .block (). Любая помощь приветствуется.
"Подсчитанные значения в этом Flux: MonoMapFuseable, вызывающий onEvent"
private void log1(Flux<ServerSentEvent<SomeEvent>> eventStream, final String caller) {
try {
eventStream.count().map(count -> {
LOGGER.info("Counted values in this Flux: {}, caller {}", count.longValue(), caller);
return count;
});
} catch (final Exception e) {
LOGGER.info("Counted values in this Flux failed", e);
}
}