Прежде всего, для этого вам потребуется состояние, поэтому вам нужно быть осторожным, чтобы иметь состояние для каждой подписки .Один из способов обеспечения того, чтобы при объединении операторов использовалось compose
.
Предлагаемое решение
Flux<Integer> allMatchingHighest = numbers.compose(f -> {
AtomicInteger highestSoFarState = new AtomicInteger(Integer.MIN_VALUE);
AtomicInteger windowState = new AtomicInteger(Integer.MIN_VALUE);
return f.filter(v -> {
int highestSoFar = highestSoFarState.get();
if (v > highestSoFar) {
highestSoFarState.set(v);
return true;
}
if (v == highestSoFar) {
return true;
}
return false;
})
.bufferUntil(i -> i != windowState.getAndSet(i), true)
.log()
.takeLast(1)
.flatMapIterable(Function.identity());
});
Обратите внимание, что вся compose
lamdba может быть извлечена в метод, используя кодссылка на метод и быть более читабельной.
Объяснение
Решение выполняется в 4 этапа, каждый из двух имеет свое собственное состояние AtomicInteger
:
- Поэтапно найдите новый "самый высокий" элемент (пока что) и
filter
из элементов, которые меньше.Это приводит к (монотонно) Flux<Integer>
увеличивающимся числам, таким как 1 5 7 8 8
. buffer
кусками равного числа.Мы используем bufferUntil
вместо window*
или groupBy
, потому что наиболее вырожденный случай, когда числа все разные, и уже отсортированные не удастся с этими - пропустить все буферы, кроме одного (
takeLast(1)
) - «Воспроизвести» этот последний буфер, который представляет количество вхождений нашего наибольшего значения (
flatMapIterable
)
Это правильно пройдет ваш StepVerifier
тест, выдав 8 8
.Обратите внимание, что используются следующие промежуточные буферы:
onNext([1])
onNext([5])
onNext([7, 7, 7])
onNext([8, 8])
Более сложное тестирование, оправдывающее bufferUntil
Гораздо более сложный источник, который не работает с groupBy
, но не это решение:
Random rng = new Random();
//generate 258 numbers, each randomly repeated 1 to 10 times
//also, shuffle the whole thing
Flux<Integer> numbers = Flux
.range(1, 258)
.flatMap(i -> Mono.just(i).repeat(rng.nextInt(10)))
.collectList()
.map(l -> {
Collections.shuffle(l);
System.out.println(l);
return l;
})
.flatMapIterable(Function.identity())
.hide();
Это один пример последовательности буферов, в которую он может фильтроваться (имейте в виду, что воспроизводится только последний):
onNext([192])
onNext([245])
onNext([250])
onNext([256, 256])
onNext([257])
onNext([258, 258, 258, 258, 258, 258, 258, 258, 258])
onComplete()
Примечание. Если вы удалите map
,тасует, тогда вы получаете «дегенеративный случай», когда даже windowUntil
не сработает (takeLast
приведет к слишком большому количеству открытых, но неиспользованных окон).
Это было забавно придумать!