Почему оператор общего ресурса rxjava2 не является многоадресным? - PullRequest
0 голосов
/ 18 февраля 2019

многоадресная рассылка происходит, когда все мои подписчики получают одно и то же излучение перед переходом к следующему излучению.Но когда я использую команду share, я не вижу многоадресной рассылки.У меня дорогая операция, которую я хочу сделать только один раз.давайте взглянем на этот код:

var  ob1 = Observable.fromArray(1,2,3,4,5).map {
       println("expensive operation")
       it * 2
   }

    fun doMultiplyBy2(){
        ob1.flatMap { Observable.just(" 1st subscriber: $it;") }.subscribe{println(it)}

        ob1.flatMap { Observable.just(" 2nd subscriber: $it;") }.subscribe{println(it)}

        ob1.share()
    }
}

вот фактический вывод:

    expensive operation
 1st subscriber: 2;
expensive operation
 1st subscriber: 4;
expensive operation
 1st subscriber: 6;
expensive operation
 1st subscriber: 8;
expensive operation
 1st subscriber: 10;
expensive operation
 2nd subscriber: 2;
expensive operation
 2nd subscriber: 4;
expensive operation
 2nd subscriber: 6;
expensive operation
 2nd subscriber: 8;
expensive operation
 2nd subscriber: 10;

но почему он повторяет дорогостоящую операцию перед отправкой всем подписчикам.повторять дорогую операцию для каждого подписчика?Я использую общий ресурс, поэтому я ожидаю, что результат будет таким:

    expensive operation
 1st subscriber: 2;
 2nd subscriber: 2;
expensive operation
 1st subscriber: 4;
 2nd subscriber: 4;
expensive operation
 1st subscriber: 6;
 2nd subscriber: 6;
expensive operation
 1st subscriber: 8;
 2nd subscriber: 8;
expensive operation
 1st subscriber: 10;
 2nd subscriber: 10;

Что еще интересно, я обнаружил, что ожидаемый результат появляется только в том случае, если я сделаю следующее:

var  ob1 = Observable.fromArray(1,2,3,4,5).map {
       println("expensive operation")
       it * 2
   }.publish()

    fun doMultiplyBy2(){
        ob1.flatMap { Observable.just(" 1st subscriber: $it;") }.subscribe{println(it)}

        ob1.flatMap { Observable.just(" 2nd subscriber: $it;") }.subscribe{println(it)}

        ob1.connect()
    }
}

, что делает его видимым для подключения, а затем подключается вручную.Почему доля не работает?

ОБНОВЛЕНИЕ: Я хочу очень четко пояснить, в чем проблема:

доля должна быть такой же, как publish (). RefCount (), и я также подумал, чтоПоделиться будет многоадресной для меня, но я не вижу, чтобы это делало.Давайте рассмотрим не использование общего ресурса, а использование публикации и подключения вручную:

 var  ob1 = Observable.fromArray(1,2,3,4,5).map {
       println("expensive operation")
       it * 2
   }.publish()

    fun doMultiplyBy2(){
        //ob1 = ob1.share()
        ob1.flatMap { Observable.just(" 1st subscriber: $it;") }.subscribe{println(it)}

        ob1.flatMap { Observable.just(" 2nd subscriber: $it;") }.subscribe{println(it)}

        ob1.connect()
    }
}

. Вывод этого:

expensive operation
 1st subscriber: 2;
 2nd subscriber: 2;
expensive operation
 1st subscriber: 4;
 2nd subscriber: 4;
expensive operation
 1st subscriber: 6;
 2nd subscriber: 6;
expensive operation
 1st subscriber: 8;
 2nd subscriber: 8;
expensive operation
 1st subscriber: 10;
 2nd subscriber: 10;

, и это именно то, что я хочу.дорогая операция выполняется один раз за выброс.

не позволяет изменить его на использование общего ресурса:

var  ob1 = Observable.fromArray(1,2,3,4,5).map {
       println("expensive operation")
       it * 2
   }.publish().refCount()//or can use share()

    fun doMultiplyBy2(){
        ob1.flatMap { Observable.just(" 1st subscriber: $it;") }.subscribe{println(it)}

        ob1.flatMap { Observable.just(" 2nd subscriber: $it;") }.subscribe{println(it)}
    }
}

, который дает следующий вывод:

    expensive operation
 1st subscriber: 2;
expensive operation
 1st subscriber: 4;
expensive operation
 1st subscriber: 6;
expensive operation
 1st subscriber: 8;
expensive operation
 1st subscriber: 10;
expensive operation
 2nd subscriber: 2;
expensive operation
 2nd subscriber: 4;
expensive operation
 2nd subscriber: 6;
expensive operation
 2nd subscriber: 8;
expensive operation
 2nd subscriber: 10;

что тогда является целью publish (). RefCount () если он не многоадресный, то просто как обычный наблюдаемый.какой в ​​этом смысл или поделитесь ??

1 Ответ

0 голосов
/ 21 февраля 2019

Как вы знаете, share оператор тот же publish().refCount().Refcount делает connectable observer как вы знаете.Так что ваш код правильный.Но у вас есть недостающая вещь, которая Thread.Я думаю, вы можете понять, что я хочу объяснить по этому поводу.Если нет, дайте мне знать!

Измените код следующим образом

val ob1 = Observable.fromArray(1,2,3,4,5).map {
    println("expensive operation")
    it * 2
}.subscribeOn(Schedulers.computation()).share() 
// Add subscribeOn operator to change emitting thread from MAIN to WORK

fun doMultiplyBy2() {
    ob1.flatMap { Observable.just(" 1st subscriber: $it;") }.subscribe{println(it)}

    ob1.flatMap { Observable.just(" 2nd subscriber: $it;") }.subscribe{println(it)}
}

doMultiplyBy2()

Thread.sleep(1000) // Waiting for ending to execute

output

expensive operation
 1st subscriber: 2;
 2nd subscriber: 2;
expensive operation
 1st subscriber: 4;
 2nd subscriber: 4;
expensive operation
 1st subscriber: 6;
 2nd subscriber: 6;
expensive operation
 1st subscriber: 8;
 2nd subscriber: 8;
expensive operation
 1st subscriber: 10;
 2nd subscriber: 10;
...