Предотвращение взаимоблокировок в том же пуле при использовании Flowable в Reactive Extensions - PullRequest
0 голосов
/ 09 января 2019

При подписке на поток Reactive Extensions Flowable я заметил, что поток останавливается / зависает (больше не генерируются будущие элементы и не возвращается ошибка) после того, как было возвращено 128 элементов.

val download: Flowable<DownloadedRecord> = sensor.downloadRecords()
download
    .doOnComplete { Log.i( "TEST", "Finished!" ) }
    .subscribe(
        { record ->
            Log.i( "TEST", "Got record: ${record.record.id}; left: ${record.recordsLeft}" )
        },
        { error ->
            Log.i( "TEST", "Error while downloading records: $error" )
        } )

Скорее всего, это связано с Reactive Extensions. Я обнаружил размер буфера по умолчанию Flowable установлен на 128 ; вряд ли это совпадение.

Пытаясь понять, что происходит, я столкнулся со следующей документацией по Flowable.subscribeOn.

Если в цепочке есть источник типа create(FlowableOnSubscribe, BackpressureStrategy), рекомендуется иметь значение requestOn false, чтобы избежать тупика в том же пуле , поскольку запросы могут накапливаться за отправляющим / блокирующим источником.

Хотя я не совсем понимаю, что такое тупик в том же пуле в этой ситуации, похоже, что-то похожее происходит с моим потоком.

1. Что такое тупик в том же пуле в Reactive Extensions? Каким будет минимальный пример кода для его воссоздания (на Android)?

В настоящее время в растерянности, я пытался применить .subscribeOn( Schedulers.io(), false ) до .subscribe, не понимая, что это делает, но мой поток все еще блокируется после того, как было выпущено 128 элементов.

2. Как можно отладить эту проблему и как / где ее можно решить?

1 Ответ

0 голосов
/ 10 января 2019
  1. Что такое тупик в том же пуле в Reactive Extensions?

RxJava использует однопоточные исполнители в стандартных планировщиках. Когда блокирующий или активный источник отправляет элементы, он занимает этот единственный поток, и, хотя нисходящий поток запрашивает больше, subscribeOn будет планировать эти запросы за текущим выполняющимся / блокирующим кодом, который затем никогда не получит уведомление о новых возможностях.

Каким будет минимальный пример кода для его воссоздания (на Android)?

Зачем вам код, который блокируется?

Я пытался применить .subscribeOn (Schedulers.io (), false)

Каков ваш реальный поток? Скорее всего, вы применили subscribeOn слишком далеко от источника, и это не имеет никакого эффекта. Самое надежное - поставить его рядом с create.

Как можно отладить эту проблему и как / где ее можно решить?

Поместите doOnNext и doOnRequest в разные места и посмотрите, где исчезают сигналы.

...