Как получить актуальное последнее событие на обратное давление в Rxjava2?Flowable.onBackpressureLatest () не работает должным образом - PullRequest
0 голосов
/ 30 ноября 2018

Когда производитель создает событие быстрее, чем потребляет потребитель.

Я думал, используя Текучий с onBackpressureLatest () , я могу получить последнее событие, отправленное.

Но оказывается, что по умолчанию есть буфер размером 128. Я получил ранее датированное событие, буферизованное ранее.

Так как я могу получить фактическое последнее событие?

Вот пример кода:

Flowable.interval(40, TimeUnit.MILLISECONDS)
            .doOnNext{
                println("doOnNext $it")
            }
            .onBackpressureLatest()
            .observeOn(Schedulers.single())
            .subscribe {
                println("subscribe $it")
                Thread.sleep(100)
            }

То, что я ожидал:

doOnNext    0
    subscribe   0
doOnNext    1
doOnNext    2
    subscribe   2
doOnNext    3
doOnNext    4
doOnNext    5
    subscribe   5
doOnNext    6
doOnNext    7
    subscribe   7
doOnNext    8
doOnNext    9
doOnNext    10
    subscribe   10
...

Что я получил:

doOnNext    0
    subscribe   0
doOnNext    1
doOnNext    2
    subscribe   1
doOnNext    3
doOnNext    4
doOnNext    5
    subscribe   2
doOnNext    6
doOnNext    7
    subscribe   3
doOnNext    8
doOnNext    9
doOnNext    10
    subscribe   4
...
doOnNext    325
    subscribe   127
doOnNext    326
doOnNext    327
doOnNext    328
    subscribe   246
...

1 Ответ

0 голосов
/ 30 ноября 2018

Ваша проблема на самом деле заключается в observeOn, который запрашивает до 128 элементов по умолчанию и передает этот запрос до backpressureLatest, что, как следствие, ведет себя не так, как вы ожидаете.

Выможно использовать .observeOn(Scheduler,boolean,int), чтобы указать размер буфера, который должен исправить поведение, которое вы видите.

...