Как я могу объединить элементы в потоке по группам / как сократить по группам? - PullRequest
0 голосов
/ 24 января 2019

Предположим, у вас есть поток объектов со следующей структурой:

class Element {
  String key;
  int count;
}

Теперь представьте, что эти элементы проходят в предопределенном порядке сортировки, всегда в группах ключа, например

{ key = "firstKey",  count=123}
{ key = "firstKey",  count=1  }
{ key = "secondKey", count=4  }
{ key = "thirdKey",  count=98 }
{ key = "thirdKey",  count=5  }
 .....

Я хочу создать поток, который возвращает один элемент для каждого отдельного key и суммируется count для каждой группы ключей. Так что в основном это классическое сокращение для каждой группы, но использование оператора reduce не работает, потому что он возвращает только один элемент, и я хочу получить поток с одним элементом для каждого отдельного ключа.

Использование bufferUntil может работать, но имеет недостаток: мне нужно сохранить состояние, чтобы проверить, изменился ли key по сравнению с предыдущим.

Использование groupBy является излишним, так как я знаю, что каждая группа заканчивается после того, как найден новый ключ, поэтому я не хочу хранить что-либо в кэше после этого события.

Возможно ли такое агрегирование с использованием Flux без сохранения состояния вне потока?

1 Ответ

0 голосов
/ 28 января 2019

В настоящее время (по состоянию на 3.2.5) это невозможно без отслеживания состояния самостоятельно.distinctUntilChanged мог бы соответствовать счету с минимальным состоянием, но не излучать состояние, только значения, которые он считал «отличными» в соответствии с указанным состоянием.

Самый минималистичный способ решения этой проблемы - windowUntil и compose + AtomicReference для каждого абонента:

Flux<Tuple2<T, Integer>> sourceFlux = ...; //assuming key/count represented as `Tuple2`
Flux<Tuple2<T, Integer>> aggregated = sourceFlux.compose(source -> {
    //having this state inside a compose means it will not be shared by multiple subscribers
    AtomicReference<T> last = new AtomicReference<>(null);

    return source
      //use "last seen" state so split into windows, much like a `groupBy` but with earlier closing
      .windowUntil(i -> !i.getT1().equals(last.getAndSet(i.getT1())), true)
      //reduce each window
      .flatMap(window -> window.reduce((i1, i2) -> Tuples.of(i1.getT1(), i1.getT2() + i2.getT2()))
});
...