Вот что я понял до сих пор.
Пояснение вопроса
Вопрос неясен.Нет такой вещи как параллельная последовательность Я, вероятно, перепутал их с параллельными потоками Java .Я имел в виду последовательность, которая использовалась одновременно .
Последовательности синхронны
Как указал @LouisWasserman в комментариях, последовательности не предназначены для параллельного выполнения.В частности, SequenceBuilder
помечен @RestrictSuspension
.Ссылаясь на Kotlin Coroutine repo:
Это означает, что никакое расширение лямбды SequenceBuilder в своей области не может вызвать suspendContinuation или другую общую функцию приостановки
Имеясказал, что, как прокомментировал @MarkoTopolnik, они все еще могут использоваться в параллельной программе, как и любой другой объект.
Последовательности, используемые параллельно
В качестве примера вот первая попытка использования последовательностей параллельно
fun launchProcessor(id: Int, iterator: Iterator<Int>) = launch {
println("[${Thread.currentThread().name}] Processor #$id received ${iterator.next()}")
}
fun main(args: Array<String>) {
val s = sequenceOf(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
runBlocking {
val iterator = s.iterator()
repeat(10) { launchProcessor(it, iterator) }
}
}
Этот код печатает:
[ForkJoinPool.commonPool-worker-2] Процессор # 1 получен 1
[ForkJoinPool.commonPool-worker-1]Процессор № 0 получен 0
[ForkJoinPool.commonPool-worker-3] Процессор № 2 получен 2
[ForkJoinPool.commonPool-worker-2] Процессор № 3 получен 3
[ForkJoinPool.commonPool-worker-1] Процессор № 4 получен 3
[ForkJoinPool.commonPool-worker-3] Процессор № 5 получен 3
[ForkJoinPool.commonPool-worker-1]Процессор № 7 получил 5
[ForkJoinPool.commonPool-worker-2] Процессор № 6 получен 4
[ForkJoinPool.commonPool-worker-1] Процессор № 9 получен 7
[ForkJoinPool.commonPool-работник-3] Процессор № 8 получил 6
Что, конечно, не то, что мы хотим .Поскольку некоторые числа используются дважды.
Введите каналы
С другой стороны, если бы мы использовали каналы, мы могли бы написать что-то вроде этого:
fun produceNumbers() = produce {
var x = 1 // start from 1
while (true) {
send(x++) // produce next
delay(100) // wait 0.1s
}
}
fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
channel.consumeEach {
println("[${Thread.currentThread().name}] Processor #$id received $it")
}
}
fun main(args: Array<String>) = runBlocking<Unit> {
val producer = produceNumbers()
repeat(5) { launchProcessor(it, producer) }
delay(1000)
producer.cancel() // cancel producer coroutine and thus kill them all
}
Затем выводis:
[ForkJoinPool.commonPool-worker-2] Процессор № 0 получен 1
[ForkJoinPool.commonPool-worker-2] Процессор № 0 получен 2
[ForkJoinPool.commonPool-worker-1] Процессор № 1 получен 3
[ForkJoinPool.commonPool-worker-2] Процессор № 2 получен 4
[ForkJoinPool.commonPool-worker-1]Процессор № 3 получен 5
[ForkJoinPool.commonPool-worker-2] Процессор № 4 получен 6
[ForkJoinPool.commonPool-worker-2] Процессор № 0 получен 7
[ForkJoinPool.commonPool-worker-1] Процессор № 1 получен 8
[ForkJoinPool.commonPool-worker-1] Процессор № 2 получен 9
[ForkJoinPool.commonPool-worker-2]Процессор № 3 получил 10
Более того, мы могли бы реализовать метод takeWhileInclusive
для таких каналов, как this:
fun <E> ReceiveChannel<E>.takeWhileInclusive(
context: CoroutineContext = Unconfined,
predicate: suspend (E) -> Boolean
): ReceiveChannel<E> = produce(context) {
var shouldContinue = true
consumeEach {
val currentShouldContinue = shouldContinue
shouldContinue = predicate(it)
if (!currentShouldContinue) return@produce
send(it)
}
}
И все работает как положено.