Вложенные Single.flatMap в Rx Java против Single.zip, то же самое? - PullRequest
0 голосов
/ 10 февраля 2020

Я сталкиваюсь с путаницей, приводя в качестве примера 4 Single:

val s1 : Single<String> = service1.execute().subscribeOn(io())
val s2 : Single<Int> = service2.execute().subscribeOn(io())
val s3 : Single<Int> = service3.execute().subscribeOn(io())
val s4 : Single<String> = service4.execute().subscribeOn(io())
val ....
val s10 : Single<Int> = service10.execute().subscribeOn(io())

класс данных MyObj (field1: String, field2: Int, field3: Int, field4: String ... ... . field10: Int)

и у меня есть service10.execute(s1 : String s2 : Int s3 : Int s4 : String)

Если я сделаю:

s1.flatMap { str -> 
    s2.flatMap { int1 ->
        s3.flatMap { int2 ->
            s4.flatMap { str2 ->
                ...
                s10.flatmap { int10
                  service10.execute(myObj(str, int1, int2, str2..., int10))
                }
            }
        }
    }
}

Это то же самое, что и:

Single.zip(
            listOf(
                s1,
                s2,
                s3,
                s4
              ...,
              ...,
              s10
            )
        ) { array ->
            val str = array[0] as String
            val int1 = array[1] as Int
            val int2 = array[2] as Int
            val str2 = array[3] as String
            ...
            val str10 = array[9] as Int
        }

1) FlatMap выполняется там параллельно или последовательно там? 2) Если вложенные flatMap являются последовательными, есть ли способ сделать их параллельными как zip?

1 Ответ

2 голосов
/ 11 февраля 2020

Нет, вложенные flatMap не позволяют параллельному запуску Single, что подтверждается следующим тестом:

    // so we can be sure service1 and service2 are active
    val bothSubscribed = CountDownLatch(2)
    // so we can simulate a blocking, long running operation on both services
    val subscribeThreadsStillRunning = CountDownLatch(1)

    val service5 = { str: String, str2: String ->
        Observable.just("service5: $str, $str2").singleOrError()
    }

    val scheduler = Schedulers.io()

    val createSingle = { value: String ->
        Observable
            .create<String> { emitter ->
                println("subscribe $value on ${Thread.currentThread().name}")
                bothSubscribed.countDown()
                subscribeThreadsStillRunning.await(10, SECONDS)
                emitter.onNext(value)
            }
            .singleOrError()
            .subscribeOn(scheduler)
    }

    val s1 = createSingle("outer")
    val s4 = createSingle("inner")

    s1.flatMap { outer ->
        s4.flatMap { inner ->
            service5(outer, inner)
        }
    }.subscribe()

    assert(bothSubscribed.await(5, SECONDS))
    subscribeThreadsStillRunning.countDown()

Причину можно понять, вспомнив этот код в лямбда-выражениях. не запускается до тех пор, пока не будет выполнена лямбда (кажется очевидным, что я так говорю, но мне потребовалось немного времени, чтобы ее получить). s4.flatMap - это то, что вызывает подписку на s4, но этот код не выполняется до тех пор, пока outer не станет доступным, то есть до тех пор, пока s1 уже не отправит и, следовательно, не завершится.

Zip выглядит как Идеальное решение для этого, и я не уверен, почему вы хотите использовать плоскую карту. Я не могу придумать, как это сделать. Он также имеет API-интерфейс, безопасный для типов, поэтому вам не нужно использовать API на основе массива.

Singles
        .zip(s1, s4) { outer, inner -> service5(outer, inner) }
        .flatMap { it }
        .subscribe()

Обратите внимание, что я использовал Singles из "io.reactivex.rxjava3:rxkotlin:3.0.0-RC1", поскольку лямбды лучше работают с Kotlin.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...