Как использовать актеров, сохраняя способность делать структурированный параллелизм в Kotlin? - PullRequest
1 голос
/ 19 января 2020

У меня есть класс, который использует actor для обеспечения безопасности потоков общего изменяемого состояния. Я сделал небольшую обертку вокруг этого actor, чтобы упростить его использование:

interface Ref<T : Any> {

    fun get(): T

    fun transform(transformer: (T) -> T): Job

}

Здесь get использует runBlocking для блокировки, пока не получит фактическое значение T:

override fun get(): T = runBlocking {
    val deferred = CompletableDeferred<T>()
    launch {
        actor.send(RefOperation.Get(deferred))
    }
    deferred.await()
}

и transform делают нечто подобное без runBlocking и просто возвращают Job:

override fun transform(transformer: (T) -> T): Job {
    val job = Job()
    launch {
        actor.send(RefOperation.Transform(transformer, job))
    }
    return job
}

Это нормально, пока вызов transform не приведет к другому:

ref.transform {

  ...
  ref.transform {

  }
}

Здесь у меня есть 2 Job с, но нет способа объединить их в один Job, на который я могу позвонить join(), если я хочу дождаться их завершения.

Решением для этого будет структурированный параллелизм, но тогда я больше не знаю, как создать мой actor, поскольку он определен как расширение для CoroutineScope.

Как я могу продолжать использовать actor при сохранении возможности использовать структурированный параллелизм?

Обратите внимание, что я создал Ref, потому что мой проект является мультиплатформенным, и для целей, отличных от JVM, я использую альтернативные реализации.

1 Ответ

3 голосов
/ 19 января 2020

actor обрабатывает элементы в том же порядке, в котором они были добавлены, и последовательно в одной сопрограмме. Это означает, что внутреннее transform будет обработано ПОСЛЕ завершения внешнего transform, и вы не можете изменить его, пока используете actoractor мы не можем запустить больше сопрограмм, потому что мы ограничиваем наше состояние в один поток, в противном случае возможен повторяющийся порядок обработки). Попытка присоединиться к работе внутреннего transform в теле внешнего transform (если мы отметим transform как приостановленную функцию) просто вызовет взаимоблокировку.

С таким поведением все в порядке? Если нет, не используйте ни актеров, ни вложенные преобразования. Если да, приведите примеры использования, в которых создание вложенного transform, который будет обрабатываться после внешнего transform, имеет смысл.

Что касается объединения всех заданий, у меня есть некоторый код. В main у нас есть внешнее преобразование, которое создает внутреннее преобразование. Внешний возвращает 2, внутренний возвращает 8, но внутренний начинается после завершения внешнего, поэтому результат равен 8. Но, как вы и хотели, transformJob.join() в main также ожидает внутреннюю работу.

private sealed class RefOperation<T>
private class Get<T : Any>(val deferred: CompletableDeferred<T>) : RefOperation<T>()
private class Transform<T : Any>(val transformer: TransformStub<T>.(T) -> T, val stub: TransformStub<T>, val job: CompletableJob) : RefOperation<T>()

interface Ref<T : Any> {

    fun get(): T

    fun transform(transformer: TransformStub<T>.(T) -> T): Job

}

interface TransformStub<T : Any> {
    fun transform(transformer: TransformStub<T>.(T) -> T): Job
}

private class TransformStubImpl<T : Any>(
        val actor: SendChannel<RefOperation<T>>,
        val scope: CoroutineScope
) : TransformStub<T> {

    override fun transform(transformer: TransformStub<T>.(T) -> T): Job {
        return scope.launch {
            val childJob: CompletableJob = Job()
            val childStub = TransformStubImpl(actor, this)
            actor.send(Transform(transformer, childStub, childJob))
            childJob.join()
        }
    }

}

class RefImpl<T : Any>(initialValue: T) : Ref<T> {

    private val actorJob = Job()
    private val actorScope = CoroutineScope(actorJob)
    private val actor = actorScope.actor<RefOperation<T>> {
        var value: T = initialValue
        for (msg in channel) {
            when (msg) {
                is Get -> {
                    println("Get! $value")
                    msg.deferred.complete(value)
                }
                is Transform -> {
                    with(msg) {
                        val newValue = stub.transformer(value)
                        println("Transform! $value -> $newValue")
                        value = newValue
                        job.complete()
                    }
                }
            }
        }
    }

    override fun get(): T = runBlocking {
        val deferred = CompletableDeferred<T>()
        actor.send(Get(deferred))
        deferred.await()
    }

    override fun transform(transformer: TransformStub<T>.(T) -> T): Job {
        val stub = TransformStubImpl(actor, GlobalScope)
        return stub.transform(transformer)
    }

}

fun main() = runBlocking<Unit> {
    val ref: Ref<Int> = RefImpl(0)
    val transformJob = ref.transform {
        transform { 8 }
        2
    }
    transformJob.join()
    ref.get()
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...