Котлин сопрограммы будущего ждут с таймаутом (без отмены) - PullRequest
0 голосов
/ 25 января 2019

Учитывая, что у нас есть CompletableFuture f, в kotlin приостанавливаемой области мы можем назвать f.await(), и мы приостановим, пока это не будет сделано.

У меня возникли проблемы с реализацией аналогичной функции с сигнатурой f.await(t), которая должна приостанавливаться на максимальные t миллисекунды или возвращаться раньше, если будущее завершится в течение этого времени (в зависимости от того, что произойдет раньше).

Вот то, что я попробовал.

/**
 * Suspend current method until future is done or specified duration expires,
 * whichever happens first without cancelling the future.
 * Returns true if its done, false otherwise.
 */
suspend fun <T> ListenableFuture<T>.await(duration: Long): Boolean {
   val future = this
   try {
      withTimeout(duration) {
         withContext(NonCancellable) { // this does not help either
            future.await() // i do not expect the future itself to be cancelled
         }
      }
   } catch (t: TimeoutCancellationException) {
      // we expected this
   } catch (e: Throwable) {
      e.printStackTrace()
   }

   return future.isDone

}

fun main(args: Array<String>) = runBlocking<Unit> {
   val future = GlobalScope.future {
      try {
         repeat(5) {
            println("computing")
            delay(500)
         }
         println("complete")
      } finally {
         withContext(NonCancellable) {
            println("cancelling")
            delay(500)
            println("cancelled")
         }
      }
   }

   for (i in 0..10) {
      if (future.await(2000)) {
         println("checking : done")
      } else {
         println("checking : not done")
      }
   }
}

Мне также нужна аналогичная функция для работы.Но, возможно, решение для этого также поможет мне в этом ...

Вывод для этого

computing
computing
computing
computing
checking : done
checking : done
checking : done
checking : done
cancelling
checking : done
checking : done
checking : done
checking : done
checking : done
checking : done
checking : done

Ответы [ 2 ]

0 голосов
/ 26 января 2019

Вот то, что я придумал, я считаю, что это не очень хорошее решение, так как я, скорее всего, создаю много мусора для довольно примитивной задачи.


suspend fun <T> CompletableFuture<T>.await(duration: Millis): Boolean {
   val timeout = CompletableFuture<Unit>()

   GlobalScope.launch {
      delay(duration)
      timeout.complete(Unit)
   }

   val anyOfTwo = CompletableFuture.anyOf(this, timeout)
   anyOfTwo.await()
   return this.isDone
}


fun main() = runBlocking {
   val future = CompletableFuture<String>()

   GlobalScope.launch {
      delay(2000)
      println("setting the result (future now ${future.isDone})")
      future.complete("something")
   }

   while (future.isNotDone()) {
      println("waiting for the future to complete for the next 500ms")
      val isDone = future.await(500)

      if (isDone) {
         println("future is done")
         break
      } else {

         println("future not done")
      }
   }

   Unit
}

это даст вывод

waiting for the future to complete for the next 500ms
future not done
waiting for the future to complete for the next 500ms
future not done
waiting for the future to complete for the next 500ms
future not done
waiting for the future to complete for the next 500ms
setting the result (future now false)
future is done

что мы и хотели ...

0 голосов
/ 26 января 2019

Я написал некоторый тестовый код:

fun main(args: Array<String>) = runBlocking {
    val future = calculateAsync()
    val result = future.await(2000)
    println("result=$result")
}

suspend fun <T> CompletableFuture<T>.await(duration: Long): T? {
    val future = this
    var result: T? = null
    try {
        withTimeout(duration) {
            result = future.await()
        }
    } catch (t: TimeoutCancellationException) {
        println("timeout exception")
    } catch (e: Throwable) {
        e.printStackTrace()
    }

    return result
}

@Throws(InterruptedException::class)
fun calculateAsync(): CompletableFuture<String> {
    val completableFuture = CompletableFuture<String>()

    Executors.newCachedThreadPool().submit {
        Thread.sleep(3000)
        println("after sleep")
        completableFuture.complete("Completed")
    }

    return completableFuture
}

После того, как мы запустим этот код, мы получим вывод:

timeout exception
result=null
after sleep

Мы видим, что наша функция расширения await возвращаетnull потому что мы установили тайм-аут на 2000 миллисекунд, но CompletableFuture завершается через 3000 миллисекунд.В этом случае CompletableFuture отменяется (его свойство isCancelled возвращает true), но функция Thread, которую мы запустили в calculateAsync, продолжает выполняться (мы видим это в журналах after sleep).

Если мы установим длительность тайм-аута в 4000 миллисекунд future.await(4000) в функции main, мы увидим следующий результат:

after sleep
result=Completed

Теперь у нас есть некоторый результат, потому что CompletableFuture выполняется быстрее, чем 4000 миллисекунд.

...