Котлин сопрограммы - Как заблокировать, чтобы ждать / присоединиться ко всем местам работы? - PullRequest
0 голосов
/ 16 февраля 2019

Я новичок в Kotlin / Coroutines, так что, надеюсь, я просто что-то упустил / не до конца понимаю, как структурировать свой код для проблемы, которую я пытаюсь решить.

По сути, я беру список строк, и для каждого элемента в списке я хочу отправить его другому методу для выполнения работы (выполнить сетевой вызов и вернуть данные на основе ответа).( Редактировать :) Я хочу, чтобы все вызовы запускались одновременно, и блокировались до тех пор, пока не будут выполнены все вызовы / ответ не обработан, а затем возвращаю новый список с информацией о каждом ответе.

Я, вероятно, еще не до конца понимаю, когда использовать launch / async, но я пытался следовать как при запуске (с joinAll), так и при асинхронности (с await).

fun processData(lstInputs: List<String>): List<response> {

    val lstOfReturnData = mutableListOf<response>()

    runBlocking {
        withContext(Dispatchers.IO) {
            val jobs = List(lstInputs.size) {
                launch {
                    lstOfReturnData.add(networkCallToGetData(lstInputs[it]))
                }
            }
            jobs.joinAll()
        }
    }

    return lstofReturnData

То, что я ожидаю, произойдет, если мой lstInputs будет иметь размер 120, когда все рабочие места будут объединены, мой lstOfReturnData также должен иметь размер 120.

То, что происходит на самом деле, является неуместнымРезультаты.Я запускаю его один раз, и я получаю 118 в моем окончательном списке, запускаю его снова, это 120, запускаю его снова, это 117 и т. Д. В методе networkCallToGetData() я обрабатываю любые исключения, чтобы хотя бы вернуть что-тодля каждого запроса независимо от того, произошел ли сбой сетевого вызова.

Может ли кто-нибудь помочь объяснить, почему я получаю противоречивые результаты, и что мне нужно сделать, чтобы обеспечить надлежащую блокировку и объединение всех заданий, прежде чем двигаться дальше?

Ответы [ 3 ]

0 голосов
/ 16 февраля 2019

mutableListOf() создает ArrayList, который не является потокобезопасным.
Попробуйте вместо этого использовать ConcurrentLinkedQueue.
Кроме того, используете ли вы стабильную версию Kotlin / Kotlinx.coroutine (не старую экспериментальную)?В стабильной версии, с введением структурированного параллелизма, нет необходимости писать jobs.joinAll anymore.launch - это функция расширения runBlocking, которая запускает новые сопрограммы в области действия runBlocking, а область действия runBlocking автоматически ожидает завершения всех запущенных заданий.Таким образом, приведенный выше код можно сократить до

    val lstOfReturnData = ConcurrentLinkedQueue<response>()
    runBlocking {
            lstInputs.forEach {
                launch(Dispatches.IO) {
                    lstOfReturnData.add(networkCallToGetData(it))
                }
            }
    }
    return lstOfReturnData 
0 голосов
/ 16 февраля 2019

runBlocking блокирует текущий поток до его завершения.Я думаю, это не то, что вы хотите.Если я ошибаюсь и вы хотите заблокировать текущий поток, вы можете избавиться от сопрограммы и просто сделать сетевой вызов в текущем потоке:

val lstOfReturnData = mutableListOf<response>()
lstInputs.forEach {
    lstOfReturnData.add(networkCallToGetData(it))
} 

Но если это не ваше намерение, вы можетевыполните следующие действия:

class Presenter(private val uiContext: CoroutineContext = Dispatchers.Main) 
    : CoroutineScope {

    // creating local scope for coroutines
    private var job: Job = Job()
    override val coroutineContext: CoroutineContext
        get() = uiContext + job

    // call this to cancel job when you don't need it anymore
    fun detach() {
        job.cancel()
    }

    fun processData(lstInputs: List<String>) {

        launch {
            val deferredList = lstInputs.map { 
                async(Dispatchers.IO) { networkCallToGetData(it) } // runs in parallel in background thread
            }
            val lstOfReturnData = deferredList.awaitAll() // waiting while all requests are finished without blocking the current thread

            // use lstOfReturnData in Main Thread, e.g. update UI
        }
    }
}
0 голосов
/ 16 февраля 2019

Runblocking должен означать, что вам не нужно вызывать join.Запуск сопрограммы из области блокировки запуска должен сделать это для вас.Вы пробовали только:

fun processData(lstInputs: List<String>): List<response> {

val lstOfReturnData = mutableListOf<response>()

runBlocking {
    lstInputs.forEach {
            launch(Dispatchers.IO) {
                lstOfReturnData.add(networkCallToGetData(it))
            }
   } 
}

return lstofReturnData
...