Как выполнить расчет движущегося окна для Flux и вывести результат в виде нового Flux - PullRequest
1 голос
/ 03 июня 2019

Я хотел бы выполнить вычисление движущегося окна для Flux и создать Flux, содержащий вычисленные значения, но я не могу понять, как это сделать.

В качестве упрощенного примера, скажем, у меня есть поток целых чисел, и я хочу создать новый поток с суммой каждых 3 последовательных целых чисел в этом потоке.Для иллюстрации:

Первый поток содержит целые числа от 1 до 8: {1, 2, 3, 4, 5, 6, 7, 8}

Поток результатов должен содержать суммы: {1+2 + 3, 2 + 3 + 4, 3 + 4 + 5, 4 + 5 + 6, 5 + 6 + 7, 6 + 7 + 8}

Я легко могу произвести первый Флюс и получитьпоток потоков, содержащих последовательные 3 значения, следующим образом:

Flux<Integer> f1 = Flux.range(1,8);
Flux<Flux<Integer>> f2 = f1.window(3,1);

Я также могу подписаться () на f2 и вычислить суммы, но я не могу понять, как одновременно публиковать эти суммы как новый поток.

Я упускаю что-то простое или это действительно сложно?

1 Ответ

1 голос
/ 04 июня 2019

Вы можете использовать .reduce(Integer::sum) на внутреннем потоке для выполнения суммы элементов в окне и .flatMap на внешнем потоке для объединения этих сумм в один поток.

Обратите внимание, чтопоскольку .window вызывается с maxSize < skip, в конечных окнах будет меньше элементов максимального размера.

Flux<Integer> sums = Flux.range(1, 8)                    // Flux<Integer>
        .window(3, 1)                                    // Flux<Flux<Integer>>
        .flatMap(window -> window.reduce(Integer::sum)); // Flux<Integer>

StepVerifier.create(sums)
        .expectNext(6)  // 1+2+3
        .expectNext(9)  // 2+3+4
        .expectNext(12) // 3+4+5
        .expectNext(15) // 4+5+6
        .expectNext(18) // 5+6+7
        .expectNext(21) // 6+7+8
        .expectNext(15) // 7+8
        .expectNext(8)  // 8
        .verifyComplete();
...