Как получить общее количество записей в Flux событий на стороне сервера - PullRequest
2 голосов
/ 29 мая 2020

Я использую реактивное программирование, когда клиент получает поток потоков событий на стороне сервера, после чего эти события потребляются. С точки зрения функциональности это работает. У меня проблема, когда я пытаюсь зарегистрировать общее количество записей в потоке потока. Ниже приведены фрагменты кода.

Давайте создадим экземпляр, подключенный к серверу

        final WebClient client = WebClient
            .builder()
            .baseUrl(url)
            .build();

А затем мы запустим подключение, подписавшись на его topi c

        final Flux<ServerSentEvent<SomeEvent>> eventStream = client.get()
            .uri("/bus/sse?id=" + subscription)
            .accept(MediaType.TEXT_EVENT_STREAM)
            .exchange()
            .flatMapMany(response -> response.bodyToFlux(type))
            .repeat();

        log(eventStream, "connectSSE");

        eventStream
            .doOnError(throwable -> this.onError(throwable, eventStream))
            .doOnComplete(() -> this.onComplete(eventStream));            

        subscribe = eventStream.subscribe(someServerSideEvent -> this.onEvent(someServerSideEvent , eventStream));

Метод ниже обрабатывает событие

  private void onEvent(final ServerSentEvent<SomeEvent> content, Flux<ServerSentEvent<SomeEvent>> eventStream) {
    log(eventStream, "onEvent");
    //Code for handling event
}

У меня проблема с приведенным ниже фрагментом кода. На самом деле я хочу зарегистрировать количество записей в потоке, и я ожидал, что он напечатает некоторые числа, но он напечатает что-то вроде ниже. Нужно какое-то решение без использования .block (). Любая помощь приветствуется.

"Подсчитанные значения в этом Flux: MonoMapFuseable, вызывающий onEvent"

private void log1(Flux<ServerSentEvent<SomeEvent>> eventStream, final String caller) {
    try {
        eventStream.count().map(count -> {
            LOGGER.info("Counted values in this Flux: {}, caller {}", count.longValue(), caller);
            return count;
        });
    } catch (final Exception e) {
        LOGGER.info("Counted values in this Flux failed", e);
    }
}
...