Сопрограммы Kotlin и Java Completable будущая интеграция - PullRequest
1 голос
/ 01 июня 2019

Обычно я использую стандартную библиотеку kotlin-jdk8 для перехода из мира API Java *future в рай suspend Котлина.

И он прекрасно работал для меня, пока я не столкнулся с API курсора Neo4J, где я не могу сделать .await() на этапе завершения, потому что он немедленно начинает извлекать миллионы записей в память.

Путь Котлина у меня не работает, вот так:

suspend fun query() {
    driver.session().use { session ->

        val cursor: StatementResultCursor = session.readTransactionAsync {
            it.runAsync("query ...", params)
        }.await() // HERE WE DIE WITH OOM

        var record = cursor.nextAsync().await()

        while (record != null) {
            val node = record.get("node")
            mySuspendProcessingFunction(node)
            record = cursor.nextAsync().await()
        }
    }
}

В то же время Java API работает хорошо, мы выбираем записи одну за другой:

suspend fun query() {
    session.readTransactionAsync { transaction ->
        transaction.runAsync("query ...", params).thenCompose { cursor ->
            cursor.forEachAsync { record ->
                runBlocking { // BUT I NEED TO DO RUN BLOCKING HERE :(
                    val node = record.get("node")
                    mySuspendProcessingFunction(node)
                }
            }
        }
    }.thenCompose {
        session.closeAsync()
    }.await()
}

Второй вариант работает для меня, но он довольно уродлив - определенно не так, как Котлин, и, что более важно, мне нужно использовать runBlocking (но весь этот блок выполняется в функции suspend)

Что я делаю не так? Есть ли лучший способ?

UPD Попытка выполнить это упражнение с использованием новой функции Flow (), к сожалению, результаты те же:

suspend fun query() {
    session.readTransactionAsync { transaction ->
        transaction.runAsync(query, params).thenApply { cursor ->
            cursor.asFlow().onEach { record ->
                val node = record.get("node")
                mySuspendProcessingFunction(node)
            }
        }
    }.thenCompose {
        session.closeAsync()
    }.await()
}
fun StatementResultCursor.asFlow() = flow {
    do {
        val record = nextAsync().await()
        if (record != null) emit(record)
    } while (record != null)
}
...