В Kotlin Native, как сохранить объект в отдельном потоке и изменить его состояние из любого другого объекта без использования указателей C? - PullRequest
1 голос
/ 22 января 2020

Я изучаю Kotlin Собственный и у меня есть программа с кучей Рабочих , выполняющих параллельные операции (работает на Windows, но это общий вопрос).

Теперь я хотел добавить простую регистрацию. Компонент, который просто регистрирует строки, добавляя их как новые строки в файл, который остается открытым в режиме 'добавления'.

(В идеале, я бы просто имел "глобальную" функцию ...

fun log(text:String) {...} ] 

... что я смогу позвонить откуда угодно, в том числе "изнутри" другим работникам, и это сработало бы. Смысл здесь в том, что сделать это не тривиально, потому что из Kotlin Нативных правил, касающихся передачи объектов между потоками (TLDR: вы не должны передавать изменяемые объекты вокруг. См .: https://github.com/JetBrains/kotlin-native/blob/master/CONCURRENCY.md#object -transfer-and-freezing ). Кроме того, моя функция журнала в идеале приняла бы любой замороженный объект.)


Я пришел к выводу о решениях, использующих DetachedObjectGraph :

Сначала я создаю отсоединенный объект logger

val loggerGraph = DetachedObjectGraph { FileLogger("/foo/mylogfile.txt")}

, а затем используйте loggerGraph.asCPointer() ( asCPointer () ), чтобы получить COpaquePointer для отдельного графика:

val myPointer = loggerGraph.asCPointer()

Теперь я может передать этот указатель рабочим (через производителя лямбда рабочего выполнить функцию ) и использовать ее там. Или я могу сохранить указатель в глобальной переменной @ ThreadLocal .


Для кода, который записывает в файл, всякий раз, когда я хочу записать строку, мне нужно создать DetachedObjectGraph объект из указателя снова и attach() его, чтобы получить ссылку на мой объект fileLogger:

val fileLogger = DetachedObjectGraph(myPointer).attach()

Теперь я могу вызвать функцию регистрации в регистраторе:

fileLogger.log("My log message")

Это то, что я придумал, глядя на API, которые доступны (по состоянию на Kotlin 1.3.61) для параллелизма в Kotlin Native, но Мне интересно, что лучше подход будет (используя Kotlin, не прибегая к C). Ясно, что создавать объект DetachedObjectGraph для каждой записанной строки плохо.


Можно задать этот вопрос более общим образом: как сохранить изменяемый ресурс открытым в отдельном потоке (или работнике ) и отправлять ему сообщения.

Дополнительный комментарий: Наличие сопрограмм , которые действительно используют потоки, решило бы эту проблему, но вопрос заключается в том, как решить эту задачу с помощью API в настоящее время (Kotlin 1.3.61).

Ответы [ 2 ]

2 голосов
/ 22 января 2020

Определенно не следует использовать DetachedObjectGraph так, как указано в вопросе. Ничто не мешает вам пытаться присоединиться к нескольким потокам, или если вы передаете один и тот же указатель, пытаясь присоединиться к недопустимому потоку один за другим, как к нему присоединен.

Как упомянул Domini c, вы можно хранить DetachedObjectGraph в AtomicReference. Однако, если вы собираетесь сохранить DetachedObjectGraph в AtomicReference, убедитесь, что типом является AtomicRef<DetachedObjectGraph?> и busy-l oop, в то время как DetachedObjectGraph равно нулю. Это предотвратит использование одного и того же DetachedObjectGraph несколькими потоками. Убедитесь, что для него установлено значение null, и заполняйте его атомом c.

Однако, нужно ли вообще FileLogger быть изменчивым? Если вы пишете в файл, это не так. Даже если бы это было так, я бы изолировал изменяемый объект отдельному работнику и отправлял ему сообщения журнала, а не делал бы DetachedObjectGraph внутри AtomicRef.

По моему опыту, DetachedObjectGraph супер редкость в производстве код. В настоящее время мы нигде не используем его.

Чтобы изолировать изменяемое состояние до Worker, что-то вроде этого:


class MutableThing<T:Any>(private val worker:Worker = Worker.start(), producer:()->T){
    private val arStable = AtomicReference<StableRef<T>?>(null)
    init {
        worker.execute(TransferMode.SAFE, {Pair(arStable, producer).freeze()}){
            it.first.value = StableRef.create(it.second()).freeze()
        }
    }
    fun <R> access(block:(T)->R):R{
        return worker.execute(TransferMode.SAFE, {Pair(arStable, block).freeze()}){
            it.second(it.first.value!!.get())
        }.result
    }
}

object Log{
    private val fileLogger = MutableThing { FileLogger() }

    fun log(s:String){
        fileLogger.access { fl -> fl.log(s) }
    }
}

class FileLogger{
    fun log(s:String){}
}

MutableThing использует StableRef для внутреннего использования. producer делает изменяемое состояние, которое вы хотите изолировать. Чтобы что-то записать, позвоните по номеру Log.log, и вызовет мутабельный FileLogger.

Чтобы увидеть базовый c пример MutableThing, запустите следующий тест:

@Test
fun goIso(){
    val mt = MutableThing { mutableListOf("a", "b")}
    val workers = Array(4){Worker.start()}
    val futures = mutableListOf<Future<*>>()
    repeat(1000) { rcount ->
        val future = workers[rcount % workers.size].execute(
            TransferMode.SAFE,
            { Pair(mt, rcount).freeze() }
        ) { pair ->
            pair.first.access {
                val element = "ttt ${pair.second}"
                println(element)
                it.add(element)
            }
        }
        futures.add(future)
    }

    futures.forEach { it.result }

    workers.forEach { it.requestTermination() }

    mt.access {
        println("size: ${it.size}")
    }
}
1 голос
/ 22 января 2020

Подход, который вы выбрали, в значительной степени верен, и способ, которым это должно быть сделано.

Я бы добавил, вместо того, чтобы обвести указатель вокруг. Вы должны обойти замороженный FileLogger, который будет внутренне содержать ссылку на AtomicRef<DetachedObjectGraph>, прикрепление и отсоединение должны быть сделаны изнутри. Тем более что DetachedObjectGraph s недействительны после прикрепления.

...