rxJava buffer () со временем, которое учитывает обратное давление - PullRequest
0 голосов
/ 26 апреля 2018

Версии оператора buffer, которые не работают по времени, учитывают противодавление согласно JavaDoc:

http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#buffer-int-

Однако, любая версия buffer, которая использует временные буферы, не поддерживает противодавление, как эта

http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#buffer-long-java.util.concurrent.TimeUnit-int-

Я понимаю, что это происходит из-за того, что, когда время идет, вы не можете остановить его аналогично, например, оператору interval, который также не поддерживает противодавление по той же причине.

Что мне нужно, так это оператор буферизации, основанный как на размере, так и на времени, и полностью поддерживающий противодавление, передавая сигналы противодавления ОБА вверх по течению И производителю, тикающему время, что-то вроде этого:

someFlowable() .buffer( Flowable.interval(1, SECONDS).onBackpressureDrop(), 10 );

Так что теперь я могу снять галочку на сигналах противодавления.

Это что-то достижимое в настоящее время в rxJava2? Как насчет Project-Reactor?

Ответы [ 4 ]

0 голосов
/ 02 августа 2019

У меня были проблемы с решением https://stackoverflow.com/a/55136139/6719538 при использовании DisposableSubscriber в качестве абонентов, и, насколько я вижу, этот преобразователь не учитывает вызовы Suscription#request от последующих абонентов (они могут переполнить их). Я создаю свою версию, которая была протестирована на производстве - BufferTransformerHonorableToBackpressure.java . Фан-Ян - большое уважение к идее.

0 голосов
/ 03 января 2019

У меня была еще одна попытка, которая привела к получению достаточно сверхмощного решения, которое, кажется, работает (TM)

Требования :

  1. Оператор буферизации, который освобождает буфер после истечения временного интервала или когда буфер достигает максимального размера, в зависимости от того, что произойдет раньше
  2. Оператор должен иметь полное обратное давление, то есть, если запросы прекращаются из нисходящего потока, оператор буфера не должен отправлять данные и не должен вызывать никаких исключений (как это делает оператор starndard Flowable.buffer (interval, TimeUnit). Оператор не должен использовать свой источник / восходящий поток в неограниченном режиме либо
  3. Сделайте это с составлением существующих / реализованных операторов.

Зачем кому-то это нужно? :

Необходимость в таком операторе возникла, когда я захотел реализовать буферизацию в бесконечном / длительном потоке. Я хотел буферизовать для эффективности, но стандарт Flowable.buffer (n) здесь не подходит, так как «бесконечный» поток может испускать k

Схема решения :

Решение основано на операторах generateAsync и partialCollect, оба реализованы в проекте https://github.com/akarnokd/RxJava2Extensions. Остальное - стандарт RxJava2.

  1. Сначала обернуть все значения из восходящего потока в классе контейнера C
  2. Затем merge тот поток с потоком, источник которого использует generateAsync. Этот поток использует switchMap для передачи экземпляров C, которые фактически являются сигналами времени ожидания.
  3. Два объединенных потока поступают в partialCollect, который содержит ссылку на объект «API» для передачи элементов в generateAsync восходящий поток. Это своего рода цикл обратной связи, который переходит от paritialCollect через объект "API" к generateAsync, который возвращается к partialCollect. Таким образом, partialCollect может при получении первого элемента в буфере излучать сигнал, который эффективно начнет тайм-аут. Если буфер не заполняется до истечения времени ожидания, это приведет к тому, что экземпляр пустого C (не содержащего никакого значения) вернется в partialCollect. Он обнаружит его как сигнал тайм-аута и освободит агрегированный буфер вниз по потоку. Если буфер освобождается из-за достижения максимального размера, он будет освобожден, и следующий элемент будет запущен еще раз. Любой сигнал тайм-аута (экземпляр пустого C), поступающий поздно, то есть после освобождения буфера из-за достижения максимального размера, будет игнорироваться. Это возможно, потому что partialCollect создает и отправляет элемент сигнала тайм-аута, который потенциально может вернуться к нему. Проверка идентичности этого элемента позволяет обнаружить сигнал о превышении допустимого времени ожидания.

код : https://gist.github.com/artur-jablonski/5eb2bb470868d9eeeb3c9ee247110d4a

0 голосов
/ 13 марта 2019

Я недавно столкнулся с проблемой, и вот моя реализация. Его можно использовать так:

    Flowable<List<T>> bufferedFlow = (some flowable of T)
                              .compose(new BufferTransformer(1, TimeUnit.MILLISECONDS, 8))

Поддерживает противодавление по указанному вами счету.

Вот реализация: https://gist.github.com/driventokill/c49f86fb0cc182994ef423a70e793a2d

0 голосов
/ 28 ноября 2018

Это было какое-то время, но я снова взглянул на это, и почему-то мне показалось, что это:

public static <T> FlowableTransformer<T, List<T>> buffer(
    int n, long period, TimeUnit unit)
{
    return o ->
        o.groupBy(__ -> 1)
         .concatMapMaybe(
             gf ->
                 gf.take(n)
                   .take(period, SECONDS)
                   .toList()
                   .filter(l -> !l.isEmpty())
         );
}

в значительной степени делает то, что я описал. Это, если я правильно, полностью обратное давление и будет либо буферизировать n элементов, либо по истечении указанного времени, если достаточное количество элементов не было собрано

...