Вот пример, иллюстрирующий мою путаницу:
fun main() = runBlocking(Dispatchers.Default + CoroutineName("Main")) {
val broadcaster = BroadcastChannel<Int>(Channel.BUFFERED)
val flow = withContext(CoroutineName("InitialFlowCreation")) {
broadcaster.asFlow()
.map {
println("first mapping in context: $coroutineContext")
it * 10
}
.broadcastIn(CoroutineScope(Dispatchers.Default + CoroutineName("BroadcastIn")))
.asFlow()
}
val updatedFlow = withContext(CoroutineName("UpdatedFlowCreation")) {
flow.map {
println("second mapping in context: $coroutineContext")
it * 10
}
.flowOn(Dispatchers.Default + CoroutineName("FlowOn"))
}
launch(CoroutineName("Collector")) {
updatedFlow.collect {
println("Collecting $it in context: $coroutineContext")
}
}
delay(1_000)
launch(CoroutineName("OriginalBroadcast")) {
for (i in 1..10) {
broadcaster.send(i)
println("Sent original broadcast from: $coroutineContext")
delay(1_000)
}
}
return@runBlocking
}
Это приводит к следующему выводу (усеченному):
Sent original broadcast from: [CoroutineName(OriginalBroadcast), StandaloneCoroutine{Active}@3a14b06a, DefaultDispatcher]
first mapping in context: [CoroutineName(InitialFlowCreation), UndispatchedCoroutine{Completed}@40202c08, DefaultDispatcher]
second mapping in context: [CoroutineName(UpdatedFlowCreation), UndispatchedCoroutine{Completed}@6cf04ddc, DefaultDispatcher]
Collecting 100 in context: [CoroutineName(Collector), StandaloneCoroutine{Active}@6ac9d4b5, DefaultDispatcher]
В документации говорится о различных вещах, которые заставляют меня быть смущенным этим результатом. В Flow у нас есть «Использовать channelFlow, если сбор и передача потока должны быть разделены на несколько сопрограмм. Он инкапсулирует всю работу по сохранению контекста и позволяет вам сосредоточиться на задаче вашего домена c» , а не инвариантные детали реализации. Можно использовать любую комбинацию компоновщиков сопрограмм из ChannelFlow. " Я знаю, что на самом деле я не использую функцию channelFlow
, но ChannelFlow
создается внутри, когда мы вызываем broadcastIn
, поэтому должны применяться те же принципы.
Я думал, что первый вызов map
будет выполняться в контексте "OriginalBroadcast"
, а вторая будет либо в контексте "BroadcastIn"
, либо в контексте "Collector"
, но вместо этого они оба будут выполняться в контексте, в котором они вызываются. Я не понимаю, почему это происходит, не должен ли контекст map
быть там, где он собирается для трансляции или контекст, где он наконец собирается, а не контекст, где вызывается map? Также звонок на flowOn
не имеет никакого эффекта. Какие работы по сохранению контекста здесь инкапсулированы?
Также я прав, что в цепочке flow.broadcastIn(...).asFlow().map{...}.broadcastIn(...).asFlow()
два созданных BroadcastChannel
не будут объединены? Пытаясь удостовериться, что я что-то не упускаю.
Я предполагаю, что я действительно ищу инклюзивную документацию о том, в каких ситуациях слиты Channel
, как они слиты, и в каком контексте операторы которые вызываются между ChannelFlow
операторами, будут работать.