mimi c `reduWhile` как оператор - PullRequest
0 голосов
/ 19 апреля 2020

У меня есть поток значений, которые мне нужно «объединить», пока они не встретят определенное условие. Я не хочу использовать внешнюю переменную в качестве буфера для проверки того, что мое условие выполнено, но я не могу найти способ выполнить операцию «lowerWhile» или «bufferWhile», потому что у меня нет доступа к буферу (с .buffer) для проверьте, что у меня достаточно данных для передачи результата в нисходящем направлении.

У меня есть:

                ohlcIntfFlux
                        .reduce(OHLCIntf::mergeWith)
                        .filter(timeFrameProvider::isBarComplete)
                        .map(this::makeBar)
                        .subscribe(pub::next)

, но это уменьшает весь поток, а затем проверяет фильтр.

Мне нужно уменьшить пока условие фильтра не будет выполнено, передайте его вниз по течению.

Надеюсь, это достаточно ясно ...

Спасибо!

1 Ответ

0 голосов
/ 20 апреля 2020

вот как я это сделал:

ConnectableFlux.create(pub ->
                ohlcIntfFlux
                        .scan((i, acc) -> {
                            if (!timeFrameProvider.isBarComplete(acc))
                                return acc.mergeWith(i);

                            pub.next(makeBar(acc));
                            return i;
                        })
                        .subscribe()
        )

он делает то, что я хочу, но если вы можете предложить более функциональный способ, я был бы рад услышать, что

...