Как реализовать параллельное отображение для последовательностей в kotlin - PullRequest
2 голосов
/ 24 апреля 2019

Я пытаюсь реализовать параллельную реализацию для Iterable и Sequence в Kotlin. Я получил небольшой файл, он состоит из 4 функций расширения, но третья выдает ошибку компилятора:

suspend fun <T, R> Iterable<T>.parallelMap(block: suspend(T) -> R) =
    coroutineScope { map { async { block(it) } }.map { it.await() } }

suspend fun <T> Iterable<T>.parallelForEach(block: suspend (T) -> Unit) =
    coroutineScope { map { async { block(it) } }.forEach { it.await() } }

suspend fun <T, R> Sequence<T>.parallelMap(block: suspend(T) -> R) =
    coroutineScope { map { async { block(it) } }.map { it.await() } }

suspend fun <T> Sequence<T>.parallelForEach(block: suspend (T) -> Unit) =
    coroutineScope { map { async { block(it) } }.forEach { it.await() } }

Компилятор возвращается и говорит, что функции приостановки могут вызываться только внутри функций приостановки. Есть ли способ реализовать это?

Редактировать: исправлено некорректное копирование / вставка

Edit2: я подумал о реализации:

suspend fun <T, R> Sequence<T>.parrallelMap(block: suspend (T) -> R) =
        asIterable().map { coroutineScope { async { block(it) } } }
              .asSequence().map { runBlocking { it.await() } }

Я надеялся, что это запустит все приостановленные функции и будет лениво ждать их. Я просто не уверен, безопасно ли это, или это экономит время или нет.

1 Ответ

2 голосов
/ 24 апреля 2019

Существует проблема с базовой семантикой параллельного выполнения для отложенных последовательностей.Ваша текущая реализация не запускается block(it), пока результирующая последовательность не будет повторена:

suspend fun <T, R> Sequence<T>.parallelMap(block: suspend(T) -> R) =
    coroutineScope { map { async { block(it) } }.map { it.await() } }

Рассмотрим следующий пример:

sequenceOf(1, 2, 3).parallelMap { it * it }.forEach { println(it) }

Для этого примера порядок выполнения будет

val p1 = async { 1 * 1 } 
val r1 = p1.await()
println(r1)
val p2 = async { 2 * 2 } 
val r2 = p2.await()
println(r2)
val p3 = async { 3 * 3 } 
val r3 = p3.await()
println(r3)

Обратите внимание, что выполнение операций сопоставления является последовательным, а не параллельным.

Что говорит вам компилятор, так это то, что лямбда-выражение Sequence<T>.map {} выполняется лениво по требованию вне контекставызова (читай: вне вашей сопрограммы), поэтому вы не можете использовать сопрограмму, в которой вы находитесь.

Честно говоря, я не уверен, как можно одновременно выполнять ленивые вычисления и делать это параллельно.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...