Как написать RX ConcatArrayEager эквивалент в Kotlin CoRoutine? - PullRequest
0 голосов
/ 04 сентября 2018

Я хотел бы преобразовать мой код rxJava в Kotlin CoRoutine.

Ниже приведен код, который выполняет вызовы api и db и возвращает данные в интерфейс независимо от того, что произойдет раньше. Скажем, если ответ БД происходит быстрее, чем API. В этом случае ответ API продолжится до тех пор, пока он не получит данные для синхронизации с БД, хотя он мог бы выполнить обновление интерфейса раньше.

Как бы я это сделал?

class MoviesRepository @Inject constructor(val apiInterface: ApiInterface,
                                        val MoviesDao: MoviesDao) {

fun getMovies(): Observable<List<Movie>> {
    val observableFromApi = getMoviesFromApi()
    val observableFromDb = getMoviesFromDb()
    return Observable.concatArrayEager(observableFromApi, observableFromDb)
}

fun getMoviesFromApi(): Observable<List<Movie>> {

    return apiInterface.getMovies()
            .doOnNext { it ->
                it.data?.let { it1 -> MoviesDao.insertAllMovies(it1) }
                println("Size of Movies from API %d", it.data?.size)
            }
            .map({ r -> r.data })
}

fun getMoviesFromDb(): Observable<List<Movie>> {
    return MoviesDao.queryMovies()
            .toObservable()
            .doOnNext {
                //Print log it.size :)
            }
}

}

Ответы [ 2 ]

0 голосов
/ 05 сентября 2018

В качестве первого шага вы должны создать suspend fun s для ваших ApiInterface и MovieDao вызовов. Если у них есть некоторый API на основе обратного вызова, вы можете следовать этим официальным инструкциям .

Теперь у вас должно быть

suspend fun ApiInterface.suspendGetMovies(): List<Movie>

и

suspend fun MoviesDao.suspendQueryMovies(): List<Movie>

Теперь вы можете написать этот код:

launch(UI) {
    val fromNetwork = async(UI) { apiInterface.suspendGetMovies() }
    val fromDb = async(UI) { MoviesDao.suspendQueryMovies() }
    select<List<Movie>> {
        fromNetwork.onAwait { it }
        fromDb.onAwait { it }
    }.also { movies ->
        // act on the movies
    }
}

Основным моментом является select вызов, который будет одновременно ожидаться в обоих Deferred с и воздействовать на тот, который завершается первым.

Если вы хотите убедиться, что вы действуете по результатам из сети, вам понадобится еще немного кода, например:

    val action = { movies: List<Movie> ->
        // act on the returned movie list
    }
    var gotNetworkResult = false
    select<List<Movie>> {
        fromNetwork.onAwait { gotNetworkResult = true; it }
        fromDb.onAwait { it }
    }.also(action)
    if (!gotNetworkResult) {
        action(fromNetwork.await())
    }

Этот код будет действовать на результаты БД, только если они поступят до результатов сети, которые он будет обрабатывать во всех случаях.

0 голосов
/ 04 сентября 2018

Что-то в этом роде должно работать:

data class Result(val fromApi: ???, val fromDB: ???)

fun getMovies(): Result {
    val apiRes = getMoviesFromApiAsync()
    val dbRes = getMoviesFromDbAsync()
    return Result(apiRes.await(), dbRes.await())
}

fun getMoviesFromApiAsync() = async {

    return apiInterface.getMovies()
            .doOnNext { it ->
                it.data?.let { it1 -> MoviesDao.insertAllMovies(it1) }
                println("Size of Movies from API %d", it.data?.size)
            }
            .map({ r -> r.data })
}

fun getMoviesFromDbAsync() = async {
    return MoviesDao.queryMovies()           
}

Я не знаю, что вы возвращаете, поэтому я просто вместо этого ставлю ???.

...