Kotlin Поток выполняет два вызова API параллельно и собирает каждый результат по мере его поступления. - PullRequest
4 голосов
/ 12 февраля 2020

Я пытаюсь реализовать кеш, а затем сетевую стратегию для моего вызова API, используя Kotlin Flows. Вот что я пытаюсь сейчас

flowOf(
 remoteDataSource.getDataFromCache() // suspending function returning Flow<Data>
   .catch { error -> Timber.e(error) },
 remoteDataSource.getDataFromServer() // suspending function returning Flow<Data>
).flattenConcat().collect {
  Timber.i("Response Received")
}

Проблема здесь в том, что collect вызывается только когда getDataFromServer возвращается. Я ожидаю, что я должен получить первое событие из кэша, а затем второе событие с сервера через несколько миллисекунд. В этом случае "Response Received" печатается дважды, но сразу один за другим.

В этом другом варианте "Response Received" печатается только один раз, то есть после возврата getDataFromServer().

 remoteDataSource.getDataFromCache() // suspending function returning Flow<Data>
  .catch { error -> Timber.e(error) }
  .flatMapConcat {
    remoteDataSource.getDataFromServer() // suspending function returning Flow<Data>
  }
  .collect {
    Timber.i("Response Received")
  }

Раньше я использовал Rx Java s Flowable.concat(), и он работал отлично. Есть ли в потоках Kotlin что-то, что может имитировать такое поведение?

Ответы [ 3 ]

1 голос
/ 13 февраля 2020

Первая проблема c в вашем дизайне заключается в том, что функция возврата потока также может быть приостановлена. Это два уровня подвески. Функции должны возвращать потоки без каких-либо задержек, а сами потоки должны генерировать элементы по мере их поступления. Если вы следуете этому руководству, ваш исходный код уже будет работать.

То, как вы написали эти функции, они все равно могут работать, если Вы пишете это:

flow<String> {
    emitAll(getCached())
    emitAll(getFromServer())
}
0 голосов
/ 13 февраля 2020

Недавно оператор merge был добавлен в версию Kotlin сопрограмм 1.3.3. Вот слитый PR .

Используя оператор слияния, вы сможете получить результат по мере его поступления.

0 голосов
/ 13 февраля 2020

Получается, если flowOf(someOperation()) someOperation() необходимо завершить для нисходящего потока, чтобы начать обработку. Это как Observable.just(someOperation()) в RxJava мире.

Во втором сценарии flatMapConcat на самом деле является оператором transform, поэтому он, очевидно, возвращает окончательно обработанный вывод.

Кажется, что в мире Flow не хватает нативных concat операторов. Вот как я решил эту проблему в итоге

flow {
   remoteDataSource.getDataFromCache()
   .catch { error -> Timber.e(error) }
   .onCompletion {
       remoteDataSource.getDataFromServer()
            .collect {
                 emit(it)
            }
    }.collect { emit(it) }
}
...