Использование RxJava для сгруппированной скользящей средней - PullRequest
3 голосов
/ 05 ноября 2019

Я могу вычислить скользящее среднее простого списка целых чисел, например:

Integer arr[] = {1, 2, 3, 4, 5, 6};                        
Observable<Integer> oi = Observable.from(arr);                          
oi.buffer(24, 1).subscribe(x -> average(x))

Теперь давайте скажем, что у меня есть объекты вместо целых чисел, таких как

private class Model{
  public String key;
  public Double value;
}

, где я хочусгруппировать и рассчитать скользящее среднее на основе key неблокирующим способом (т.е. я получаю непрерывный поток из rabbitmq) таким образом, что он будет излучать значения {key->average}.

Я знаю об операторе groupBy, но когда я его использую, все становится не так. Каков наилучший способ сделать это с groupBy?

Ответы [ 2 ]

0 голосов
/ 06 ноября 2019

Мое понимание вопроса состоит в том, что существует бесконечный поток, и цель состоит в том, чтобы вычислить среднее значение скользящего окна в потоке. Но вопрос неясен в деталях, и есть несколько различных способов его интерпретации.

CASE 1. The sliding window is the most recent 24 items of the stream.
CASE 2. The sliding window is the most recent 24 items per type.
    CASE 2.1 Each emit is the average of the most recent sliding window that has moved.
    CASE 2.2 Each emit should contain averages of all groups.
    CASE 2.3 Or some other forms..

Было бы полезно, если бы вы могли привести нам пример входных данных и ожидаемых выходных данных.

Следующий ответпредназначен для CASE 2.1:

oi.groupBy(model -> model.key) // [1]
        .flatMap(groupedO -> groupedO.buffer(4, 1) // [2]
                .map(list -> {
                    double avg = list.stream().mapToDouble(m -> m.value)
                            .average().orElse(0.0);
                    return new Model(list.get(0).key, avg); // [3]
                }))
        .subscribe(result -> { //[4]
            // do something
        });
  1. Группа основана на ключе
  2. Применить buffer к сгруппированной наблюдаемой
  3. Рассчитать среднее
  4. результат в виде Model(group, average)
0 голосов
/ 05 ноября 2019

Вы можете сделать это:

Observable<Model> oi = ...;
oi.groupBy(model -> model.key)
    .flatMapSingle(Observable::toList)
    .subscribe(modelsGrouped -> { // key: modelsGrouped.get(0).key
        double avg = average(modelsGrouped);
        //...
    });

Надеюсь, это поможет!

...