Синхронизация и асинхронность завершения цепочки - PullRequest
0 голосов
/ 29 ноября 2018

У меня есть список завершаемых таблиц, по умолчанию я запускаю их один за другим с помощью операторов concat / andThen.Иногда я хочу, чтобы некоторая часть завершаемых таблиц работала параллельно, и после того, как все завершилось, переходите к следующему завершаемому в списке.Я пытался добиться этого с помощью этого кода:

    var completable =
            getAsyncCompletables()?.let {
                it
            } ?: run {
                completables.removeAt(0).getCompletable()
            }
        while (completables.isNotEmpty()) {
            val nextCompletable = getAsyncCompletables()?.let {
                it
            } ?: run {
                completables.removeAt(0).getCompletable()
            }
            completable = nextCompletable.startWith(completable)
        }
        completable
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(AndroidSchedulers.mainThread())
            .subscribe()

Я использую этот код для обнаружения асинхронных завершаемых файлов:

 private fun getAsyncCompletables(): Completable? {
    if (completables.size < 2 || !completables[1].async) {
        return null
    }
    var completable = completables.removeAt(0).getCompletable()
    while (completables.isNotEmpty() && completables[0].async) {
        completable = completable.mergeWith(completables.removeAt(0).getCompletable())
    }
    return completable
}

Все работает нормально, за исключением одного, последний завершаемый файл не запущенХотя я использовал "startWith".Я также попытался "concatWith" и "andThen", но тот же результат.

1 Ответ

0 голосов
/ 05 декабря 2018

Сложно ответить, не видя больше вашего кода, в частности, что делает async и какова структура данных для completables.Тем не менее, ответ, который вы ищете, скорее всего, аналогичен независимо от этих значений.Вы, вероятно, захотите использовать Completable.merge(...) или Completable.mergeArray(...).

В соответствии с документацией:

 /**
  * Returns a Completable instance that subscribes to all sources at once and
  * completes only when all source Completables complete or one of them emits an error.
  * ...
  */

Чтобы выполнить параллельное выполнение, вам необходимо вызвать subscribeOnкаждый из ваших Completables в списке / массив / набор с новым потоком.Это можно сделать с помощью Schedulers.newThread() или из общего пула, например Schedulers.io().

Я выполнил тест, чтобы быть уверенным.Вот код.

override fun onCreate(savedInstanceState: Bundle?) {
    super.onCreate(savedInstanceState)
    ...

    val completeOne = Completable.fromAction {
        Timber.d("Completable #1 running on ${Thread.currentThread().name}")
    }

    val completeTwo = Completable.fromAction {
        Timber.d("Completable #2 running on ${Thread.currentThread().name}")
    }

    val completeThree = Completable.fromAction {
        Timber.d("Completable #3 running on ${Thread.currentThread().name}")
    }

    val completables = listOf(completeOne, completeTwo, completeThree).map { CompletableWrapper(it) }
    val asyncCompletables = completables
        .asSequence()
        .filter { it.async }
        .map { it.getCompletable().subscribeOn(Schedulers.io()) }
        .toList()

    Completable.merge(asyncCompletables)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe({
            Timber.i("Completed all completables")
        }, Timber::e)
}

class CompletableWrapper(
    private val completable: Completable,
    val async: Boolean = true
) {
    fun getCompletable() = completable
}

А вот и вывод.

D/MainActivity$onCreate$completeThree: Completable #3 running on RxCachedThreadScheduler-3
D/MainActivity$onCreate$completeTwo: Completable #2 running on RxCachedThreadScheduler-2
D/MainActivity$onCreate$completeOne: Completable #1 running on RxCachedThreadScheduler-1
I/MainActivity$onCreate: Completed all completables

Как видите, он запускает каждый завершаемый файл в новом потоке из пула и только вызовы завершают всепосле завершения каждого завершаемого.

См. здесь документацию по Completable.merge / mergeArray .

...