Я подготовил три подхода к решению этой проблемы, от самого простого до самого правильного. Чтобы упростить представление подходов, я извлек общий код:
lifecycleScope.launch {
val itemById = try {
fetchItems(itemIds)
} catch (exception: Exception) {
exception.printStackTrace()
}
Log.i(TAG, "Fetched these items: $itemById")
}
Прежде чем я продолжу, общее примечание: ваша функция getItem()
приостановлена, вам не нужно отправлять ее вIO
диспетчер. Все ваши сопрограммы могут выполняться в главном потоке.
Теперь давайте посмотрим, как мы можем реализовать fetchItems(itemIds)
.
1. Simple forEach
Здесь мы используем тот факт, что весь код сопрограммы может выполняться в главном потоке:
suspend fun fetchItems(itemIds: Iterable<Long>): Map<Long, Item> {
val itemById = mutableMapOf<Long, Item>()
coroutineScope {
itemIds.forEach { itemId ->
launch { itemById[itemId] = MyService.getItem(itemId) }
}
}
return itemById
}
coroutineScope
будет ожидать всех сопрограмм, внутри которых вы launch
Это. Несмотря на то, что все они работают одновременно друг с другом, запущенные сопрограммы по-прежнему отправляются в один (основной) поток, поэтому нет проблем с параллелизмом при обновлении карты для каждого из них.
2. Потокобезопасный вариант
Тот факт, что он использует свойства однопоточного контекста, можно рассматривать как ограничение первого подхода: он не обобщается на контексты на основе потокового пула. Мы можем избежать этого ограничения, полагаясь на механизм async-await
:
suspend fun fetchItems(itemIds: Iterable<Long>): Map<Long, Item> = coroutineScope {
itemIds.map { itemId -> async { itemId to MyService.getItem(itemId) } }
.map { it.await() }
.toMap()
}
Здесь мы полагаемся на два неочевидных свойства Collection.map()
:
- Он выполняет всепреобразование с нетерпением, поэтому первое преобразование в коллекцию
Deferred<Pair<Long, Item>>
полностью выполняется перед переходом ко второму этапу, где мы ожидаем всех из них. - Это встроенная функция, которая позволяет нам писать код с задержкойв нем, хотя сама функция не является
suspend fun
и получает неподдерживаемую лямбду (Deferred<T>) -> T
.
Это означает, что вся выборка выполняется одновременно, но карта собирается водиночная сопрограмма.
3. Основанный на потоке подход с улучшенным контролем параллелизма
Вышесказанное решило параллелизм для нас, но в нем отсутствует обратное давление. Если ваш список ввода очень большой, вы захотите установить ограничение на количество одновременных сетевых запросов.
Вы можете сделать это с идиомой Flow
:
suspend fun fetchItems(itemIds: Iterable<Long>): Map<Long, Item> = itemIds
.asFlow()
.flatMapMerge(concurrency = MAX_CONCURRENT_REQUESTS) { itemId ->
flow { emit(itemId to MyService.getItem(itemId)) }
}
.toMap()
Здесь магия заключается в операции .flatMapMerge
. Вы даете ему функцию (T) -> Flow<R>
, и она будет выполнять ее последовательно на всех входах, но затем она будет одновременно собирать все полученные потоки. Обратите внимание, что я не смог упростить flow { emit(getItem()) } }
до flowOf(getItem())
, потому что getItem()
должен вызываться лениво при сборе потока.
Flow.toMap()
в настоящее время не предоставляется в стандартной библиотеке, поэтому здесьэто:
suspend fun <K, V> Flow<Pair<K, V>>.toMap(): Map<K, V> {
val result = mutableMapOf<K, V>()
collect { (k, v) -> result[k] = v }
return result
}