Несоответствие типов, возвращающее наблюдаемое с fromCallable - PullRequest
0 голосов
/ 03 июня 2019

Следующий код не компилируется:

override fun storeConnections(connections: List<Connection>): Observable<List<Connection>> =
    Observable.fromCallable<List<Connection>> {
        appDao.storeConnections(connections.map {
                            mapper.toDb(it)})


    }

Строка с appDao.storeConnections указывает на следующую ошибку:

Обязательный список!

Найденная единица

StoreConnections выполняется с использованием Room:

@Dao
interface RoomDao {
    @Insert(onConflict = REPLACE)
    fun storeConnections(linkedInConnection: List<LinkedInConnectionEntity>)

}

storeConnections вызывается из моего потока rx:

val startPositions = BehaviorSubject.createDefault(0)

startPositions.flatMap { startPos -> App.context.repository.getConnections(startPos) }
    .flatMap { connections -> Observable.fromCallable(App.context.repository.storeConnections(connections)) }
    .doOnNext { ind -> startPositions.onNext(ind + 1) }
    .subscribe({ ind -> println("Index $ind") })

Как правильно реализовать это из Callable?

Ответы [ 2 ]

0 голосов
/ 06 июня 2019

с учетом вашего ответа на ваш вопрос:

storeConnections ничего не возвращает. Но мне нужно обернуть это в наблюдаемое, чтобы подтолкнуть его вниз по течению. Поэтому, возможно, мой вопрос заключается в том, как обернуть вызов API в Observable, когда этот вызов API ничего не возвращает.

Я отвечу, как вы можете обернуть его в наблюдаемое, чтобы протолкнуть его вниз по течению:

        .flatMap {
                connections -> 
                    App.context.repository.storeConnections(connections)
                        .andThen(Observable.just(connections))
        }

Учитывая, что storeConnections возвращает Completable:

    override fun storeConnections(connections: List<Connection>): Completable =
        Completable.fromAction {
            appDao.storeConnections(connections.map { mapper.toDb(it) } )
        }
    }

Если storeConnections возвращает «ничто», вы можете просто переместить Completable.fromAction в ваш поток:

        .flatMap {
                connections -> 
                    Completable.fromAction { App.context.repository.storeConnections(connections) }
                        .andThen(Observable.just(connections))
        }
0 голосов
/ 03 июня 2019

Ключ к тому, чтобы заставить его работать, использует это:

return@fromCallable connections

Итак, это исправленный код:

override fun storeConnections(connections: List<Connection>): Observable<List<Connection>> =
    Observable.fromCallable<List<Connection>> {

        appDao.storeConnections(connections.map {
            mapper.toDb(it)
        })

        return@fromCallable connections
    }

И поток rx, который его вызывает:

val startPositions = BehaviorSubject.createDefault(0)

    startPositions.flatMap { startPos -> App.context.repository.getConnections(startPos) }
        .flatMap {
                connections -> App.context.repository.storeConnections(connections)
        }
        .doOnNext {
                connections -> startPositions.onNext(startPos++)
        }
        .subscribe({ ind -> println("Index $ind") })
...