Как отфильтровать поток для всех элементов, имеющих наибольшее значение - PullRequest
0 голосов
/ 14 декабря 2018

Как мне отфильтровать издателя по элементам с самым высоким значением, не зная заранее самого высокого значения?

Вот небольшой тест для иллюстрации того, чего я пытаюсь достичь:

@Test
fun filterForHighestValuesTest() {
    val numbers = Flux.just(1, 5, 7, 2, 8, 3, 8, 4, 3)
        // what operators to apply to numbers to make the test pass?

    StepVerifier.create(numbers)
        .expectNext(8)
        .expectNext(8)
        .verifyComplete()
}

Я начал с оператора сокращения:

@Test
fun filterForHighestValuesTestWithReduce() {

    val numbers = Flux.just(1, 5, 7, 2, 8, 3, 8, 4, 3)
        .reduce { a: Int, b: Int -> if( a > b) a else b }

    StepVerifier.create(numbers)
        .expectNext(8)
        .verifyComplete()
}

и, конечно, этот тест пройден, но он выдаст только один Mono, тогда как я хотел бы получить Flux, содержащий все элементы, имеющиесамые высокие значения, например, 8 и 8 в этом простом примере.

Ответы [ 3 ]

0 голосов
/ 14 декабря 2018

Прежде всего, для этого вам потребуется состояние, поэтому вам нужно быть осторожным, чтобы иметь состояние для каждой подписки .Один из способов обеспечения того, чтобы при объединении операторов использовалось compose.

Предлагаемое решение

Flux<Integer> allMatchingHighest = numbers.compose(f -> {
        AtomicInteger highestSoFarState = new AtomicInteger(Integer.MIN_VALUE);
        AtomicInteger windowState = new AtomicInteger(Integer.MIN_VALUE);

        return f.filter(v -> {
            int highestSoFar = highestSoFarState.get();
            if (v > highestSoFar) {
                highestSoFarState.set(v);
                return true;
            }
            if (v == highestSoFar) {
                return true;
            }
            return false;
        })
                .bufferUntil(i -> i != windowState.getAndSet(i), true)
                .log()
                .takeLast(1)
                .flatMapIterable(Function.identity());
    });

Обратите внимание, что вся compose lamdba может быть извлечена в метод, используя кодссылка на метод и быть более читабельной.

Объяснение

Решение выполняется в 4 этапа, каждый из двух имеет свое собственное состояние AtomicInteger:

  1. Поэтапно найдите новый "самый высокий" элемент (пока что) и filter из элементов, которые меньше.Это приводит к (монотонно) Flux<Integer> увеличивающимся числам, таким как 1 5 7 8 8.
  2. buffer кусками равного числа.Мы используем bufferUntil вместо window* или groupBy, потому что наиболее вырожденный случай, когда числа все разные, и уже отсортированные не удастся с этими
  3. пропустить все буферы, кроме одного (takeLast(1))
  4. «Воспроизвести» этот последний буфер, который представляет количество вхождений нашего наибольшего значения (flatMapIterable)

Это правильно пройдет ваш StepVerifier тест, выдав 8 8.Обратите внимание, что используются следующие промежуточные буферы:

onNext([1])
onNext([5])
onNext([7, 7, 7])
onNext([8, 8])

Более сложное тестирование, оправдывающее bufferUntil

Гораздо более сложный источник, который не работает с groupBy, но не это решение:

Random rng = new Random();
//generate 258 numbers, each randomly repeated 1 to 10 times
//also, shuffle the whole thing
Flux<Integer> numbers = Flux
        .range(1, 258)
        .flatMap(i -> Mono.just(i).repeat(rng.nextInt(10)))
        .collectList()
        .map(l -> {
            Collections.shuffle(l);
            System.out.println(l);
            return l;
        })
        .flatMapIterable(Function.identity())
        .hide();

Это один пример последовательности буферов, в которую он может фильтроваться (имейте в виду, что воспроизводится только последний):

onNext([192])
onNext([245])
onNext([250])
onNext([256, 256])
onNext([257])
onNext([258, 258, 258, 258, 258, 258, 258, 258, 258])
onComplete()

Примечание. Если вы удалите map,тасует, тогда вы получаете «дегенеративный случай», когда даже windowUntil не сработает (takeLast приведет к слишком большому количеству открытых, но неиспользованных окон).

Это было забавно придумать!

0 голосов
/ 16 декабря 2018

Один из способов сделать это - отобразить поток целых чисел в поток списков с одним int в каждом, уменьшить результат и завершить flatMapMany, т.е.

final Flux<Integer> numbers = Flux.just(1, 5, 7, 2, 8, 3, 8, 4, 3);
final Flux<Integer> maxValues =
    numbers
        .map(
            n -> {
              List<Integer> list = new ArrayList<>();
              list.add(n);
              return list;
            })
        .reduce(
            (l1, l2) -> {
              if (l1.get(0).compareTo(l2.get(0)) > 0) {
                return l1;
              } else if (l1.get(0).equals(l2.get(0))) {
                l1.addAll(l2);
                return l1;
              } else {
                return l2;
              }
            })
        .flatMapMany(Flux::fromIterable);
0 голосов
/ 14 декабря 2018

Одним из возможных решений является группировка Flux до уменьшения и последующее отображение GroupedFlux следующим образом:

@Test
fun filterForHighestValuesTest() {
    val numbers = Flux.just(1, 5, 7, 2, 8, 3, 8, 4, 3)
        .groupBy { it }
        .reduce { t: GroupedFlux<Int, Int>, u: GroupedFlux<Int, Int> ->
            if (t.key()!! > u.key()!!) t else u
        }
        .flatMapMany {
            it
        }

    StepVerifier.create(numbers)
        .expectNext(8)
        .expectNext(8)
        .verifyComplete()
}
...