Объедините несколько Kotlin потоков в списке, не ожидая первого значения - PullRequest
3 голосов
/ 13 апреля 2020

У меня есть List<Flow<T>>, и я хотел бы сгенерировать Flow<List<T>>. Это почти то, что делает combine - за исключением того, что объединение ожидает, пока каждый Flow не выдаст начальное значение, а это не то, что я хочу. Возьмите этот код, например:

val a = flow {
  repeat(3) {
    emit("a$it")
    delay(100)
  }
}
val b = flow {
  repeat(3) {
    delay(150)
    emit("b$it")
  }
}
val c = flow {
  delay(400)
  emit("c")
}
val flows = listOf(a, b, c)
runBlocking {
  combine(flows) {
    it.toList()
  }.collect { println(it) }
}

С combine (и, следовательно, как есть), это вывод:

[a2, b1, c]
[a2, b2, c]

В то время как я заинтересован во всех посредников шаги тоже. Вот что я хочу от этих трех потоков:

[]
[a0]
[a1]
[a1, b0]
[a2, b0]
[a2, b1]
[a2, b1, c]
[a2, b2, c]

Прямо сейчас у меня есть два обходных пути, но ни один из них не велик ... Первый - безобразный и не работает с обнуляемым типы:

val flows = listOf(a, b, c).map {
  flow {
    emit(null)
    it.collect { emit(it) }
  }
}
runBlocking {
  combine(flows) {
    it.filterNotNull()
  }.collect { println(it) }
}

Путем принудительного вызова всеми потоками первого нерелевантного значения, преобразователь combine действительно вызывается и позволяет мне удалить нулевые значения, которые, как я знаю, не являются действительными значениями. Итерация по этому, более удобочитаемая, но более тяжелая:

sealed class FlowValueHolder {
  object None : FlowValueHolder()
  data class Some<T>(val value: T) : FlowValueHolder()
}
val flows = listOf(a, b, c).map {
  flow {
    emit(FlowValueHolder.None)
    it.collect { emit(FlowValueHolder.Some(it)) }
  }
}
runBlocking {
  combine(flows) {
    it.filterIsInstance(FlowValueHolder.Some::class.java)
      .map { it.value }
  }.collect { println(it) }
}

Теперь этот работает просто отлично, но все равно кажется, что я переусердствовал. Есть ли метод, который мне не хватает в библиотеке сопрограмм?

Ответы [ 2 ]

1 голос
/ 17 апреля 2020

Как насчет этого:

inline fun <reified T> instantCombine(vararg flows: Flow<T>) = channelFlow {
    val array= Array(flows.size) {
        false to (null as T?) // first element stands for "present"
    }

    flows.forEachIndexed { index, flow ->
        launch {
            flow.collect { emittedElement ->
                array[index] = true to emittedElement
                send(array.filter { it.first }.map { it.second })
            }
        }
    }
}

Это решает несколько проблем:

  • нет необходимости вводить новый тип
  • [] отсутствует в результирующий поток
  • абстрагирует обработку нуля (или как бы она ни была решена) от сайта вызова, результирующий поток имеет дело с самим собой

Так что вы не заметите любая реализация указывает c обходные пути, потому что вам не нужно иметь дело с этим во время сбора:

runBlocking {
    instantCombine(a, b, c).collect {
        println(it)
    }
}

Вывод:

[a0]
[a1]
[a1, b0]
[a2, b0]
[a2, b1]
[a2, b1, c]
[a2, b2, c]

Попробуйте здесь!

Редактировать: Обновлен ответ для обработки потоков, которые также генерируют нулевые значения.


* Используемый низкоуровневый массив является поточно-ориентированным. Это как если бы вы имели дело с отдельными переменными.

0 голосов
/ 15 апреля 2020

Я все же хотел бы избежать сопоставления с типом промежуточной обертки, и, как кто-то упоминал в комментариях, поведение немного неправильное (сначала выдается пустой список, если еще ничего не передано в аргументах), но это немного лучше чем решения, которые я имел в виду, когда писал вопрос (все еще очень похожий) и работает с обнуляемыми типами:

inline fun <reified T> instantCombine(
  flows: Iterable<Flow<T>>
): Flow<List<T>> = combine(flows.map { flow ->
  flow.map {
    @Suppress("USELESS_CAST") // Required for onStart(null)
    Holder(it) as Holder<T>?
  }
    .onStart { emit(null) }
}) {
  it.filterNotNull()
    .map { holder -> holder.value }
}

И вот набор тестов, который проходит с этой реализацией:

class InstantCombineTest {
  @Test
  fun `when no flows are merged, nothing is emitted`() = runBlockingTest {
    assertThat(instantCombine(emptyList<Flow<String>>()).toList())
      .isEmpty()
  }

  @Test
  fun `intermediate steps are emitted`() = runBlockingTest {
    val a = flow {
      delay(20)
      repeat(3) {
        emit("a$it")
        delay(100)
      }
    }
    val b = flow {
      repeat(3) {
        delay(150)
        emit("b$it")
      }
    }
    val c = flow {
      delay(400)
      emit("c")
    }

    assertThat(instantCombine(a, b, c).toList())
      .containsExactly(
        emptyList<String>(),
        listOf("a0"),
        listOf("a1"),
        listOf("a1", "b0"),
        listOf("a2", "b0"),
        listOf("a2", "b1"),
        listOf("a2", "b1", "c"),
        listOf("a2", "b2", "c")
      )
      .inOrder()
  }

  @Test
  fun `a single flow is mirrored`() = runBlockingTest {
    val a = flow {
      delay(20)
      repeat(3) {
        emit("a$it")
        delay(100)
      }
    }

    assertThat(instantCombine(a).toList())
      .containsExactly(
        emptyList<String>(),
        listOf("a0"),
        listOf("a1"),
        listOf("a2")
      )
      .inOrder()
  }

  @Test
  fun `null values are kept`() = runBlockingTest {
    val a = flow {
      emit("a")
      emit(null)
      emit("b")
    }

    assertThat(instantCombine(a).toList())
      .containsExactly(
        emptyList<String?>(),
        listOf("a"),
        listOf(null),
        listOf("b")
      )
      .inOrder()
  }
}
...