Нет, вложенные flatMap
не позволяют параллельному запуску Single
, что подтверждается следующим тестом:
// so we can be sure service1 and service2 are active
val bothSubscribed = CountDownLatch(2)
// so we can simulate a blocking, long running operation on both services
val subscribeThreadsStillRunning = CountDownLatch(1)
val service5 = { str: String, str2: String ->
Observable.just("service5: $str, $str2").singleOrError()
}
val scheduler = Schedulers.io()
val createSingle = { value: String ->
Observable
.create<String> { emitter ->
println("subscribe $value on ${Thread.currentThread().name}")
bothSubscribed.countDown()
subscribeThreadsStillRunning.await(10, SECONDS)
emitter.onNext(value)
}
.singleOrError()
.subscribeOn(scheduler)
}
val s1 = createSingle("outer")
val s4 = createSingle("inner")
s1.flatMap { outer ->
s4.flatMap { inner ->
service5(outer, inner)
}
}.subscribe()
assert(bothSubscribed.await(5, SECONDS))
subscribeThreadsStillRunning.countDown()
Причину можно понять, вспомнив этот код в лямбда-выражениях. не запускается до тех пор, пока не будет выполнена лямбда (кажется очевидным, что я так говорю, но мне потребовалось немного времени, чтобы ее получить). s4.flatMap
- это то, что вызывает подписку на s4
, но этот код не выполняется до тех пор, пока outer
не станет доступным, то есть до тех пор, пока s1
уже не отправит и, следовательно, не завершится.
Zip выглядит как Идеальное решение для этого, и я не уверен, почему вы хотите использовать плоскую карту. Я не могу придумать, как это сделать. Он также имеет API-интерфейс, безопасный для типов, поэтому вам не нужно использовать API на основе массива.
Singles
.zip(s1, s4) { outer, inner -> service5(outer, inner) }
.flatMap { it }
.subscribe()
Обратите внимание, что я использовал Singles
из "io.reactivex.rxjava3:rxkotlin:3.0.0-RC1"
, поскольку лямбды лучше работают с Kotlin.