Как преобразовать ByteReadChannel в поток <ByteBuffer> - PullRequest
0 голосов
/ 28 февраля 2020

Как я могу преобразовать io.ktor.utils.io.ByteReadChannel в kotlinx.coroutines.flow.Flow<java.nio.ByteBuffer>?

Я использую Ktor с этой маршрутизацией:

        post("/upload") {
            val channel: ByteReadChannel = call.receiveChannel()
            val flow: Flow<ByteBuffer> = channel.asByteBufferFlow() // my custom extension method
            transaction.execute {
                testDao.saveFile(flow)
            }
            call.respond("OK")
        }

DAO использует R2DB C и Blob примерно так:

    override suspend fun saveFile(input: Flow<ByteBuffer>) {
        val connection = requireR2DBCTransactionConnection()
        val publisher: Publisher<ByteBuffer> = input.asPublisher()
        val statement: Statement = connection.createStatement("insert into bindata (data) values ($1)")
        statement.bind(0, Blob.from(publisher))
        val count: Int = statement.execute().awaitFirst().rowsUpdated.awaitFirst()
        if (count != 1) {
            throw IllegalStateException()
        }
    }

Я пытался написать этот метод расширения, но мне не удалось:

fun ByteReadChannel.asByteBufferFlow(): Flow<ByteBuffer> = object : AbstractFlow<ByteBuffer>() {

    override suspend fun collectSafely(collector: FlowCollector<ByteBuffer>) {
        /* I have no idea */
    }

}

Моя основная проблема что я не нашел подобного образца, и оба ByteBuffer и ByteReadChannel являются новыми для меня.

...