Ваше требование звучит интересно! У нас есть switchOnFirst
, который может быть полезен для проверки первого элемента. Но если у вас есть N элементов для проверки, мы можем попробовать что-то вроде этого.
Здесь я предполагаю, что мне нужно проверить первые 5 элементов, которые должны быть <= 5. Тогда это допустимый поток. В противном случае мы просто выдавали бы ошибку, говоря, что проверка не удалась. </p>
Flux<Integer> integerFlux = Flux.range(1, 10).delayElements(Duration.ofSeconds(1));
integerFlux
.buffer(5)
.switchOnFirst((signal, flux) -> {
//first 5 elements are <= 5, then it is a valid stream
return signal.get().stream().allMatch(i -> i <= 5) ? flux : Flux.error(new RuntimeException("validation failed"));
})
.flatMapIterable(Function.identity())
.subscribe(System.out::println,
System.out::println);
Однако этот подход не годится, так как он продолжает собирать 5 элементов каждый раз, даже после того, как первая проверка выполнена, чего мы можем не хотеть.
Чтобы избежать буферизации N элементов после проверки, мы можем использовать bufferUntil
. После того, как мы собрали первые N элементов и проверили, он просто передал бы 1 элемент по мере поступления в нисходящий поток.
AtomicInteger atomicInteger = new AtomicInteger(1);
integerFlux
.bufferUntil(i -> {
if(atomicInteger.get() < 5){
atomicInteger.incrementAndGet();
return false;
}
return true;
})
.switchOnFirst((signal, flux) -> {
return signal.get().stream().allMatch(i -> i <= 5) ? flux : Flux.error(new RuntimeException("validation failed"));
})
.flatMapIterable(Function.identity())
.subscribe(System.out::println,
System.out::println);