Понимание параметра пропускной способности в Rx Java наBackpressureBuffer - PullRequest
0 голосов
/ 15 апреля 2020

Вот небольшой пример приложения, которое я написал:

package ru.maksim.sample.app

import android.os.Bundle
import android.util.Log
import androidx.appcompat.app.AppCompatActivity
import io.reactivex.rxjava3.core.BackpressureOverflowStrategy
import io.reactivex.rxjava3.core.BackpressureStrategy
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.disposables.Disposable
import io.reactivex.rxjava3.schedulers.Schedulers
import io.reactivex.rxjava3.subjects.PublishSubject
import kotlinx.android.synthetic.main.activity_main.*
import java.util.concurrent.TimeUnit


class MainActivity : AppCompatActivity() {
    private val subject = PublishSubject.create<Int>()
    private lateinit var disposable: Disposable
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        disposable = observeInts()
            .subscribe(
                {
                    Log.d("SampleApp", "next=$it")
                },
                {
                    Log.e("SampleApp", "error", it)
                },
                {
                    Log.d("SampleApp", "complete")
                }
            )
        start.setOnClickListener {
            subject.onNext(1)
        }
    }

    override fun onDestroy() {
        disposable.dispose()
        super.onDestroy()
    }

    private fun observeInts() = subject
        .toFlowable(BackpressureStrategy.BUFFER)
        .onBackpressureBuffer(4, {
            Log.d("SampleApp", "Overflow")
        }, BackpressureOverflowStrategy.DROP_LATEST)
        .observeOn(Schedulers.computation())
        .flatMap {
            Log.d("SampleApp", "onNext BEFORE delay: $it")
            Flowable.just(it)
        }
        .delay(10L, TimeUnit.SECONDS)
        .flatMap {
            Log.d("SampleApp", "onNext AFTER delay: $it")
            Flowable.just(it)
        }
}

start - просто кнопка. После нажатия кнопки более 4 (4 - емкость буфера, как вы можете видеть в onBackpressureBuffer) раз, я ожидал увидеть Overflow - логи, но этого не произошло. Я не понимаю почему.

Ответы [ 2 ]

1 голос
/ 15 апреля 2020

Я думаю, что вы не видели предупреждение, потому что было очень мало событий. Вы можете попытаться заменить свой обратный вызов в setOnClickListener на этом и проверить его снова:

start.setOnClickListener {
    for (i in 0..1000) {
        subject.onNext(i)
    }   
}

Также вы можете изменить строку с методом «наблюдаем» на:

.observeOn(Schedulers.computation(), false, 1)

Поскольку планировщик имеет его буфер тоже.

0 голосов
/ 22 апреля 2020

Я думаю, что нашел ответ здесь . А именно,

onBackpressureBuffer(int capacity)

Это ограниченная версия, которая сигнализирует BufferOverflowError в случае, если его буфер достигает заданной емкости.

Flowable.range(1, 1_000_000)
          .onBackpressureBuffer(16)
          .observeOn(Schedulers.computation())
          .subscribe(e -> { }, Throwable::printStackTrace);

Релевантность этого оператора уменьшается, поскольку все больше и больше операторов теперь позволяют установка их размеров буфера. В остальном, это дает возможность «расширить свой внутренний буфер», имея большее число с onBackpressureBuffer, чем их значение по умолчанию.


Похоже, что в дополнение к 16 передается onBackpressureBuffer другим операторы имеют свои собственные буферы. И когда 17-й элемент получен, предыдущие 16 могут быть помещены в буферы разных операторов.

...