Безопасна ли эта реализация takeWhileInclusive? - PullRequest
0 голосов
/ 16 мая 2018

Я нашел следующую реализацию включительно takeWhile (найдено здесь )

fun <T> Sequence<T>.takeWhileInclusive(pred: (T) -> Boolean): Sequence<T> {
    var shouldContinue = true
    return takeWhile {
        val result = shouldContinue
        shouldContinue = pred(it)
        result
    }
}

Проблема в том, что я не уверен на 100%, что это безопасно, если используется на параллельная последовательность .

Меня беспокоит то, что мы будем полагаться на переменную shouldContinue, чтобы знать, когда остановиться, но мы не синхронизируем ее доступ.

Есть идеи?

1 Ответ

0 голосов
/ 27 мая 2018

Вот что я понял до сих пор.

Пояснение вопроса

Вопрос неясен.Нет такой вещи как параллельная последовательность Я, вероятно, перепутал их с параллельными потоками Java .Я имел в виду последовательность, которая использовалась одновременно .

Последовательности синхронны

Как указал @LouisWasserman в комментариях, последовательности не предназначены для параллельного выполнения.В частности, SequenceBuilder помечен @RestrictSuspension.Ссылаясь на Kotlin Coroutine repo:

Это означает, что никакое расширение лямбды SequenceBuilder в своей области не может вызвать suspendContinuation или другую общую функцию приостановки

Имеясказал, что, как прокомментировал @MarkoTopolnik, они все еще могут использоваться в параллельной программе, как и любой другой объект.

Последовательности, используемые параллельно

В качестве примера вот первая попытка использования последовательностей параллельно

fun launchProcessor(id: Int, iterator: Iterator<Int>) = launch {
    println("[${Thread.currentThread().name}] Processor #$id received ${iterator.next()}")
}

fun main(args: Array<String>) {
    val s = sequenceOf(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
    runBlocking {
        val iterator = s.iterator()
        repeat(10) { launchProcessor(it, iterator) }
    }
}

Этот код печатает:

[ForkJoinPool.commonPool-worker-2] Процессор # 1 получен 1

[ForkJoinPool.commonPool-worker-1]Процессор № 0 получен 0

[ForkJoinPool.commonPool-worker-3] Процессор № 2 получен 2

[ForkJoinPool.commonPool-worker-2] Процессор № 3 получен 3

[ForkJoinPool.commonPool-worker-1] Процессор № 4 получен 3

[ForkJoinPool.commonPool-worker-3] Процессор № 5 получен 3

[ForkJoinPool.commonPool-worker-1]Процессор № 7 получил 5

[ForkJoinPool.commonPool-worker-2] Процессор № 6 получен 4

[ForkJoinPool.commonPool-worker-1] Процессор № 9 получен 7

[ForkJoinPool.commonPool-работник-3] Процессор № 8 получил 6

Что, конечно, не то, что мы хотим .Поскольку некоторые числа используются дважды.

Введите каналы

С другой стороны, если бы мы использовали каналы, мы могли бы написать что-то вроде этого:

fun produceNumbers() = produce {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}

fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    channel.consumeEach {
        println("[${Thread.currentThread().name}] Processor #$id received $it")
    }
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(1000)
    producer.cancel() // cancel producer coroutine and thus kill them all
}

Затем выводis:

[ForkJoinPool.commonPool-worker-2] Процессор № 0 получен 1

[ForkJoinPool.commonPool-worker-2] Процессор № 0 получен 2

[ForkJoinPool.commonPool-worker-1] Процессор № 1 получен 3

[ForkJoinPool.commonPool-worker-2] Процессор № 2 получен 4

[ForkJoinPool.commonPool-worker-1]Процессор № 3 получен 5

[ForkJoinPool.commonPool-worker-2] Процессор № 4 получен 6

[ForkJoinPool.commonPool-worker-2] Процессор № 0 получен 7

[ForkJoinPool.commonPool-worker-1] Процессор № 1 получен 8

[ForkJoinPool.commonPool-worker-1] Процессор № 2 получен 9

[ForkJoinPool.commonPool-worker-2]Процессор № 3 получил 10

Более того, мы могли бы реализовать метод takeWhileInclusive для таких каналов, как this:

fun <E> ReceiveChannel<E>.takeWhileInclusive(
        context: CoroutineContext = Unconfined,
        predicate: suspend (E) -> Boolean
): ReceiveChannel<E> = produce(context) {
    var shouldContinue = true
    consumeEach {
        val currentShouldContinue = shouldContinue
        shouldContinue = predicate(it)
        if (!currentShouldContinue) return@produce
        send(it)
    }
}

И все работает как положено.

...