Закройте канал Coroutine, когда работа потребителя отменяется - PullRequest
0 голосов
/ 21 ноября 2018

У меня есть простой производитель и потребитель, которые используют каналы Coroutine.Вот упрощенная версия:

class Producer {

  suspend fun start(): ReceiveChannel<String> {

    val channel = Channel<String>(Channel.UNLIMITED)

    // Asynchronous channel.send(it) from an object callback

    channel.invokeOnClose {
      // Channel is closed...
    }

    return channel
  }

}

class Consumer : CoroutineScope {

  private val producer = Producer()

  private val job = Job()
  override val coroutineContext = job + Dispatchers.Default

  fun start() {
    launch {
      val channel = producer.start()

      for (currentValue in channel) {
        // use currentValue
      }
    }
  }

  fun stop() {
    job.cancel()
  }

}

Producer создает канал, а затем заполняет его значениями из асинхронного задания.Consumer перебирает его и использует значения.

Я ожидал, что когда я вызову job.cancel() от потребителя, итератор канала выдаст, а канал закроется.Обратный вызов invokeOnClose никогда не вызывается.

Я мог бы сохранить ссылку на канал в Consumer и сделать channel.close().Я хочу знать, есть ли более разумное решение для этого.Может быть, другой способ перебора значений канала?Спасибо?

Редактировать

Похоже, что использование

launch {
    val channel = producer.start()

    channel.consumeEach { currentValue ->
    // use currentValue
    }
}

Сработает.Однако consumeEach() помечен как устаревший.

1 Ответ

0 голосов
/ 22 ноября 2018

Вы ожидаете, что job.cancel() будет распространяться на вашего продюсера, но Producer на самом деле ни с чем не связано.Функция маркировки как suspend не делает ее сопрограммой.

Вот один из способов исправить это с помощью структурированного параллелизма:

class Producer: CoroutineScope {
    override val coroutineContext: CoroutineContext
        get() = Job() + Dispatchers.Default

    suspend fun start() = produce<String> {
        channel.send("A")

        channel.invokeOnClose {
            println("Closed")
        }
    }
}

Теперь ваш Producer знает о CoroutineScope.

И поскольку мы используем produce, вам не нужно инициализировать свой канал, как вы это делали раньше.

...