Запустить несколько сопрограмм и присоединиться к ним с таймаутом (без отмены) - PullRequest
0 голосов
/ 10 марта 2019

Мне нужно запустить несколько заданий, которые будут возвращать результат.

В основном коде (который не сопрограмма), после запуска заданий мне нужно дождатьсявсе они для выполнения своей задачи ИЛИ в течение заданного времени ожидания, в зависимости от того, что наступит раньше.

Если я выйду из ожидания, потому что все задания завершены до истечения времени ожиданияЭто здорово, я соберу их результаты.

Но если для некоторых заданий требуется больше времени, чем тайм-аут, моя основная функция должна активизироваться, как только истечет тайм-аут, проверить, какие задания закончились вовремя (если таковые имеются) и какие из них все еще работают, и работают оттуда, без отмены работ, которые все еще выполняются.

Как бы вы кодировали этот вид ожидания?

Ответы [ 3 ]

0 голосов
/ 11 марта 2019

Вот решение, которое я придумал.Сопряжение каждой работы с состоянием (среди прочего):

private enum class State { WAIT, DONE, ... }

private data class MyJob(
    val job: Deferred<...>,
    var state: State = State.WAIT,
    ...
)

и запись явного цикла:

// wait until either all jobs complete, or a timeout is reached
val waitJob = launch { delay(TIMEOUT_MS) }
while (waitJob.isActive && myJobs.any { it.state == State.WAIT }) {
    select<Unit> {
        waitJob.onJoin {}
        myJobs.filter { it.state == State.WAIT }.forEach { 
            it.job.onJoin {}
        }
    }
    // mark any finished jobs as DONE to exclude them from the next loop
    myJobs.filter { !it.job.isActive }.forEach { 
        it.state = State.DONE
    }
}

Начальное состояние называется WAIT (вместо RUN), потому что оно нене обязательно означает, что задание все еще выполняется, только то, что мой цикл еще не принял его во внимание.

Мне интересно знать, достаточно ли это идиоматично, или есть более эффективные способы кодирования этоготакое поведение.

0 голосов
/ 11 марта 2019

Решение вытекает прямо из вопроса. Сначала мы разработаем функцию приостановки для этой задачи. Давайте посмотрим наши требования:

если некоторые задания занимают больше времени, чем время ожидания ... без отмены работ, которые все еще выполняются.

Это означает, что запускаемые нами задания должны быть автономными (не дочерними), поэтому мы будем отказываться от структурированного параллелизма и будем использовать GlobalScope для их запуска, собирая вручную все задания. Мы используем async сопрограмма строитель, потому что мы планируем собрать их результаты типа R позже:

val jobs: List<Deferred<R>> = List(numberOfJobs) { 
    GlobalScope.async { /* our code that produces R */ }
}

после запуска заданий мне нужно подождать, пока они все завершат свою задачу, ИЛИ истечь заданный тайм-аут, в зависимости от того, что наступит раньше.

Давайте подождем их всех и сделаем это с таймаутом:

withTimeoutOrNull(timeoutMillis) { jobs.joinAll() }

Мы используем joinAll (в отличие от awaitAll), чтобы избежать исключения в случае сбоя одного из заданий, и withTimeoutOrNull, чтобы избежать исключения по таймауту.

моя основная функция должна активизироваться по истечении времени ожидания, проверить, какие задания закончились вовремя (если есть), а какие еще работают

jobs.map { deferred -> /* ... inspect results */ }

В основном коде (который не является сопрограммой) ...

Поскольку наш основной код не является сопрограммой, он должен ждать блокирующим образом, поэтому мы соединяем код, который мы написали, используя runBlocking. Собираем все вместе:

fun awaitResultsWithTimeoutBlocking(
    timeoutMillis: Long,
    numberOfJobs: Int
) = runBlocking {
    val jobs: List<Deferred<R>> = List(numberOfJobs) { 
        GlobalScope.async { /* our code that produces R */ }
    }    
    withTimeoutOrNull(timeoutMillis) { jobs.joinAll() }
    jobs.map { deferred -> /* ... inspect results */ }
}

P.S. Я бы не рекомендовал развертывать такого рода решения в любой серьезной производственной среде, так как запуск фоновых заданий (утечка) после истечения времени ожидания будет неизменно сильно кусать вас в дальнейшем. Делайте это, только если вы полностью понимаете все недостатки и риски такого подхода.

0 голосов
/ 11 марта 2019

Вы можете попробовать работать с whileSelect и предложением onTimeout.Но вам все еще нужно преодолеть проблему, заключающуюся в том, что ваш основной код не является сопрограммой.Следующие строки являются примером оператора whileSelect.Функция возвращает Deferred со списком результатов, оцененных за период ожидания, и еще один список Deferred с незавершенных результатов.

fun CoroutineScope.runWithTimeout(timeoutMs: Int): Deferred<Pair<List<Int>, List<Deferred<Int>>>> = async {

    val deferredList = (1..100).mapTo(mutableListOf()) {
        async {
            val random = Random.nextInt(0, 100)
            delay(random.toLong())
            random
        }
    }

    val finished = mutableListOf<Int>()
    val endTime = System.currentTimeMillis() + timeoutMs

    whileSelect {
        var waitTime = endTime - System.currentTimeMillis()
        onTimeout(waitTime) {
            false
        }
        deferredList.toList().forEach { deferred ->
            deferred.onAwait { random ->
                deferredList.remove(deferred)
                finished.add(random)
                true
            }
        }
    }

    finished.toList() to deferredList.toList()
}

В вашем основном коде вы можете использовать не рекомендуемый метод runBlocking для доступа к Deferrred.

fun main() = runBlocking<Unit> {
    val deferredResult = runWithTimeout(75)
    val (finished, pending) = deferredResult.await()
    println("Finished: ${finished.size} vs Pending: ${pending.size}")
}
...