Сложно ответить, не видя больше вашего кода, в частности, что делает async
и какова структура данных для completables
.Тем не менее, ответ, который вы ищете, скорее всего, аналогичен независимо от этих значений.Вы, вероятно, захотите использовать Completable.merge(...)
или Completable.mergeArray(...)
.
В соответствии с документацией:
/**
* Returns a Completable instance that subscribes to all sources at once and
* completes only when all source Completables complete or one of them emits an error.
* ...
*/
Чтобы выполнить параллельное выполнение, вам необходимо вызвать subscribeOn
каждый из ваших Completables в списке / массив / набор с новым потоком.Это можно сделать с помощью Schedulers.newThread()
или из общего пула, например Schedulers.io()
.
Я выполнил тест, чтобы быть уверенным.Вот код.
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
...
val completeOne = Completable.fromAction {
Timber.d("Completable #1 running on ${Thread.currentThread().name}")
}
val completeTwo = Completable.fromAction {
Timber.d("Completable #2 running on ${Thread.currentThread().name}")
}
val completeThree = Completable.fromAction {
Timber.d("Completable #3 running on ${Thread.currentThread().name}")
}
val completables = listOf(completeOne, completeTwo, completeThree).map { CompletableWrapper(it) }
val asyncCompletables = completables
.asSequence()
.filter { it.async }
.map { it.getCompletable().subscribeOn(Schedulers.io()) }
.toList()
Completable.merge(asyncCompletables)
.observeOn(AndroidSchedulers.mainThread())
.subscribe({
Timber.i("Completed all completables")
}, Timber::e)
}
class CompletableWrapper(
private val completable: Completable,
val async: Boolean = true
) {
fun getCompletable() = completable
}
А вот и вывод.
D/MainActivity$onCreate$completeThree: Completable #3 running on RxCachedThreadScheduler-3
D/MainActivity$onCreate$completeTwo: Completable #2 running on RxCachedThreadScheduler-2
D/MainActivity$onCreate$completeOne: Completable #1 running on RxCachedThreadScheduler-1
I/MainActivity$onCreate: Completed all completables
Как видите, он запускает каждый завершаемый файл в новом потоке из пула и только вызовы завершают всепосле завершения каждого завершаемого.
См. здесь документацию по Completable.merge / mergeArray .