Kotlin: отправка на redezvous Channel не приостанавливается, если никто не читает - PullRequest
0 голосов
/ 20 июня 2020

Я не понимаю, почему эта программа не заканчивается:

object RedezvousExample {
    @JvmStatic
    fun main(args: Array<String>) {
        setOptionToShowCoroutineNames()
        runBlocking {
            val ioChannel = Channel<String>()
            val outputFunction: StringDestination = ChannelDestination(ioChannel, this)
            val producerJob = launch(Dispatchers.Default) { producer(outputFunction) }
            val producedValue = ioChannel.receive()
            logMsg("Received $producedValue")
            producerJob.cancel()
        }
    }
}

fun producer(output: StringDestination) {
    for(i in 1..Int.MAX_VALUE) {
        logMsg("Producing $i")
        output("Iteration $i")
    }
}

class ChannelDestination(val output: SendChannel<String>, val coroutineScope: CoroutineScope) : StringDestination {
    override fun invoke(line: String) {
        coroutineScope.async(Dispatchers.Default) {
            logMsg("Sending $line")
            output.send(line)
        }
    }
}

fun logMsg(msg: Any?) = println("${threadName()}$msg")

В моем (плохом) понимании каналов рандеву я предположил, что вызов output.send будет заблокирован, если никто не будет чтение канала, в то время как производитель, кажется, непрерывно пишет в канал, даже если нет ожидающих методов приема.

Спасибо

Ответы [ 2 ]

0 голосов
/ 21 июня 2020

Ответ @ Anime sh был очень хорош. Я разработал это так:

object RedezvousExample01 {
    @JvmStatic
    fun main(args: Array<String>) {
        setOptionToShowCoroutineNames()
        runBlocking {
            val ioChannel = Channel<String>()
            val outputFunction: StringDestination = ChannelDestination(ioChannel)
            launch { producer(outputFunction) }
            val producedValue = ioChannel.receive()
            ioChannel.close()
            logMsg("Received $producedValue")
        }
    }
}

fun producer(output: StringDestination) {
    try {
        for (i in 1..Int.MAX_VALUE) {
            logMsg("Producing $i")
            output("Iteration $i")
        }
    } catch (e: ClosedSendChannelException) {}
}

class ChannelDestination(val output: SendChannel<String>) : StringDestination {
    override fun invoke(line: String) {
        runBlocking {
            logMsg("Sending $line")
            output.send(line)
        }
    }
}

В любом случае мне пришлось закрыть канал, чтобы завершить сопрограммы, и я догадываюсь, почему метод job.cancel () не сработал.

0 голосов
/ 21 июня 2020

Вы создаете новую сопрограмму каждый раз, когда должно быть отправлено новое значение.

operator fun invoke(line: String) {
    coroutineScope.async(Dispatchers.Default) {
        logMsg("Sending $line")
        output.send(line)
    }
}

И это не приостанавливается, поскольку asyn c просто запускает сборку и забывает (но дает отложенное до await on).

Очевидно, что строка output.send(line) приостанавливается и освобождает поток, добавляя продолжение (сопрограммы внутри блока asyn c) в состояние паузы, и значение не отправляется. Но все же logMsg() вызывается через for l oop, потому что для l oop никогда не приостанавливается.

Чтобы решить эту проблему, ваша функция invoke() должна быть каким-то образом приостановлена ​​до вызова send () возобновляется так, что на l oop будет приостановлено. Вы не можете запускать новую сопрограмму, потому что они все равно не приостанавливаются.

// make producer suspend
suspend fun producer(output: ChannelDestination) {
    for (i in 1..Int.MAX_VALUE) {
        logMsg("Producing $i")
        output("Iteration $i")
    }
}

// make it suspend as well
suspend operator fun invoke(line: String) {
    logMsg("Sending $line")
    output.send(line)
}

Вы запустили функцию producer() в Dispatchers.Default, поэтому все будет работать на ней и будет приостановлено, пока sb не получит это.

...