Я нашел ответ на свой вопрос. Flux#distinct
может принимать Supplier
, который предоставляет начальное состояние, и BiPredicate
, который выполняет «отдельную» проверку, поэтому мы можем хранить произвольное состояние в хранилище и решать, сохранять ли каждый элемент.
Следующий код показывает, как сохранить первые 3 элемента в каждой группе mod2 без изменения порядка.
// Get first 3 elements per mod 2.
Flux<Integer> first3PerMod2 =
Flux.fromIterable(ImmutableList.of(9, 3, 7, 4, 5, 10, 6, 8, 2, 1))
.distinct(
// Group by mod2
num -> num % 2,
// Counter to store how many elements have been processed for each group.
() -> new HashMap<Integer, Integer>(),
// Increment or set 1 to the counter,
// and return whether 3 elements are published.
(map, num) -> map.merge(num, 1, Integer::sum) <= 3,
// Clean up the state.
map -> map.clear());
StepVerifier.create(first3PerMod2).expectNext(9, 3, 7, 4, 10, 6).verifyComplete();