Как отменить / отписаться от сопрограмм Flow - PullRequest
5 голосов
/ 10 января 2020

Я замечаю странное поведение при попытке преждевременно отменить из потока. Посмотрите на следующий пример.

Это простой поток, который выдает целочисленные значения

  private fun createFlow() = flow {
        repeat(10000) {
            emit(it)
        }
    }

Затем я вызываю функцию createFlow, используя этот код

  CoroutineScope(Dispatchers.Main).launch {
            createFlow().collect {

                Log.i("Main", "$it isActive $isActive")
                if (it == 2) {
                    cancel()
                }
            }
        }

Это то, что распечатано

0 isActive true
1 isActive true
2 isActive true
3 isActive false
4 isActive false
etc...etc

Теперь я ожидал бы, что поток должен прекратить излучать целые числа, как только он достигнет значения 2, но вместо этого он фактически переключает флаг isActive в false и продолжает излучать без других действий. остановка.

Когда я добавляю задержку между выбросами, поток ведет себя так, как я ожидал.

private fun createFlow() = flow {
    repeat(10000) {
        delay(500) //add a delay
        emit(it)
    }
}

Это то, что распечатывается после повторного вызова потока (что является ожидаемым поведением) .

0 isActive true
1 isActive true
2 isActive true

Что я могу сделать, чтобы отменить выброс потока точно на указанное значение без добавления задержки?

1 Ответ

5 голосов
/ 10 января 2020

Я нашел обходной путь в этой связанной проблеме

Я заменил каждую collect на safeCollect в моем проекте:

/**
 * Only proceed with the given action if the coroutine has not been cancelled.
 * Necessary because Flow.collect receives items even after coroutine was cancelled
 * https://github.com/Kotlin/kotlinx.coroutines/issues/1265
 */
suspend inline fun <T> Flow<T>.safeCollect(crossinline action: suspend (T) -> Unit) {
  collect {
    coroutineContext.ensureActive()
    action(it)
  }
}
...