Как реализовать NIO Socket (клиент), используя сопрограммы Kotlin в Java Code? - PullRequest
0 голосов
/ 12 декабря 2018

Я хочу использовать Kotlin (v1.3.0) сопрограммы & java.nio.channels. SocketChannel (NIO), чтобы заменить Socket connect (блокирующий ввод-вывод) в Android.потому что это может сэкономить много потоков.

Приведенный ниже код не может быть запущен, поскольку job.await() приостанавливает функцию в Kotlin, просто вызывается в блоке сопрограмм Ktolin.как launch{..}, async{..}.

// this function will be called by Java Code
fun connect(address: InetSocketAddress, connectTimeout: Int): SocketChannel {

    // Start a new connection
    // Create a non-blocking socket channel
    val socketChannel = SocketChannel.open()
    socketChannel.configureBlocking(false)

    // async calls NIO connect function
    val job = GlobalScope.async(oneThreadCtx) {
        aConnect(socketChannel, address)
    }

    // I what to suspend(NOT block) current Java Thread, until connect is success
    job.await()

    return socketChannel
}

Но я попытался использовать runBlocking{..}, чтобы сделать эту функцию нормальной функцией в Java.но job.await заблокировал текущий поток Java, не приостанавливать .

Итак, как мне реализовать эту функцию с сопрограммами Kotlin (v1.3.0)?

Ответы [ 2 ]

0 голосов
/ 04 января 2019

Как отметил Марко, ваш код все равно будет блокировать поток, даже если эта операция блокировки выполняется в асинхронной сопрограмме.Чтобы по-настоящему получить желаемое асинхронное поведение с Java и Kotlin, вам нужно использовать асинхронную версию Socket Channel

. Таким образом, вы получаете истинную асинхронную обработку сокетов.С помощью этого класса и метода suspendCoroutine Kotlin'а вы можете превратить асинхронные обработчики в приостанавливаемые вызовы.

Вот пример реализации его для чтения:

class TcpSocket(private val socket: AsynchronousSocketChannel) {
    suspend fun read(buffer: ByteBuffer): Int {
        return socket.asyncRead(buffer)
    }

    fun close() {
        socket.close()
    }

    private suspend fun AsynchronousSocketChannel.asyncRead(buffer: ByteBuffer): Int {
        return suspendCoroutine { continuation ->
           this.read(buffer, continuation, ReadCompletionHandler)
        }
    }

    object ReadCompletionHandler : CompletionHandler<Int, Continuation<Int>> {
        override fun completed(result: Int, attachment: Continuation<Int>) {
            attachment.resume(result)
        }

        override fun failed(exc: Throwable, attachment: Continuation<Int>) {
            attachment.resumeWithException(exc)
        }
    }
}

Вы можете удалить обертку, которую я здесь делаю, и просто выставить метод asyncRead в AsynchronousSocketChannel следующим образом:

suspend fun AsynchronousSocketChannel.asyncRead(buffer: ByteBuffer): Int {
    return suspendCoroutine { continuation ->
       this.read(buffer, continuation, ReadCompletionHandler)
    }
}

object ReadCompletionHandler : CompletionHandler<Int, Continuation<Int>> {
    override fun completed(result: Int, attachment: Continuation<Int>) {
        attachment.resume(result)
    }

    override fun failed(exc: Throwable, attachment: Continuation<Int>) {
        attachment.resumeWithException(exc)
    }
}

Это все дело вкуса и каковы именно ваши цели дизайна.Вы должны иметь возможность реализовать аналогичный метод для начального соединения, как я сделал здесь для чтения.

0 голосов
/ 12 декабря 2018
// I what to suspend(NOT block) current Java Thread, until connect is success
job.await()

Это не реалистичное ожидание.С точки зрения Java, suspend fun приостанавливает свое выполнение, возвращая специальную константу, COROUTINE_SUSPENDED.Вам нужен компилятор Kotlin, чтобы скрыть это от вас и позволить писать регулярно выглядящий код, который должен быть приостановлен.

В вашем коде отсутствует неблокирующая приостановка даже с точки зрения Kotlin, поскольку он использует блокирующий вызов для подключения,Отправка этого вызова в другой поток не делает его неблокирующим.

То, что делает ваш код, полностью эквивалентно передаче задания в службу исполнителя Java и последующему ожиданию его результата.Вы можете, например, использовать CompletableFuture.supplyAsync.

...