Предположим, у вас есть поток объектов со следующей структурой:
class Element {
String key;
int count;
}
Теперь представьте, что эти элементы проходят в предопределенном порядке сортировки, всегда в группах ключа, например
{ key = "firstKey", count=123}
{ key = "firstKey", count=1 }
{ key = "secondKey", count=4 }
{ key = "thirdKey", count=98 }
{ key = "thirdKey", count=5 }
.....
Я хочу создать поток, который возвращает один элемент для каждого отдельного key
и суммируется count
для каждой группы ключей.
Так что в основном это классическое сокращение для каждой группы, но использование оператора reduce
не работает, потому что он возвращает только один элемент, и я хочу получить поток с одним элементом для каждого отдельного ключа.
Использование bufferUntil
может работать, но имеет недостаток: мне нужно сохранить состояние, чтобы проверить, изменился ли key
по сравнению с предыдущим.
Использование groupBy
является излишним, так как я знаю, что каждая группа заканчивается после того, как найден новый ключ, поэтому я не хочу хранить что-либо в кэше после этого события.
Возможно ли такое агрегирование с использованием Flux
без сохранения состояния вне потока?