Обычно я использую стандартную библиотеку kotlin-jdk8 для перехода из мира API Java *future
в рай suspend
Котлина.
И он прекрасно работал для меня, пока я не столкнулся с API курсора Neo4J, где я не могу сделать .await()
на этапе завершения, потому что он немедленно начинает извлекать миллионы записей в память.
Путь Котлина у меня не работает, вот так:
suspend fun query() {
driver.session().use { session ->
val cursor: StatementResultCursor = session.readTransactionAsync {
it.runAsync("query ...", params)
}.await() // HERE WE DIE WITH OOM
var record = cursor.nextAsync().await()
while (record != null) {
val node = record.get("node")
mySuspendProcessingFunction(node)
record = cursor.nextAsync().await()
}
}
}
В то же время Java API работает хорошо, мы выбираем записи одну за другой:
suspend fun query() {
session.readTransactionAsync { transaction ->
transaction.runAsync("query ...", params).thenCompose { cursor ->
cursor.forEachAsync { record ->
runBlocking { // BUT I NEED TO DO RUN BLOCKING HERE :(
val node = record.get("node")
mySuspendProcessingFunction(node)
}
}
}
}.thenCompose {
session.closeAsync()
}.await()
}
Второй вариант работает для меня, но он довольно уродлив - определенно не так, как Котлин, и, что более важно, мне нужно использовать runBlocking (но весь этот блок выполняется в функции suspend)
Что я делаю не так? Есть ли лучший способ?
UPD
Попытка выполнить это упражнение с использованием новой функции Flow (), к сожалению, результаты те же:
suspend fun query() {
session.readTransactionAsync { transaction ->
transaction.runAsync(query, params).thenApply { cursor ->
cursor.asFlow().onEach { record ->
val node = record.get("node")
mySuspendProcessingFunction(node)
}
}
}.thenCompose {
session.closeAsync()
}.await()
}
fun StatementResultCursor.asFlow() = flow {
do {
val record = nextAsync().await()
if (record != null) emit(record)
} while (record != null)
}