Счетчик прогресса Kotlin сопрограмм - PullRequest
0 голосов
/ 29 июня 2018

Я делаю тысячи HTTP-запросов с использованием async / await и хотел бы иметь индикатор прогресса. Я добавил один наивным способом, но заметил, что значение счетчика никогда не достигает суммы, когда все запросы выполнены. Итак, я создал простой тест и, конечно же, он не работает должным образом:

fun main(args: Array<String>) {
    var i = 0
    val range = (1..100000)
    range.map {
        launch {
            ++i
        }
    }
    println("$i ${range.count()}")
}

Вывод выглядит примерно так: первое число всегда меняется:

98800 100000

Возможно, мне не хватает некоторых важных деталей о параллелизме / синхронизации в JVM / Kotlin, но я не знаю, с чего начать. Любые советы?

ОБНОВЛЕНИЕ: я закончил использовать каналы, как предложил Марко:

/**
 * Asynchronously fetches stats for all symbols and sends a total number of requests
 * to the `counter` channel each time a request completes. For example:
 *
 *     val counterActor = actor<Int>(UI) {
 *         var counter = 0
 *         for (total in channel) {
 *             progressLabel.text = "${++counter} / $total"
 *         }
 *     }
 */
suspend fun getAssetStatsWithProgress(counter: SendChannel<Int>): Map<String, AssetStats> {
    val symbolMap = getSymbols()?.let { it.map { it.symbol to it }.toMap() } ?: emptyMap()
    val total = symbolMap.size
    return symbolMap.map { async { getAssetStats(it.key) } }
        .mapNotNull { it.await().also { counter.send(total) } }
        .map { it.symbol to it }
        .toMap()
}

Ответы [ 3 ]

0 голосов
/ 29 июня 2018

Вы читали Основы сопрограмм ? Там точно такая же проблема, как у вас:

val c = AtomicInteger()

for (i in 1..1_000_000)
    launch {
        c.addAndGet(i)
    }

println(c.get())

Этот пример завершается для меня менее чем за секунду, но он печатает произвольное число, потому что некоторые сопрограммы не заканчивают работу до того, как main () напечатает результат.

Поскольку launch не блокирует, нет гарантии, что все сопрограммы завершат работу до println. Вам нужно использовать async, хранить Deferred объекты и await для их завершения.

0 голосов
/ 29 июня 2018

Объяснение того, что именно делает ваш неправильный подход неудачным, вторично: главное - это исправить подход.

Вместо async-await или launch для этого шаблона связи у вас вместо этого должен быть actor, которому все задания HTTP отправляют свой статус. Это автоматически решит все ваши проблемы с параллелизмом.

Вот пример кода, взятый по ссылке, указанной вами в комментарии, и адаптированный к вашему варианту использования. Вместо того, чтобы какая-то третья сторона запрашивала у него значение счетчика и обновляла графический интерфейс с ним, субъект запускается в контексте пользовательского интерфейса и обновляет сам графический интерфейс:

import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*
import kotlin.system.*
import kotlin.coroutines.experimental.*

object IncCounter

fun counterActor() = actor<IncCounter>(UI) {
    var counter = 0
    for (msg in channel) {
        updateView(++counter)
    }
}

fun main(args: Array<String>) = runBlocking {
    val counter = counterActor()
    massiveRun(CommonPool) {
        counter.send(IncCounter)
    }
    counter.close()
    println("View state: $viewState")
}


// Everything below is mock code that supports the example
// code above:

val UI = newSingleThreadContext("UI")

fun updateView(newVal: Int) {
    viewState = newVal
}

var viewState = 0

suspend fun massiveRun(context: CoroutineContext, action: suspend () -> Unit) {
    val numCoroutines = 1000
    val repeatActionCount = 1000
    val time = measureTimeMillis {
        val jobs = List(numCoroutines) {
            launch(context) {
                repeat(repeatActionCount) { action() }
            }
        }
        jobs.forEach { it.join() }
    }
    println("Completed ${numCoroutines * repeatActionCount} actions in $time ms")
}

Запуск печати

Completed 1000000 actions in 2189 ms
View state: 1000000
0 голосов
/ 29 июня 2018

Вы теряете записи, потому что i++ не является атомарной операцией - значение должно быть прочитано, увеличено, а затем записано обратно - и у вас есть несколько потоков, читающих и пишущих i одновременно. (Если вы не предоставляете launch контексту, он по умолчанию использует пул потоков.)

Вы теряете 1 из своего счетчика каждый раз, когда два потока читают одно и то же значение, поскольку они оба будут записывать это значение плюс один.

Синхронизация каким-либо образом, например, с помощью AtomicInteger решает это:

fun main(args: Array<String>) {
    val i = AtomicInteger(0)
    val range = (1..100000)
    range.map {
        launch {
            i.incrementAndGet()
        }
    }
    println("$i ${range.count()}") // 100000 100000
}

Также нет гарантии, что эти фоновые потоки будут выполнены с их работой к тому времени, когда вы напечатаете результат и ваша программа завершится - вы можете легко проверить это, добавив очень маленькую задержку в launch, пару миллисекунд. При этом, это хорошая идея, чтобы обернуть все это в вызов runBlocking, который поддержит основной поток, а затем дождаться завершения сопрограмм:

fun main(args: Array<String>) = runBlocking {
    val i = AtomicInteger(0)
    val range = (1..100000)
    val jobs: List<Job> = range.map {
        launch {
            i.incrementAndGet()
        }
    }
    jobs.forEach { it.join() }
    println("$i ${range.count()}") // 100000 100000
}
...