Существует проблема с базовой семантикой параллельного выполнения для отложенных последовательностей.Ваша текущая реализация не запускается block(it)
, пока результирующая последовательность не будет повторена:
suspend fun <T, R> Sequence<T>.parallelMap(block: suspend(T) -> R) =
coroutineScope { map { async { block(it) } }.map { it.await() } }
Рассмотрим следующий пример:
sequenceOf(1, 2, 3).parallelMap { it * it }.forEach { println(it) }
Для этого примера порядок выполнения будет
val p1 = async { 1 * 1 }
val r1 = p1.await()
println(r1)
val p2 = async { 2 * 2 }
val r2 = p2.await()
println(r2)
val p3 = async { 3 * 3 }
val r3 = p3.await()
println(r3)
Обратите внимание, что выполнение операций сопоставления является последовательным, а не параллельным.
Что говорит вам компилятор, так это то, что лямбда-выражение Sequence<T>.map {}
выполняется лениво по требованию вне контекставызова (читай: вне вашей сопрограммы), поэтому вы не можете использовать сопрограмму, в которой вы находитесь.
Честно говоря, я не уверен, как можно одновременно выполнять ленивые вычисления и делать это параллельно.