При подписке на поток Reactive Extensions Flowable
я заметил, что поток останавливается / зависает (больше не генерируются будущие элементы и не возвращается ошибка) после того, как было возвращено 128 элементов.
val download: Flowable<DownloadedRecord> = sensor.downloadRecords()
download
.doOnComplete { Log.i( "TEST", "Finished!" ) }
.subscribe(
{ record ->
Log.i( "TEST", "Got record: ${record.record.id}; left: ${record.recordsLeft}" )
},
{ error ->
Log.i( "TEST", "Error while downloading records: $error" )
} )
Скорее всего, это связано с Reactive Extensions. Я обнаружил размер буфера по умолчанию Flowable
установлен на 128 ; вряд ли это совпадение.
Пытаясь понять, что происходит, я столкнулся со следующей документацией по Flowable.subscribeOn
.
Если в цепочке есть источник типа create(FlowableOnSubscribe, BackpressureStrategy)
, рекомендуется иметь значение requestOn
false, чтобы избежать тупика в том же пуле , поскольку запросы могут накапливаться за отправляющим / блокирующим источником.
Хотя я не совсем понимаю, что такое тупик в том же пуле в этой ситуации, похоже, что-то похожее происходит с моим потоком.
1. Что такое тупик в том же пуле в Reactive Extensions? Каким будет минимальный пример кода для его воссоздания (на Android)?
В настоящее время в растерянности, я пытался применить .subscribeOn( Schedulers.io(), false )
до .subscribe
, не понимая, что это делает, но мой поток все еще блокируется после того, как было выпущено 128 элементов.
2. Как можно отладить эту проблему и как / где ее можно решить?