Как я могу преобразовать io.ktor.utils.io.ByteReadChannel
в kotlinx.coroutines.flow.Flow<java.nio.ByteBuffer>
?
Я использую Ktor с этой маршрутизацией:
post("/upload") {
val channel: ByteReadChannel = call.receiveChannel()
val flow: Flow<ByteBuffer> = channel.asByteBufferFlow() // my custom extension method
transaction.execute {
testDao.saveFile(flow)
}
call.respond("OK")
}
DAO использует R2DB C и Blob примерно так:
override suspend fun saveFile(input: Flow<ByteBuffer>) {
val connection = requireR2DBCTransactionConnection()
val publisher: Publisher<ByteBuffer> = input.asPublisher()
val statement: Statement = connection.createStatement("insert into bindata (data) values ($1)")
statement.bind(0, Blob.from(publisher))
val count: Int = statement.execute().awaitFirst().rowsUpdated.awaitFirst()
if (count != 1) {
throw IllegalStateException()
}
}
Я пытался написать этот метод расширения, но мне не удалось:
fun ByteReadChannel.asByteBufferFlow(): Flow<ByteBuffer> = object : AbstractFlow<ByteBuffer>() {
override suspend fun collectSafely(collector: FlowCollector<ByteBuffer>) {
/* I have no idea */
}
}
Моя основная проблема что я не нашел подобного образца, и оба ByteBuffer
и ByteReadChannel
являются новыми для меня.