Ответы 1. и 4.:
Чтобы разграничить параллельные задачи и дать им возможность завершиться sh независимо (получение некоторых значений при отмене остальных), вам потребуется использовать канал и, желательно, Поток. Упрощенный пример:
fun processListWithSomeWorkers(list: List<Whatever>, concurrency: Int): Flow<Result> = channelFlow {
val workToDistribute = Channel<Whatever>()
launch { for(item in list) workToDistribute.send(item) } // one coroutine distributes work...
repeat(concurrency) { // launch a specified number of worker coroutines
launch {
for (task in workToDistribute) { // which process tasks in a loop
val atomicResult = process(task)
send(atomicResult) // and send results downstream to a Flow
}
}
}
}
И затем вы можете обрабатывать результаты один за другим, поскольку они создаются, ожидая, пока весь поток достигнет sh или, например, просто возьмите некоторые из них, когда это необходимо: resultFlow.take(20).onEach { ... }.collectIn(someScope)
Поскольку это Поток, он начнет работать только тогда, когда кто-нибудь начнет собирать (он холодный), что обычно хорошо.
Целое можно сделать немного короче, так как вы обнаружите некоторые более подробные c и экспериментальные функции (как производят). Его можно обобщить как оператор Flow следующим образом:
fun <T, R> Flow<T>.concurrentMap(concurrency: Int, transform: suspend (T) -> R): Flow<R> {
require(concurrency > 1) { "No sense with concurrency < 2" }
return channelFlow {
val inputChannel = produceIn(this)
repeat(concurrency) {
launch {
for (input in inputChannel) send(transform(input))
}
}
}
}
и использовать: list.asFlow().concurrentMap(concurrency = 4) { <your mapping logic> }
Команда corotuines думает о добавлении семейства параллельных операторов в потоки Flow, но их еще нет, AFAIK.