Обнаружение пустого окна Flux перед публикацией Webclient - PullRequest
0 голосов
/ 04 июня 2019

Со ссылкой на мой предыдущий вопрос Разделение сообщения потокового потока WebClient на массивы JSON , я использовал;

myFlux
 .window(5)
 .flatMap(window -> client
  .post()
  .body(window, myClass.class)
  .exchange()
  .flatMap(response -> response.bodyToMono)
 )
 .subscribe();

Это отлично работает.Тем не менее, в медленный день, 5 сообщений приходят не сразу, и window не будет отправлять ничего, пока window не заполнится.Поэтому я переключился на windowTimeout(5, Duration.ofSeconds(5)).

Теперь, если нет данных и превышен Duration, код передает пустой window, что приводит к публикации пустого массива.

Как обнаружить пустой window и не запускать post?

1 Ответ

1 голос
/ 05 июня 2019

К сожалению, невозможно узнать, сколько элементов будет испущено Flux, не прочитав весь Flux до завершения.

Поскольку ваш размер окна относительно мал, вы можете собрать все элементы, испускаемыеFlux в List с использованием .collectList(), а затем проверьте, пуст ли список перед отправкой запроса.

myFlux
    .windowTimeout(5, Duration.ofSeconds(5))
    .flatMap(window ->
        // collect everything in the window into a list
        window.collectList()
             // ignore empty windows
            .filter(list -> !list.isEmpty())
             // send the request
            .flatMap(list -> client
                .post()
                .body(Flux.fromIterable(list), MyClass.class)
                .exchange()
                .flatMap(response -> response.bodyToMono(MyResponse.class))))

...