Как котлин сопрограммы работают внутри? - PullRequest
0 голосов
/ 28 ноября 2018

Как Kotlin реализует сопрограммы внутренне?

Говорят, что сопрограммы являются «более легкой версией» потоков, и я понимаю, что они используют потоки внутри для выполнения сопрограмм.

Что происходит, когдаЯ запускаю сопрограмму, используя любую из функций построителя?

Это мое понимание запуска этого кода:

GlobalScope.launch {       <---- (A)
    val y = loadData()     <---- (B)  // suspend fun loadData() 
    println(y)             <---- (C)
    delay(1000)            <---- (D)
    println("completed")   <---- (E)
}
  1. У Kotlin есть предопределенный ThreadPool в начале.
  2. В (A) Kotlin начинает выполнение сопрограммы в следующем доступном свободном потоке (скажем, Thread01).
  3. В (B) Kotlin прекращает выполнение текущего потока и запускаетфункция приостановки loadData() в следующей доступной свободной теме (Thread02).
  4. Когда (B) возвращается после выполнения, Kotlin продолжает сопрограмму в следующей доступной свободной теме (Thread03).
  5. (C) выполняется Thread03.
  6. При (D) Thread03 останавливается.
  7. Через 1000 мс выполняется (E)на следующей бесплатной ветке скажите Thread01.

Правильно ли я понимаю?Или сопрограммы реализованы по-другому?

Ответы [ 2 ]

0 голосов
/ 28 ноября 2018

Сопрограммы работают, создавая переключение между возможными точками возобновления:

class MyClass$Coroutine extends CoroutineImpl {
    public Object doResume(Object o, Throwable t) {
        switch(super.state) {
        default:
                throw new IllegalStateException("call to \"resume\" before \"invoke\" with coroutine");
        case 0:  {
             // code before first suspension
             state = 1; // or something else depending on your branching
             break;
        }
        case 1: {
            ...
        }
        }
        return null;
    }
}

Результирующий код, выполняющий эту сопрограмму, затем создает этот экземпляр и вызывает функцию doResume() каждый раз, когда необходимо возобновить выполнение, как это происходит.выполняется в зависимости от планировщика, используемого для выполнения.

Вот пример компиляции для простой сопрограммы:

launch {
    println("Before")
    delay(1000)
    println("After")
}

, которая компилируется в этот байт-код

private kotlinx.coroutines.experimental.CoroutineScope p$;

public final java.lang.Object doResume(java.lang.Object, java.lang.Throwable);
Code:
   0: invokestatic  #18                 // Method kotlin/coroutines/experimental/intrinsics/IntrinsicsKt.getCOROUTINE_SUSPENDED:()Ljava/lang/Object;
   3: astore        5
   5: aload_0
   6: getfield      #22                 // Field kotlin/coroutines/experimental/jvm/internal/CoroutineImpl.label:I
   9: tableswitch   { // 0 to 1
                 0: 32
                 1: 77
           default: 102
      }
  32: aload_2
  33: dup
  34: ifnull        38
  37: athrow
  38: pop
  39: aload_0
  40: getfield      #24                 // Field p$:Lkotlinx/coroutines/experimental/CoroutineScope;
  43: astore_3
  44: ldc           #26                 // String Before
  46: astore        4
  48: getstatic     #32                 // Field java/lang/System.out:Ljava/io/PrintStream;
  51: aload         4
  53: invokevirtual #38                 // Method java/io/PrintStream.println:(Ljava/lang/Object;)V
  56: sipush        1000
  59: aload_0
  60: aload_0
  61: iconst_1
  62: putfield      #22                 // Field kotlin/coroutines/experimental/jvm/internal/CoroutineImpl.label:I
  65: invokestatic  #44                 // Method kotlinx/coroutines/experimental/DelayKt.delay:(ILkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object;
  68: dup
  69: aload         5
  71: if_acmpne     85
  74: aload         5
  76: areturn
  77: aload_2
  78: dup
  79: ifnull        83
  82: athrow
  83: pop
  84: aload_1
  85: pop
  86: ldc           #46                 // String After
  88: astore        4
  90: getstatic     #32                 // Field java/lang/System.out:Ljava/io/PrintStream;
  93: aload         4
  95: invokevirtual #38                 // Method java/io/PrintStream.println:(Ljava/lang/Object;)V
  98: getstatic     #52                 // Field kotlin/Unit.INSTANCE:Lkotlin/Unit;
 101: areturn
 102: new           #54                 // class java/lang/IllegalStateException
 105: dup
 106: ldc           #56                 // String call to \'resume\' before \'invoke\' with coroutine
 108: invokespecial #60                 // Method java/lang/IllegalStateException."<init>":(Ljava/lang/String;)V
 111: athrow

Я скомпилировал это с помощью kotlinc 1.2.41

