Как я могу фильтровать Flux по состоянию? - PullRequest
0 голосов
/ 12 ноября 2018

Я хотел бы применить filter к своему Flux на основе состояния, рассчитанного из предыдущих значений. Тем не менее, рекомендуется избегать использования состояния в операторах согласно javadoc

Обратите внимание, что следует избегать использования состояний в java.util.function / lambdas, используемых в операторах Flux, поскольку они могут совместно использоваться несколькими подписчиками.

Например, Flux#distinct фильтрует элементы, которые появляются ранее. Как мы можем реализовать нашу собственную версию distinct?

1 Ответ

0 голосов
/ 12 ноября 2018

Я нашел ответ на свой вопрос. Flux#distinct может принимать Supplier, который предоставляет начальное состояние, и BiPredicate, который выполняет «отдельную» проверку, поэтому мы можем хранить произвольное состояние в хранилище и решать, сохранять ли каждый элемент.

Следующий код показывает, как сохранить первые 3 элемента в каждой группе mod2 без изменения порядка.

// Get first 3 elements per mod 2.
Flux<Integer> first3PerMod2 =
    Flux.fromIterable(ImmutableList.of(9, 3, 7, 4, 5, 10, 6, 8, 2, 1))
        .distinct(
            // Group by mod2
            num -> num % 2,
            // Counter to store how many elements have been processed for each group.
            () -> new HashMap<Integer, Integer>(),
            // Increment or set 1 to the counter,
            // and return whether 3 elements are published.
            (map, num) -> map.merge(num, 1, Integer::sum) <= 3,
            // Clean up the state.
            map -> map.clear());

StepVerifier.create(first3PerMod2).expectNext(9, 3, 7, 4, 10, 6).verifyComplete();
...