У меня есть код ниже в моей активности Android, который работает как задумано. при событиях прокрутки он вызывает от expensiveHttpCall()
до ThreadPoolExecutor
с SynchronousQueue
& DiscardPolicy
, что приводит к тому, что новые вызовы отбрасываются, пока еще выполняется последний вызов.
class MyActivity : AppCompatActivity() {
val recyclerView: RecyclerView
.
val oneAtATimeExecutor = ThreadPoolExecutor(1, 1, 1L, TimeUnit.SECONDS,
SynchronousQueue<Runnable>(),
ThreadPoolExecutor.DiscardPolicy()
)
private val scrollListener: RecyclerView.OnScrollListener = object : RecyclerView.OnScrollListener() {
override fun onScrolled(recyclerView: RecyclerView?, dx: Int, dy: Int) {
super.onScrolled(recyclerView, dx, dy)
Log.e("rx","triggered!")
oneAtATimeExecutor.execute { expensiveHttpCall() }
}
}
override fun onCreate(savedInstanceState: Bundle?) {
//...
recyclerView.addOnScrollListener(scrollListener)
//...
}
private fun expensiveHttpCall() {
Thread.sleep(2000L)
Log.e("rx", "done!")
}
}
У меня проблемы с реализацией того же поведения с RxJava
PublishProcessor
:
class MyActivity : AppCompatActivity() {
val recyclerView: RecyclerView
val processor = PublishProcessor.create<Unit>()
private val scrollListener: RecyclerView.OnScrollListener = object : RecyclerView.OnScrollListener() {
override fun onScrolled(recyclerView: RecyclerView?, dx: Int, dy: Int) {
super.onScrolled(recyclerView, dx, dy)
Log.e("rx","triggered!")
processor.onNext(Unit)
}
}
override fun onCreate(savedInstanceState: Bundle?) {
//...
recyclerView.addOnScrollListener(scrollListener)
//...
processor.onBackpressureDrop().observeOn(Schedulers.newThread())
.subscribe { expensiveHttpCall() }
}
private fun expensiveHttpCall() {
Thread.sleep(2000L)
Log.e("rx", "done!")
}
}
К моему удивлению, после прокрутки консоль распечатывает кучу "triggered"
сообщений. затем с двухсекундной скоростью следования он печатает "done!"
столько раз, сколько печатается "triggered"
. это эффективно ведет себя как очередь, а не так, как было задумано.
что я делаю не так?