Koltin Vertx с блоками Coroutines при попытке вызвать блокировку запуска - PullRequest
0 голосов
/ 07 ноября 2018

Я использую стороннюю библиотеку, которая предоставляет функцию обратного вызова. Функция обратного вызова будет вызвана в случае успеха. Функция обратного вызова не является функцией приостановки, но когда я пытаюсь сделать вызов внутри функции без приостановки, чтобы вернуть результат функции приостановки, которая делает вызов ввода-вывода с использованием aysnc и await, вызов никогда не выполняется. Ниже я представил простой фрагмент кода, который демонстрирует проблему.

open class TestVerticle: CoroutineVerticle() {

  override suspend fun start() {

    awaitBlockingExample()

  }

 fun awaitBlockingExample():String {

    val future= async(vertx.dispatcher()) {

        makeSuspendFunCall()
     }
     val result:String= runBlocking(vertx.dispatcher()){future.await()}
     println(" The final Result is $result")
     return result
   }

  suspend fun makeSuspendFunCall():String{
    println("Comming here 3")
    delay(500)
    val resp="Test"
    return resp
  }

}
fun main(args: Array<String>) = runBlocking {
    Vertx.vertx().deployVerticle("TestVerticle")
}

Программа запускает штрафы, если я уберу функцию задержки в makeSuspendFunCall, но она зависнет, если я добавлю функцию задержки. Я фактически симулирую сетевой вызов функции приостановки, используя здесь функцию задержки. Как я могу получить результат от awaitBlockingExample в этом сценарии? Я четко понимаю, что, сделав awaitBlockingExample в качестве функции приостановки, я смогу выполнить эту работу, удалить асинхронные и выполнить блокирующие вызовы внутри. Но здесь awaitBlockingExample (не приостановленная функция) представляет реализацию, предоставленную библиотекой этой стороны, где она переопределена в нашей реализации. Например, кеш guava предоставляет функцию перезагрузки, я хотел бы переопределить функцию перезагрузки (функция без приостановки) и вызвать функцию сопрограммы из метода перезагрузки для обновления значения кэша из базы данных или сетевого вызова.

Ответы [ 3 ]

0 голосов
/ 14 ноября 2018

Попробуйте следующий подход:

override fun start() {
    GlobalScope.launch {
        val result = awaitBlockingExample()
    }
}

suspend fun awaitBlockingExample(): String {
    val response =  makeSuspendFunCall()
    println(" The final Result is $response")
    return response
}

suspend fun makeSuspendFunCall():String{
    println("Comming here 3")
    return suspendCoroutine {
        delay(500)
        val resp="Test"
        it.resume(resp)
    }
}
0 голосов
/ 15 ноября 2018

Для Kotlin 1.3.0 и выше

private val mainScope = CoroutineScope(Dispatchers.Main)

fun start(){
        mainScope.launch { 
            val data = withContext(Dispatchers.IO){
                //This function will return the result. Return type of the below function will be type of data variable above.
                awaitBlockingExample()
            }
            //use your data result from async call. Result will be available here as soon as awaitBlockingExample() return it.
        }
        //Your function will continue execution without waiting for async call to finish.
    }

 fun awaitBlockingExample():String {
    //Your Logic
   }

Надеюсь, это поможет.

0 голосов
/ 07 ноября 2018

Проблема в том, что vertx.dispatcher() использует один поток в качестве цикла обработки событий, а runBlocking блокирует этот поток.

Детали:

Ваша функция awaitBlockingExample() работает в этом потоке цикла событий Vertx, поскольку она запускается из функции suspend start(). Если вы вызываете runBlocking(), этот поток Vertx блокируется и никогда не освобождается. Но ваши другие сопрограммы, например async(), теперь нет потока, чтобы делать свою работу.

Решение:

Я предполагаю, что вызов awaitBlockingExample из функции start происходит только в этом примере. В действительности я бы предположил, что внешний обратный вызов использует свой собственный поток. Тогда нет никаких проблем, потому что теперь внешний поток заблокирован:

override suspend fun start() {

    //simulate own thread for external callback
    thread {
        awaitBlockingExample()
    }
}

fun awaitBlockingExample():String {

    val future= async(vertx.dispatcher()) {

        makeSuspendFunCall()
    }
    val result:String= runBlocking(vertx.dispatcher()){future.await()}
    println(" The final Result is $result")
    return result
}

Кстати: вам не нужен блок async(), вы можете напрямую позвонить на makeSuspendFunCall() с runBlocking()

fun awaitBlockingExample():String = runBlocking(vertx.dispatcher()){
    val result = makeSuspendFunCall()
    println(" The final Result is $result")
    result
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...