Вы создаете новую сопрограмму каждый раз, когда должно быть отправлено новое значение.
operator fun invoke(line: String) {
coroutineScope.async(Dispatchers.Default) {
logMsg("Sending $line")
output.send(line)
}
}
И это не приостанавливается, поскольку asyn c просто запускает сборку и забывает (но дает отложенное до await on).
Очевидно, что строка output.send(line)
приостанавливается и освобождает поток, добавляя продолжение (сопрограммы внутри блока asyn c) в состояние паузы, и значение не отправляется. Но все же logMsg()
вызывается через for l oop, потому что для l oop никогда не приостанавливается.
Чтобы решить эту проблему, ваша функция invoke()
должна быть каким-то образом приостановлена до вызова send () возобновляется так, что на l oop будет приостановлено. Вы не можете запускать новую сопрограмму, потому что они все равно не приостанавливаются.
// make producer suspend
suspend fun producer(output: ChannelDestination) {
for (i in 1..Int.MAX_VALUE) {
logMsg("Producing $i")
output("Iteration $i")
}
}
// make it suspend as well
suspend operator fun invoke(line: String) {
logMsg("Sending $line")
output.send(line)
}
Вы запустили функцию producer()
в Dispatchers.Default
, поэтому все будет работать на ней и будет приостановлено, пока sb не получит это.