У меня есть цепочка, где я делаю некоторые блокирующие вызовы ввода-вывода (например, HTTP-вызов).Я хочу, чтобы блокирующий вызов потреблял значение, продолжал без прерывания, но отбрасывал все накопившееся время, а затем использовал следующее значение таким же образом.
Рассмотрим следующий пример:
fun main() {
Flowable.interval(100, TimeUnit.MILLISECONDS).onBackpressureLatest().map {
Thread.sleep(1000)
it
}.blockingForEach { println(it) }
}
С наивной точки зрения, я бы ожидал напечатать что-то вроде 0, 10, 20, ...
, но он печатает 0, 1, 2, ...
.
Что я делаю не так?
РЕДАКТИРОВАТЬ:
Я думал о наивном добавлении debounce
, чтобы съесть входящий поток:
fun main() {
Flowable.interval(100, TimeUnit.MILLISECONDS)
.debounce(0, TimeUnit.MILLISECONDS)
.map {
Thread.sleep(1000)
it
}
.blockingForEach { println(it) }
}
Но теперь я получаю java.lang.InterruptedException: sleep interrupted
.
РЕДАКТИРОВАТЬ:
Кажется, что работает следующее:
fun main() {
Flowable.interval(100, TimeUnit.MILLISECONDS)
.throttleLast(0, TimeUnit.MILLISECONDS)
.map {
Thread.sleep(1000)
it
}
.blockingForEach { println(it) }
}
Вывод, как и ожидалось 0, 10, 20, ...
!!
Это правильный путь?
Iотметил, что throttleLast
переключится на Computation-Scheduler.Есть ли способ вернуться к исходному планировщику?
РЕДАКТИРОВАТЬ:
Я также иногда получаю java.lang.InterruptedException: sleep interrupted
с этим вариантом.