Параллельный запрос с функциями Retrofit, Coroutines и Suspend - PullRequest
4 голосов
/ 01 ноября 2019

Я использую Retrofit для выполнения сетевых запросов. Я также использую сопрограммы в сочетании с функциями 'suspend'.

Мой вопрос: есть ли способ улучшить следующий код. Идея состоит в том, чтобы запустить несколько запросов в параллелях и дождаться их завершения, прежде чем продолжить выполнение функции.

lifecycleScope.launch {
    try {
        itemIds.forEach { itemId ->
            withContext(Dispatchers.IO) { itemById[itemId] = MyService.getItem(itemId) }
        }
    } catch (exception: Exception) {
        exception.printStackTrace()
    }

    Log.i(TAG, "All requests have been executed")
}

(Обратите внимание, что «MyService.getItem ()» является функцией «suspend».)

Полагаю, что в этом случае есть нечто более приятное, чем foreach .

У кого-нибудь есть идеи?

Ответы [ 2 ]

5 голосов
/ 01 ноября 2019

Я подготовил три подхода к решению этой проблемы, от самого простого до самого правильного. Чтобы упростить представление подходов, я извлек общий код:

lifecycleScope.launch {
    val itemById = try {
        fetchItems(itemIds)
    } catch (exception: Exception) {
        exception.printStackTrace()
    }
    Log.i(TAG, "Fetched these items: $itemById")
}

Прежде чем я продолжу, общее примечание: ваша функция getItem() приостановлена, вам не нужно отправлять ее вIO диспетчер. Все ваши сопрограммы могут выполняться в главном потоке.

Теперь давайте посмотрим, как мы можем реализовать fetchItems(itemIds).

1. Simple forEach

Здесь мы используем тот факт, что весь код сопрограммы может выполняться в главном потоке:

suspend fun fetchItems(itemIds: Iterable<Long>): Map<Long, Item> {
    val itemById = mutableMapOf<Long, Item>()
    coroutineScope {
        itemIds.forEach { itemId ->
            launch { itemById[itemId] = MyService.getItem(itemId) }
        }
    }
    return itemById
}

coroutineScope будет ожидать всех сопрограмм, внутри которых вы launchЭто. Несмотря на то, что все они работают одновременно друг с другом, запущенные сопрограммы по-прежнему отправляются в один (основной) поток, поэтому нет проблем с параллелизмом при обновлении карты для каждого из них.

2. Потокобезопасный вариант

Тот факт, что он использует свойства однопоточного контекста, можно рассматривать как ограничение первого подхода: он не обобщается на контексты на основе потокового пула. Мы можем избежать этого ограничения, полагаясь на механизм async-await:

suspend fun fetchItems(itemIds: Iterable<Long>): Map<Long, Item> = coroutineScope {
    itemIds.map { itemId -> async { itemId to MyService.getItem(itemId) } }
            .map { it.await() }
            .toMap()
}

Здесь мы полагаемся на два неочевидных свойства Collection.map():

  1. Он выполняет всепреобразование с нетерпением, поэтому первое преобразование в коллекцию Deferred<Pair<Long, Item>> полностью выполняется перед переходом ко второму этапу, где мы ожидаем всех из них.
  2. Это встроенная функция, которая позволяет нам писать код с задержкойв нем, хотя сама функция не является suspend fun и получает неподдерживаемую лямбду (Deferred<T>) -> T.

Это означает, что вся выборка выполняется одновременно, но карта собирается водиночная сопрограмма.

3. Основанный на потоке подход с улучшенным контролем параллелизма

Вышесказанное решило параллелизм для нас, но в нем отсутствует обратное давление. Если ваш список ввода очень большой, вы захотите установить ограничение на количество одновременных сетевых запросов.

Вы можете сделать это с идиомой Flow:

suspend fun fetchItems(itemIds: Iterable<Long>): Map<Long, Item> = itemIds
        .asFlow()
        .flatMapMerge(concurrency = MAX_CONCURRENT_REQUESTS) { itemId ->
            flow { emit(itemId to MyService.getItem(itemId)) }
        }
        .toMap()

Здесь магия заключается в операции .flatMapMerge. Вы даете ему функцию (T) -> Flow<R>, и она будет выполнять ее последовательно на всех входах, но затем она будет одновременно собирать все полученные потоки. Обратите внимание, что я не смог упростить flow { emit(getItem()) } } до flowOf(getItem()), потому что getItem() должен вызываться лениво при сборе потока.

Flow.toMap() в настоящее время не предоставляется в стандартной библиотеке, поэтому здесьэто:

suspend fun <K, V> Flow<Pair<K, V>>.toMap(): Map<K, V> {
    val result = mutableMapOf<K, V>()
    collect { (k, v) -> result[k] = v }
    return result
}
1 голос
/ 02 ноября 2019

Если вы ищете более хороший способ написать это и исключить foreach

lifecycleScope.launch {
    try {

        itemIds.asFlow()
               .flowOn(Dispatchers.IO) 
               .collect{ itemId -> itemById[itemId] = MyService.getItem(itemId)}

    } catch (exception: Exception) {
        exception.printStackTrace()
    }

    Log.i(TAG, "All requests have been executed")
}

Также, пожалуйста, посмотрите на lifecycleScope Я подозреваю, что он использует Dispatchers.Main. В этом случае вы можете удалить это .flowOn(Dispatchers.IO) дополнительное объявление диспетчера.

Для получения дополнительной информации: Асинхронный поток Котлина

...