Kotlin - объем сопрограмм, почему мой асинхронный не выполняется? - PullRequest
0 голосов
/ 01 ноября 2018

Как работает CoroutineScopes?

Допустим, у меня есть

enum class ConceptualPosition{
    INVALID,
    A,B
}

И давайте предположим, что у меня есть пользовательский интерфейс, из которого пользователь может нажать на любую позицию, A или B.

Теперь мне нужен актер, который получает пользовательский ввод, но игнорирует его, пока ввод фактически не будет запрошен. Для простоты, скажем, есть только один способ запросить позицию.

sealed class PositionRequest{
    /**report the next position offered*/
    object ForwardNext:PositionRequest()
}

Итак, мы можем построить что-то вроде этого:

fun CoroutineScope.positionActor(
        offeredPosition:ReceiveChannel<ConceptualPosition>,
        requests:ReceiveChannel<PositionRequest>,
        output:SendChannel<ConceptualPosition>
) = launch{
    var lastReceivedPosition = INVALID
    var forwardNextReceived = 0

    println("ACTOR: entering while loop")
    while(true) {
        select<Unit> {
            requests.onReceive {
                println("ACTOR: requests.onReceive($it)")
                when (it) {
                    is PositionRequest.ForwardNext -> ++forwardNextReceived
                }
            }

            offeredPosition.onReceive {
                println("ACTOR: offeredPosition.onReceive($it)")
                lastReceivedPosition = it
                if (forwardNextReceived > 0) {
                    --forwardNextReceived
                    output.send(it)
                }
            }
        }
    }
}

А затем постройте фасад, чтобы взаимодействовать с ним:

class BasicUI{
    private val dispatcher = Dispatchers.IO

    /*start a Position Actor that receives input from the UI and forwards them on demand*/
    private val requests = Channel<PositionRequest>()
    private val offeredPositions = Channel<ConceptualPosition>()
    private val nextPosition = Channel<ConceptualPosition>()
    init {
        runBlocking(dispatcher){
            positionActor(offeredPositions,requests,nextPosition)
        }
    }

    /** Receives a [ConceptualPosition] that may or may not get accepted and acted upon.*/
    fun offerPosition(conceptualPosition: ConceptualPosition) = runBlocking(dispatcher) {
        offeredPositions.send(conceptualPosition)
    }

    /** waits for a [ConceptualPosition] to be offered via [offerPosition], then accepts it*/
    fun getPosition(): ConceptualPosition = runBlocking(dispatcher){
        requests.send(PositionRequest.ForwardNext)
        nextPosition.receive()
    }
}

Что, конечно, не работает, потому что runBlocking - это CoroutineScope, init не вернется, пока сопрограмма, запущенная positionActor(offeredPositions,requests,nextPosition) не закончится ... что никогда не происходит, потому что в while(true) есть это.

Так что, если мы позволим BasicUI реализовать CoroutineScope? В конце концов, это - это , что, по словам Романа Елизарова, мы должны делать на KotlinConf , и, если я правильно его понял, следует связать сопрограмму, созданную positionActor(...), с экземпляром BasicUI , а не runBlocking -блок.

Посмотрим ...

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlin.coroutines.CoroutineContext

class BasicUI:CoroutineScope{

    private val dispatcher = Dispatchers.IO

    private val job = Job()
    override val coroutineContext: CoroutineContext
        get() = job

    /*start a Position Actor that receives input from the UI and forwards them on demand*/
    private val requests = Channel<PositionRequest>()
    private val offeredPositions = Channel<ConceptualPosition>()
    private val nextPosition = Channel<ConceptualPosition>()
    init {
        positionActor(offeredPositions,requests,nextPosition)
    }

    /** Receives a [ConceptualPosition] that may or may not get accepted and acted upon.*/
    fun offerPosition(conceptualPosition: ConceptualPosition) = runBlocking(dispatcher) {
        offeredPositions.send(conceptualPosition)
    }

    /** waits for a [ConceptualPosition] to be offered via [offerPosition], then accepts it*/
    fun getPosition(): ConceptualPosition = runBlocking(dispatcher){
        requests.send(PositionRequest.ForwardNext)
        nextPosition.receive()
    }
}

Давайте создадим небольшой тестовый набор: я предложу актеру несколько A с, которые он должен игнорировать, затем запустим сопрограмму, которая постоянно предлагает B с, один из которых будет возвращен мне, когда я спрашиваю актер на должность.

import ConceptualPosition.*
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking

fun main(args: Array<String>) = runBlocking{
    val ui = BasicUI()
    println("actor engaged")

    //these should all be ignored
    repeat(5){ui.offerPosition(A)}
    println("offered some 'A's")

    //keep offering 'B' so that eventually, one will be offered after we request a position
    async { while(true){ui.offerPosition(B)} }

    //now get a 'B'
    println("requesting a position")
    val pos = ui.getPosition()
    println("received '$pos'")
}

В результате

actor engaged
ACTOR: entering while loop
ACTOR: offeredPosition.onReceive(A)
ACTOR: offeredPosition.onReceive(A)
ACTOR: offeredPosition.onReceive(A)
ACTOR: offeredPosition.onReceive(A)
offered some 'A's
ACTOR: offeredPosition.onReceive(A)
requesting a position
ACTOR: requests.onReceive(PositionRequest$ForwardNext@558da0e9)

... и ничего.

По-видимому, B никогда не предлагается - и, следовательно, никогда не пересылается - что приводит к блокировке основного потока (как и должно быть в этой ситуации).

Я бросил

if(conceptualPosition == ConceptualPosition.B) throw RuntimeException("B offered?!")

в BasicUI.offerPosition и не было никаких исключений, поэтому ...

На данный момент, я, вероятно, должен признать, что пока не понимаю, Котлин CoroutineScope.

Почему этот пример не работает?

1 Ответ

0 голосов
/ 02 ноября 2018

Здесь, похоже, есть две проблемы:

  1. offerPosition / getPosition не являются функциями приостановки. Использование runBlocking в большинстве случаев является неправильным решением и должно использоваться при взаимодействии с синхронным кодом или основной функцией.
  2. async без параметров выполняется в текущем CoroutineScope. Для вашей основной функции это runBlocking. Документация фактически описывает поведение:

CoroutineDispatcher по умолчанию для этого построителя во внутренней реализации цикла событий, который обрабатывает продолжения в этом заблокированном потоке до завершения этой сопрограммы. См. CoroutineDispatcher для других реализаций, предоставляемых kotlinx.coroutines.

Говоря простым языком, блок async не получит ход для выполнения в цикле обработки событий, пока другие продолжения его используют. Поскольку getPosition блокирует, вы блокируете цикл обработки событий.

Замена блокирующих функций функциями приостановки и withContext(dispatcher) для отправки другому исполнителю позволила бы запустить асинхронную функцию и в конечном итоге разрешить состояние.

...