С 32 по 76 - код для печати Before и вызова delay(1000), который приостанавливается.

С 77 до 101 - этокод для печати After.

От 102 до 111 - обработка ошибок для недопустимых состояний возобновления, как обозначено меткой default в таблице переключателей.

Итак, как итог,сопрограммы в kotlin - это просто конечные автоматы, которыми управляет какой-то планировщик.

0 голосов
/ 28 ноября 2018

Сопрограммы - это совершенно отдельная вещь от любой политики планирования, которую вы описываете.Сопрограмма - это, по сути, цепочка вызовов suspend fun s.Подвеска полностью под вашим контролем: вам просто нужно позвонить suspendCoroutine.Вы получите объект обратного вызова, чтобы вы могли вызвать его метод resume и вернуться туда, где вы приостановили.

Вот некоторый код, в котором вы можете увидеть, что приостановка - это очень прямой и прозрачный механизм, полностью находящийся под вашим контролем.control:

import kotlin.coroutines.*
import kotlinx.coroutines.*

var continuation: Continuation<String>? = null

fun main(args: Array<String>) {
    val job = GlobalScope.launch(Dispatchers.Unconfined) {
        while (true) {
            println(suspendHere())
        }
    }
    continuation!!.resume("Resumed first time")
    continuation!!.resume("Resumed second time")
}

suspend fun suspendHere() = suspendCancellableCoroutine<String> {
    continuation = it
}

Сопрограмма, которую вы launch приостанавливает при каждом вызове suspendHere().Он записывает обратный вызов продолжения в свойство continuation, а затем вы явно используете это продолжение для возобновления сопрограммы.

Код использует диспетчер сопрограмм Unconfined, который вообще не отправляет потоки, он простозапускает код сопрограммы прямо там, где вы вызываете continuation.resume().


Имея это в виду, давайте вернемся к вашей диаграмме:

GlobalScope.launch {       <---- (A)
    val y = loadData()     <---- (B)  // suspend fun loadData() 
    println(y)             <---- (C)
    delay(1000)            <---- (D)
    println("completed")   <---- (E)
}
  1. У Котлина естьпредварительно определены ThreadPool в начале.

Может иметь или не иметь пул потоков.Диспетчер пользовательского интерфейса работает с одним потоком.

Обязательным условием для того, чтобы поток был целью диспетчера сопрограмм, является наличие параллельной очереди, связанной с ним, и поток запускает цикл верхнего уровня, который берет Runnable объекты из этой очереди и выполняет их,Диспетчер сопрограмм просто помещает продолжение в эту очередь.

На (A) Kotlin начинает выполнение сопрограммы в следующей доступной бесплатной теме (скажем, Thread01).

Это также может быть та же нить, где вы назвали launch.

При (B) Kotlin прекращает выполнение текущего потока и запускает функцию приостановки loadData() в следующем доступном свободном потоке (Thread02).

Kotlin не имеетнужно остановить любые потоки, чтобы приостановить сопрограмму.На самом деле основной смысл сопрограмм заключается в том, что потоки не запускаются или останавливаются.Цикл верхнего уровня потока продолжится и выберет еще один работающий объект.

Кроме того, сам факт, что вы называете suspend fun, не имеет значения.Сопрограмма приостанавливается только тогда, когда она явно вызывает suspendCoroutine.Функция также может просто возвращаться без приостановки.

Но давайте предположим, что она действительно вызвала suspendCoroutine.В этом случае сопрограмма больше не работает в любом потоке .Он приостановлен и не может продолжаться до тех пор, пока какой-нибудь код не вызовет continuation.resume().Этот код может выполняться в любом потоке в любое время в будущем.

Когда (B) возвращается после выполнения, Kotlin продолжает сопрограмму в следующей доступной свободной теме (Thread03).

B hasn 't «возврат за исполнением», сопрограмма возобновляется, пока она находится внутри своего тела.Он может приостановить и возобновить любое количество раз, прежде чем вернуться.

(C) выполняется Thread03. В (D) Thread03 останавливается. Через 1000 мс (E) выполняется в следующем свободном потоке,скажем Thread01.

И снова ни одна тема не останавливается.Сопрограмма приостанавливается, и для планирования ее возобновления через 1000 мс используется механизм, обычно специфичный для диспетчера.В этот момент он будет добавлен в очередь выполнения, связанную с диспетчером.


Для конкретности давайте рассмотрим несколько примеров того, какой код требуется для отправки сопрограммы.

Диспетчер пользовательского интерфейса Swing:

EventQueue.invokeLater { continuation.resume(value) }

Диспетчер пользовательского интерфейса Android:

mainHandler.post { continuation.resume(value) }

Диспетчер службы ExecutorService:

executor.submit { continuation.resume(value) } 
...