Подписывается ли на комбинат последних? - PullRequest
0 голосов
/ 15 февраля 2019

Есть ли смысл добавлять subscribeOn() после Observable.combineLatest(), например, так:

Observable.combineLatest(
  someObservable,
  theOtherObservable,
  (something, theOther) -> iAmFunction(something, theOther)))
  .subscribeOn(Schedulers.computation())
  ...

То, что я понимаю, iAmFunction() будет вызываться на любом планировщике, который объединяет Observable emmit как последний.

Так какова цель этого subscribeOn() в конце?

Ответы [ 2 ]

0 голосов
/ 18 февраля 2019

subscribeOn указывает, где будут происходить побочные эффекты подписки, и не гарантирует, что вы получите элементы в потоке, который он использует.Вместо этого используйте observeOn:

Observable.combineLatest(
    someObservable.observeOn(Schedulers.single()),
    theOtherObservable.observeOn(Schedulers.single()),
    (something, theOther) -> iAmFunction(something, theOther))
)

Что combineLatest делает при подписке, так это подписывается на ее источники, таким образом, транзитивно вы подписываетесь на someObservable и theOtherObservable в потоке планировщика computation().Однако обратите внимание, что если someObservable не вернет управление, theOtherObservable не будет подписано.Всегда лучше указывать subscribeOn как можно ближе к источнику (ам):

Observable.combineLatest(
    someObservable.subscribeOn(Schedulers.computation()),
    theOtherObservable.subscribeOn(Schedulers.computation()),
    (something, theOther) -> iAmFunction(something, theOther))
)
0 голосов
/ 15 февраля 2019

В этом вопросе subscribeOn решает which thread executes the iAmFunction() ( Ссылка ).В основном subscribeOn решает исходящий поток.

оператор SubscribeOn определяет, с каким потоком начнет работать Observable, независимо от того, в какой точке цепочки операторов этот оператор вызывается.

Например, есть две логики combineLatest.

fun main(args: Array<String>) {

    Observables.combineLatest(
        Observable.just("o1"),
        Observable.just("o2")
    ) { _, _ -> Thread.currentThread().name }
        .subscribe {
            println("Without subscribeOn")
            println("in combineLatest: $it")
            println("in subscribe: ${Thread.currentThread().name}")
        }

    println()

    Observables.combineLatest(
        Observable.just("o1"),
        Observable.just("o2")
    ) { _, _ -> Thread.currentThread().name }
        .subscribeOn(Schedulers.io())
        .subscribe {
            println("With subscribeOn")
            println("in combineLatest: $it")
            println("in subscribe: ${Thread.currentThread().name}")

        }

    Thread.sleep(500)
}
Without subscribeOn
in combineLatest: main
in subscribe: main

With subscribeOn
in combineLatest: RxCachedThreadScheduler-1
in subscribe: RxComputationThreadPool-1

Как видите, subscribeOn изменяет поток для combineLatest и subscribe.Если вы не используете observeOn, эффект может увеличиться с emitting items до subscribe.

...