Почему я не могу выполнить параллельную работу, когда использую delay () или yield () в Kotlin? - PullRequest
0 голосов
/ 04 апреля 2020

Код A, код B и код C получают одинаковый результат Результат Все .

Я думаю, что код B или код C должен получить результат Результат MyThink , потому что я добавил либо delay () , либо yield () .

Похоже, что flow.collect {...} является функцией блока.

Код A

fun foo(): Flow<Int> = flow { 
    println("Flow started")
    for (i in 1..3) {
        delay(500)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    println("Calling foo...")
    val flow = foo()
    println("Calling collect...")
    flow.collect { value ->run {                     
                                println(value)  
                              } 
                 }
    println("Done")    
}

Код B

fun foo(): Flow<Int> = flow { 
    println("Flow started")
    for (i in 1..3) {
        delay(500)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    println("Calling foo...")
    val flow = foo()
    println("Calling collect...")
    flow.collect { value ->run {                     
                                println(value)  
                                delay(200)
                              } 
                 }
    println("Done")    
}

Код C

fun foo(): Flow<Int> = flow { 
    println("Flow started")
    for (i in 1..3) {
        delay(500)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    println("Calling foo...")
    val flow = foo()
    println("Calling collect...")
    flow.collect { value ->run {                     
                                println(value)  
                                yield()
                              } 
                 }
    println("Done")    
}

Результат Все

Calling foo...
Calling collect...
Flow started
1
2
3
Done

Результат MyThink

Calling foo...
Calling collect...
Flow started
1
Done
2
3

1 Ответ

1 голос
/ 04 апреля 2020

Кажется, что flow.collect {...} является блочной функцией.

В буквальном смысле это не так, но здесь действительно есть поведение, которое вы могли бы сформулировать как "blocking".

collect - это функция приостановки, которая будет возвращаться только после того, как соберет все элементы в Flow, для которых она была вызвана. Всякий раз, когда Flow приостанавливается (например, с delay или yield), сбор Flow также приостанавливается. Все это происходит в одной и той же сопрограмме (в данном случае это runBlocking), которая подвешена вместе. Выходные значения Flow и их обработка collect будут продолжены после окончания приостановки. Наконец, когда все собрано, вернется collect, и любой код, который у вас есть после него в той же сопрограмме, будет запущен.

Это согласуется с идеей, что сопрограммы по умолчанию последовательны т.е. все выполняется сверху вниз в вашем коде, по порядку. Если вы хотите одновременное поведение, вы должны явно включить его (например, запустив новые сопрограммы в текущей, с launch или async). Так что то, что вы называете «блокировкой», на самом деле просто последовательное. Функция collect работает , а не , как регистрация слушателя со многими другими API.

Чтобы понять основную идею c, стоящую за Flow, и узнать, как ее собирать в та же самая сопрограмма, я всегда рекомендую этот разговор .

...