RxJava BackPressureStrategy.DROP ставит новые задачи в очередь, а не отбрасывает их - PullRequest
0 голосов
/ 29 апреля 2018

У меня есть код ниже в моей активности Android, который работает как задумано. при событиях прокрутки он вызывает от expensiveHttpCall() до ThreadPoolExecutor с SynchronousQueue & DiscardPolicy, что приводит к тому, что новые вызовы отбрасываются, пока еще выполняется последний вызов.

class MyActivity : AppCompatActivity() {

    val recyclerView: RecyclerView
.
    val oneAtATimeExecutor = ThreadPoolExecutor(1, 1, 1L, TimeUnit.SECONDS,
            SynchronousQueue<Runnable>(),
            ThreadPoolExecutor.DiscardPolicy()
    )

    private val scrollListener: RecyclerView.OnScrollListener = object : RecyclerView.OnScrollListener() {
        override fun onScrolled(recyclerView: RecyclerView?, dx: Int, dy: Int) {
            super.onScrolled(recyclerView, dx, dy)
            Log.e("rx","triggered!")
            oneAtATimeExecutor.execute { expensiveHttpCall() }
        }
    }

    override fun onCreate(savedInstanceState: Bundle?) {

        //...

        recyclerView.addOnScrollListener(scrollListener)

        //...
    }

    private fun expensiveHttpCall() {
        Thread.sleep(2000L)
        Log.e("rx", "done!")
    }
}

У меня проблемы с реализацией того же поведения с RxJava PublishProcessor:

class MyActivity : AppCompatActivity() {

    val recyclerView: RecyclerView

    val processor = PublishProcessor.create<Unit>()

    private val scrollListener: RecyclerView.OnScrollListener = object : RecyclerView.OnScrollListener() {
        override fun onScrolled(recyclerView: RecyclerView?, dx: Int, dy: Int) {
            super.onScrolled(recyclerView, dx, dy)
            Log.e("rx","triggered!")
            processor.onNext(Unit)
        }
    }

    override fun onCreate(savedInstanceState: Bundle?) {

        //...

        recyclerView.addOnScrollListener(scrollListener)

        //...

        processor.onBackpressureDrop().observeOn(Schedulers.newThread())
                .subscribe { expensiveHttpCall() }
    }

    private fun expensiveHttpCall() {
        Thread.sleep(2000L)
        Log.e("rx", "done!")
    }
}

К моему удивлению, после прокрутки консоль распечатывает кучу "triggered" сообщений. затем с двухсекундной скоростью следования он печатает "done!" столько раз, сколько печатается "triggered". это эффективно ведет себя как очередь, а не так, как было задумано. что я делаю не так?

1 Ответ

0 голосов
/ 29 апреля 2018

observeOn имеет внутреннюю очередь, в которой будет храниться ограниченное количество элементов. Для синхронных задач вы можете ограничить размер очереди 1:

processor.onBackpressureDrop()
   .observeOn(Schedulers.io(), false, 1)
   .subscribe { expensiveHttpCall() }

для асинхронных подзадач вы можете использовать flatMap с ограниченным параллелизмом:

processor.onBackpressureDrop()
   .flatMap({
       Flowable.just(1)
       .subscribeOn(Schedulers.io())
       .doOnNext { expensiveHttpCall() }
   }, 1)
...