У меня есть поток значений, которые мне нужно «объединить», пока они не встретят определенное условие. Я не хочу использовать внешнюю переменную в качестве буфера для проверки того, что мое условие выполнено, но я не могу найти способ выполнить операцию «lowerWhile» или «bufferWhile», потому что у меня нет доступа к буферу (с .buffer
) для проверьте, что у меня достаточно данных для передачи результата в нисходящем направлении.
У меня есть:
ohlcIntfFlux
.reduce(OHLCIntf::mergeWith)
.filter(timeFrameProvider::isBarComplete)
.map(this::makeBar)
.subscribe(pub::next)
, но это уменьшает весь поток, а затем проверяет фильтр.
Мне нужно уменьшить пока условие фильтра не будет выполнено, передайте его вниз по течению.
Надеюсь, это достаточно ясно ...
Спасибо!