Синхронный запуск асинхронных задач в Котлине - PullRequest
3 голосов
/ 20 октября 2019

Я пытаюсь запустить «пакетную» работу Firebase Firestore. Поскольку пакетное задание является асинхронным, и каждый пакет обрабатывает только 500 документов, я создал массив пакетных заданий, которые я хочу запускать синхронно, чтобы точно знать, когда выполняется последний пакет, а затем перейти к следующемуоперация.

Однако, чтобы сделать это в Kotlin, когда я прочитал, я натолкнулся на множество жаргонов, таких как runBlocking, Coroutine, Dispatcher, async, await, Context, Suspend, launch, join, Scope,Отложено, Продолжение, CommonPool

Кроме того, во многих постах говорится, что в последней версии Kotlin все изменилось. Kotlin документация говорит о runBlocking, но в этом посте говорится, что runBlocking - плохая вещь.

После некоторых проб и ошибок я получил это для компиляции

suspend fun doTheThing() {

   for ( b in batchArray) {
      coroutineScope {
      val job = async { b.commit() }
      job.await()
  }}
}

Однако теперь я получаю сообщение об ошибке: «Функция приостановки 'doTheThing' должна вызываться только из сопрограммы или другой функции приостановки" В данный момент я просто сбит с толку. Я просто хочу сделать эти вызовы по очереди или подождать, пока все это не будет сделано. Не уверен, что правильный синтаксис, чтобы сделать это, и какие концепции я ошибаюсь.


Обновление : Кажется, работает следующий фрагмент кода:

for ( b in batchArray) {
    runBlocking {b.commit()}
}

Это хорошая практика, чтобы делать это так?

Ответы [ 3 ]

0 голосов
/ 20 октября 2019

Сопрограммы обычно создаются разными строителями в контексте некоторой области сопрограмм. Как и компоновщики, функция приостановки выполняется в области сопрограмм, и поэтому ее следует вызывать в области сопрограмм, которая может быть предоставлена ​​либо путем вызова внутри сопрограммы, функции приостановки, либо явно из определенной области.

ACoroutineScope - это интерфейс, который содержит только одно свойство: coroutineContext . Вы можете просто создать свою собственную область видимости, реализовав интерфейс CoroutineScope и переопределив свой собственный контекст сопрограммы.

val myCoroutineScope = object : CoroutineScope {
    override val coroutineContext: CoroutineContext
        get() = Job() + Dispatchers.Main
}

Из своей области видимости вы можете использовать компоновщики, такие как запуск, асинхронизация, создание и т. Д.

Вы можете изменить рефакторинг своей функции на

suspend fun doTheThing() = coroutineScope{
for ( b in batchArray) {
    b.commit()
   }
}

fun main(args: Array<String>) {
    myCoroutineScope.launch {
        doTheThing()
        println("Completed")   
      }
}

Я использовал здесь запуск, так как нас не очень интересуют результаты. Функция приостановки будет приостанавливать родительский сопрограмм до тех пор, пока не завершит его выполнение.

Вы также можете выбрать запуск своей области в другом диспетчере

fun main(args: Array<String>) {

myCoroutineScope.launch(Dispatchers.IO) {
    doTheThing()
    println("Completed")
   }
}

Для лучших результатов, когда мы не хотим отменылюбой детской подпрограммы, чтобы отменить область, мы используем работу SuperVisor вместо обычной работы.

val myCoroutineScope = object : CoroutineScope {
override val coroutineContext: CoroutineContext
    get() = SupervisorJob() + Dispatchers.Default
}
0 голосов
/ 23 октября 2019

Пожалуйста, посмотрите решение ниже, где ваша основная функция, с которой вы запускаете пакетную работу, вам необходимо определить, в каком потоке будут обрабатываться все 500 документов. Таким образом, вы инициализируете область действия Coroutine с помощью диспетчера ввода-вывода. И вызываете в нем основной метод обработки.

В Kotlin есть три диспетчера

  • IO для сети иработа, связанная с дисковым вводом-выводом.
  • По умолчанию используется для сложных операций, таких как обход списка или математические операции.
  • Main для размещения результатов в пользовательском интерфейсе или пользовательском интерфейсеопераций.

Теперь, поскольку вы хотите, чтобы все 500 документов обрабатывались параллельно, вы создаете синхронный блок внутри этого фонового потока. Этот синхронный блок не будет завершен, если все операции асинхронных блоков (.commit) не будут завершены.

Полагаю, таким образом вы сможете добиться желаемого поведения. Пожалуйста, смотрите код для того же ниже:

fun initialFunction() {

   //inside this function start the Coroutine using launch
   //using Dispatcher.IO will perform execution of coroutine in background/IO
   CoroutineScope(Dispatchers.IO).launch {

      //call your method which will process batch job asynchronously
      doTheThing()

   }
}

suspend fun doTheThing() {
  //now start your blocking call, this execute following block 
  //synchronously 
  runBlocking {
     for ( b in batchArray) {

     //commit will run synchronously and following nested coroutine
     //will wait for job to get completed 
     launch {
        val job = async { b.commit() }
        job.await()
     }

    }
  }
}
0 голосов
/ 20 октября 2019

Это хорошая практика, чтобы делать это так?

Нет, runBlocking - определенно неправильная вещь. Он заблокирует основной поток вашего приложения и, возможно, завершит его с помощью ANR. Однако конкретный способ написания кода означает, что вы также можете удалить runBlocking и получить точно такое же поведение. b.commit() - это простой асинхронный вызов, он немедленно возвращает объект Task, что означает, что вы не достигли желаемой цели - ожидания завершения пакета перед отправкой следующего.

Теперь, нак правильному решению, которое использует сопрограммы.

Поместите зависимость org.jetbrains.kotlinx:kotlinx-coroutines-play-services в путь к классам. Это даст вам функцию расширения suspend fun Task.await() и позволит вам создать приостановившийся вызов b.commit().await(), который не завершится, пока не будет принят пакет.

Имея это, вы можете написать свою функцию следующим образом. :

fun CoroutineScope.doTheThing(batchArray: List<Batch>) {
    launch {
        for (b in batchArray) {
            b.commit().await()
        }
        // add logic here for what to do when all batches are done
    }
}

Чтобы позвонить, вам понадобится CoroutineScope. Если вы еще не знаете о структурированном параллелизме и способах его использования, взгляните на документ CoroutineScope для быстрого начала.

Обратите внимание, что вызывающий submitAll не будет блокироваться, пока все пакеты не будут выполнены, вместо этого он запустит сопрограмму в фоновом режиме и продолжит работу. Однако запущенная сопрограмма будет приостанавливать во время выполнения пакета, возобновлять его после завершения, начинать следующее задание, приостанавливать и т. Д., Пока все не будет выполнено. Пока он приостановлен, он не будет занимать поток.

...