Путаница в отношении Rx Java onBackPressureBuffer и onBackPressureDrop - PullRequest
0 голосов
/ 08 января 2020

Я хочу, чтобы мой код работал как этот график, но он не работает ... enter image description here

Мой код:

    private fun <T> register(cls: Class<T>): Flowable<Pair<T, Long>> {
        return FlowableFromObservable(mRelay).onBackpressureBuffer(4).filter(
            /* filter target event. */
            EventPredictable(cls)
        ).cast(
            /* cast to target event */
            cls
        ).onBackpressureDrop {
            Log.i(TAG, "drop event: $it")
        }.concatMap { data ->
            /* start interval task blocking */
            val period = 1L
            val unit = TimeUnit.SECONDS
            MLog.d(TAG, "startInterval: data = $data")
            Flowable.interval(0, period, unit).take(DURATION.toLong()).takeUntil(
                getStopFlowable()
            ).map {
                Pair(data, it)
            }
        }
    }

    private fun getStopFlowable(): Flowable<StopIntervalEvent> {
        return RxBus.getDefault().register(StopIntervalEvent::class.java)
                    .toFlowable(BackpressureStrategy.LATEST)
    }

при отправке 140 событий за 10 мс, мой код сбрасывает 12 событий, а не сбрасывает 140 - 4 = 136 ожидаемых событий. Почему мой код не работает как график выше? Спасибо за просмотр и ответы!

1 Ответ

1 голос
/ 08 января 2020

onBackpressurDrop всегда готов к приему предметов, поэтому onBackpressureBuffer не имеет практического эффекта в вашей настройке. onBackpressurBuffer(int) потерпит неудачу при переполнении, поэтому вы никогда не увидите ожидаемого поведения с ним. Кроме того, concatMap по умолчанию выбирает 2 элемента заранее, поэтому он получает исходные элементы 1 и 2.

Вместо этого попробуйте использовать перегрузку с настраиваемой стратегией противодавления:

 mRelay
 .toFlowable(BackpressureStaregy.MISSING)
 .onBackpressureBuffer(4, null, BackpressureOverflowStrategy.DROP_LATEST)
 .flatMap(data -> 
     Flowable.intervalRange(0, DURATION.toLong(), 0, period, unit)
     .takeUntil(getStopFlowable())
     .map(it -> new Pair(data, it))
     , 1 // <--------------------------------------------------- max concurrency
 );
...