Kotlin обернуть последовательные вызовы ввода-вывода как последовательность - PullRequest
0 голосов
/ 09 февраля 2020

Мне нужно обработать все результаты из конечной точки API с подкачкой. Я хотел бы представить все результаты в виде последовательности.

Я придумал следующее (слегка закодировано в псевдо-кодировке):

suspend fun getAllRowsFromAPI(client: Client): Sequence<Row> {
    var currentRequest: Request? = client.requestForNextPage()
    return withContext(Dispatchers.IO) {
         sequence {
            while(currentRequest != null) {
                var rowsInPage = runBlocking { client.makeRequest(currentRequest) }
                currentRequest = client.requestForNextPage()
                yieldAll(rowsInPage)
            }
        }
     }
}

Это работает, но я не уверен насчет пары вещей:

  1. Идет ли запрос API внутри runBlocking, все еще происходит с диспетчером ввода-вывода?
  2. Есть ли способ реорганизовать код для запуска следующий запрос, прежде чем получить текущие результаты, а потом ждать его?

Ответы [ 3 ]

0 голосов
/ 09 февраля 2020

Последовательности, безусловно, не то, что вы хотите использовать в этом случае, потому что они не предназначены для работы в асинхронной среде. Возможно, вам стоит взглянуть на потоки и каналы, но для вашего случая лучший и самый простой выбор - это просто набор отложенных значений, потому что вы хотите обрабатывать все запросы одновременно (потоки и каналы обрабатывают их один за другим, может быть, с ограниченным размером буфера).

Следующий подход позволяет запускать все запросы асинхронно (при условии, что makeRequest является приостановленной функцией и поддерживает асинхронные запросы). Когда вам понадобятся ваши результаты, вам нужно будет ждать только самого медленного запроса до фини sh.

fun getClientRequests(client: Client): List<Request> {
    val requests = ArrayList<Request>()
    var currentRequest: Request? = client.requestForNextPage()
    while (currentRequest != null) {
        requests += currentRequest
        currentRequest = client.requestForNextPage()
    }
    return requests
}

// This function is not even suspended, so it finishes almost immediately
fun getAllRowsFromAPI(client: Client): List<Deferred<Page>> =
    getClientRequests(client).map {
        /* 
         * The better practice would be making getAllRowsFromApi an extension function
         * to CoroutineScope and calling receiver scope's async function.
         * GlobalScope is used here just for simplicity.
         */
        GlobalScope.async(Dispatchers.IO) { client.makeRequest(it) }
    }

fun main() {
    val client = Client()
    val deferredPages = getAllRowsFromAPI(client) // This line executes fast
    // Here you can do whatever you want, all requests are processed in background
    Thread.sleep(999L)
    // Then, when we need results....
    val pages = runBlocking {
        deferredPages.map { it.await() }
    }
    println(pages)
    // In your case you also want to "unpack" pages and get rows, you can do it here:
    val rows = pages.flatMap { it.getRows() }
    println(rows)
}
0 голосов
/ 09 февраля 2020

Я попал через suspendingSequence в Kotlin coroutines-examples:

https://github.com/Kotlin/coroutines-examples/blob/090469080a974b962f5debfab901954a58a6e46a/examples/suspendingSequence/suspendingSequence.kt

Это именно то, что я искал.

0 голосов
/ 09 февраля 2020

Вопрос 1 : API-запрос все еще будет выполняться на IO-диспетчере, но он заблокирует поток, на котором он работает. Это означает, что никакие другие задачи не могут быть запланированы в этом потоке в ожидании завершения запроса sh. На самом деле нет никакой причины использовать runBlocking в рабочем коде вообще, потому что:

  1. Если makeRequest уже является блокирующим вызовом, то runBlocking практически ничего не сделает.
  2. Если бы makeRequest был приостановленным вызовом, то runBlocking сделает код менее эффективным. Он не вернет поток в пул во время ожидания завершения запроса sh.

Является ли makeRequest блокирующим или неблокирующим вызовом, зависит от клиента, которого вы с помощью. Вот неблокирующий http-клиент, который я могу порекомендовать: https://ktor.io/clients/

Вопрос 2 : я бы использовал Flow для этой цели. Вы можете думать об этом как о приостановленном варианте Sequence. Потоки холодные , что означает, что он не будет работать, пока потребитель не запросит его содержимое (в отличие от hot , что означает, что производитель получит pu sh новых значений no важно, хочет ли потребитель этого или нет). Kotlin Flow имеет оператор под названием buffer, который можно использовать, чтобы он запрашивал больше страниц, прежде чем он полностью использует предыдущую страницу.

Код может выглядеть очень похоже на то, что у вас уже есть :

suspend fun getAllRowsFromAPI(client: Client): Flow<Row> = flow {
    var currentRequest: Request? = client.requestForNextPage()

    while(currentRequest != null) {
        val rowsInPage = client.makeRequest(currentRequest)
        emitAll(rowsInPage.asFlow())
        currentRequest = client.requestForNextPage()
    }
}.flowOn(Dispatchers.IO)
.buffer(capacity = 1)

Емкость 1 означает, что при обработке более ранней страницы будет сделан только 1 запрос. Вы можете увеличить размер буфера, чтобы сделать больше параллельных запросов. Вы должны проверить этот доклад на KotlinConf 2019, чтобы узнать больше о потоках: https://www.youtube.com/watch?v=tYcqn48SMT8

...