RxJava Flowable.create (), как уважать поток подписки () - PullRequest
0 голосов
/ 28 декабря 2018

Я обертываю API обратного вызова пользовательской библиотеки (dataClient) в RxJava Flowable.dataClient использует свой собственный поток, поэтому его обратный вызов вызывается в своем собственном потоке.

В моей цепочке Rx я пытаюсь указать планировщик вычислений, используя .subscribeOn(Schedulers.computation()).Тем не менее, когда я печатаю имя нити в моей цепочке Rx, я получаю мою dataClient нить.

Что я должен сделать, чтобы мой Flowable использовал нить, указанную в .subscribeOn()?

Flowable.create({ emitter ->
    dataClient.setCallback(object : Callback {
        override fun message(message: DataModel) {
            emitter.onNext(vehicle)
        }

        override fun done() {
            emitter.onComplete()
        }
    })
    emitter.setCancellable {
        dataClient.setCallback(null)
    }
}, BackpressureStrategy.BUFFER)
    .subscribeOn(Schedulers.computation())
    .doOnNext { Log.e("DATA", Thread.currentThread().name) }
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe { data -> Log.d("DATA", "Got data" + data.id)) }

1 Ответ

0 голосов
/ 28 декабря 2018

Планировщик subscribeOn обеспечивает выполнение подписки в соответствующем потоке.Подписка происходит точно так же, и она обрабатывается иначе, чем планировщик observeOn, который планирует передачу элемента в новый поток.

Flowable.create({ emitter ->
    // this runs with the computation scheduler
    dataClient.setCallback(object : Callback {
        override fun message(message: DataModel) {
            // this runs on the thread it's called from
            emitter.onNext(vehicle)
        }

        override fun done() {
            // this runs on the thread it's called from
            emitter.onComplete()
        }
    })
    emitter.setCancellable {
        dataClient.setCallback(null)
    }
}, BackpressureStrategy.BUFFER)
    .subscribeOn(Schedulers.computation())
    .doOnNext {
        // this runs on the thread of the onNext call
        Log.e("DATA", Thread.currentThread().name)
    }
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe {
        // this runs on the main thread
        data -> Log.d("DATA", "Got data" + data.id))
    }

Поскольку код вашей подписки не блокирует и не поддерживает потокдля эмиссии настройка subscribeOn не требуется и может быть опущена.В основном действует только с синхронными источниками.

...