Параллельное выполнение заданий с использованием сопрограмм - PullRequest
2 голосов
/ 19 сентября 2019

Я хочу выполнять несколько заданий параллельно, используя сопрограммы.Это кусок кода, который я придумал.
У меня есть 2 запроса:

  • Как обеспечить выполнение обратных вызовов завершения в потоке вызывающей стороны?

  • Код стал больше похож на шаблон обратного вызова, который я имел с обычными потоками.Пожалуйста, предложите изменения в дизайне, чтобы добиться удобства чтения сопрограмм.

class ParallelExecutor {

    suspend fun <OUTPUT> execute(
        jobs: List<suspend () -> OUTPUT>,
        onTimeout: (jobIndex: Int) -> OUTPUT,
        onFailure: (jobIndex: Int, exception: Throwable) -> OUTPUT,
        onCompletion: suspend (jobIndex: Int, result: OUTPUT) -> Unit,
        timeout: Long,
        onFullCompletion: suspend () -> Unit = {},
        invokeDispatcher: CoroutineDispatcher = Dispatchers.Default
    ) {
        withContext(invokeDispatcher) {
            var counter = 0
            val listenJobs = mutableListOf<Deferred<OUTPUT>>()

            jobs.forEachIndexed { index, job ->
                val listenJob = async {

                    try {
                        job()
                    } catch (e: Exception) {
                        onFailure(index, e)
                    }
                }
                listenJobs.add(listenJob)
            }

            listenJobs.forEachIndexed { index, job ->
                launch {
                    val output = try {
                        withTimeout(timeout) {
                            job.await()
                        }
                    } catch (e: TimeoutCancellationException) {
                        onTimeout(index)
                    }
                    onCompletion(index, output)
                    if (++counter == listenJobs.size) {
                        onFullCompletion()
                    }
                }
            }
        }
    }
}

Ответы [ 2 ]

3 голосов
/ 19 сентября 2019

Мне кажется, что вы можете немного упростить свой код.Вам не требуется двухэтапная идиома, которая сначала запускает все задания async, а затем запускает больше заданий, которые их ожидают.Вы можете просто launch заданий и делегировать их в один и тот же блок.Таким образом, обратные вызовы будут естественным образом вызываться диспетчером вызывающей стороны, и только измененная контекстная область может быть вызвана только в измененном контексте с помощью invokeDispatcher.

onFullCompletion, который выглядит как фрагмент кода, принадлежащий вызывающей стороне.сторона, ниже execute вызова.Поскольку execute не выдает никаких исключений, вам не нужно try-finally, чтобы получить его.

suspend fun <OUTPUT> execute(
    jobs: List<suspend () -> OUTPUT>,
    onTimeout: (jobIndex: Int) -> OUTPUT,
    onFailure: (jobIndex: Int, exception: Throwable) -> OUTPUT,
    onCompletion: suspend (jobIndex: Int, result: OUTPUT) -> Unit,
    timeout: Long,
    invokeDispatcher: CoroutineDispatcher = Dispatchers.Default
) {
    coroutineScope {
        jobs.mapIndexed { index, job ->
            launch {
                val output = try {
                    withTimeout(timeout) {
                        withContext(invokeDispatcher) {
                            job()
                        }
                    }
                } catch (e: TimeoutCancellationException) {
                    onTimeout(index)
                } catch (e: Exception) {
                    onFailure(index, e)
                }
                onCompletion(index, output)
            }
        }
    }
}
1 голос
/ 19 сентября 2019

Внесены некоторые улучшения, которые должны отвечать на ваши запросы.


class ParallelExecutor {

    suspend fun <OUTPUT> execute(
        jobs: List<suspend () -> OUTPUT>,
        onTimeout: (jobIndex: Int) -> OUTPUT,
        onFailure: (jobIndex: Int, exception: Throwable) -> OUTPUT,
        onCompletion: suspend (jobIndex: Int, result: OUTPUT) -> Unit,
        timeout: Long,
        invokeDispatcher: CoroutineDispatcher = Dispatchers.Default
    ) {
        supervisorScope {
            val listenJobs = jobs.map { job ->
                async(invokeDispatcher) {
                    withTimeout(timeout) {
                        job()
                    }
                }
            }

            listenJobs.forEachIndexed { index, job ->
                launch {
                    val output = try {
                        job.await()
                    } catch (e: TimeoutCancellationException) {
                        onTimeout(index)
                    } catch (e: Exception) {
                        onFailure(index, e)
                    }
                    onCompletion(index, output)
                }
            }
        }
    }
}

  • Задания теперь отменяются при достижении тайм-аута.
  • В вызывающем теперь вызываются обратные вызовы завершенияdispatcher .
  • Исправлено состояние гонки при решении, когда вызывать onFullCompletion.
  • Удалены некоторые методы, которые вам на самом деле не нужны.

Если вы чувствуете, что это больше похоже на шаблон обратного вызова, то вам просто не следует использовать обратные вызовы.Сопрограммы спроектированы таким образом, что вы пишете такой код на сайте использования с минимальным шаблоном, поэтому такие функции не нужны и выглядят странно (ИМХО).

...