Пытаюсь сделать звонок по Rx Java. Я могу использовать слишком много цепочек во время разговора. Существует так много Observable и преобразований, что, боюсь, сборка потеряна. К сожалению, мне нужно использовать эти функции. У меня здесь есть метод, который я вызываю в своем фрагменте. Этот метод никогда не вызывается, когда я отлаживаю и устанавливаю точку останова.
В моем фрагменте я вызываю offlineItems () здесь:
private fun streamDownloads(): Observable<Unit> {
return downloadsDataRepository.offlineItems()
.observeOn(AndroidSchedulers.mainThread())
.map { downloadLoaded(it) } // Exception here.
}
DownloadLoaded (it) не вызывается и иногда возвращает исключение. Я обеспокоен тем, что карта слишком усложняет цепной вызов в моем Rx Java. Вот вызов offlineItems ().
fun offlineItems(): Observable<List<MediaItem>> {
val list = getAllMyDownloadedMediaItems()
return list.flatMapIterable { it }
.flatMap { mediaStore.getMediaItemWithId(MediaId(it.request.id)) } // this method is called .
.toList() // returns an Observable<MediaItem>
.toObservable()
}
Чтобы быть внимательным, mediaStoreCall:
override fun getMediaItemWithId(mediaId: MediaId): Observable<MediaItem> {
return if (isOnline()) {
Observable.just(Unit)
.effectMap { fetchAndStoreRemote(mediaId) }
.flatMap { mediaItemFromDB(mediaId) }
} else {
mediaItemFromDB(mediaId)
}
}
Список, который я получаю (загружается из exoplayer):
fun getAllMyDownloadedMediaItems(): Observable<List<Download>> {
return Observable.just(downloadManager.downloadIndex.getDownloads(Download.STATE_COMPLETED).use { index ->
mutableListOf<Download>().apply {
if (index.isFirst || index.moveToFirst()) {
do {
add(index.download)
} while (index.moveToNext())
}
}
})
}
Метод, загруженный в фрагмент, вызываемый из вышеуказанного метода:
private fun downloadLoaded(downloads: List<MediaItem>) {
if (downloads.isEmpty()) {
downloadStateView.setState(StateView.State.EMPTY)
} else {
downloadStateView.setState(StateView.State.CONTENT)
episodeAdapter.items = listOf(Header(downloads.size)) + downloads.map(::Item)
}
}
Я вызываю streamDownloads в onResume фрагмента.
override fun onResume() {
super.onResume()
Observable.merge(
streamDownloads(),
handleEmptyAction()
).autoDispose(this)
.subscribe()
}
Я ни в коем случае не эксперт по Rx Java, так что если кто-то может указать мне, что я делаю неправильно.
ИЗМЕНИТЬ Трассировка стека:
RxJavaAssemblyException: assembled
at dalvik.system.VMStack.getThreadStackTrace(Native Method)
at io.reactivex.Observable.map(Observable.java:9781)
at DownloadsFragment.streamDownloads(DownloadsFragment.kt:143)
at DownloadsFragment.onResume(DownloadsFragment.kt:128)
at androidx.fragment.app.Fragment.performResume(Fragment.java:2649)
at androidx.fragment.app.FragmentManagerImpl.moveToState
at FragmentManagerImpl.moveFragmentToExpectedState
at FragmentManagerImpl.moveToState(FragmentManagerImpl.java:1303)
at FragmentManagerImpl.dispatchStateChange(FragmentManagerImpl.java:2659)
at FragmentManagerImpl.dispatchResume(FragmentManagerImpl.java:2625)
at androidx.fragment.app.Fragment.performResume(Fragment.java:2658)
at FragmentManagerImpl.moveToState(FragmentManagerImpl.java:922)
at FragmentManagerImpl.moveFragmentToExpectedState
at FragmentManagerImpl.moveToState(FragmentManagerImpl.java:1303)
at FragmentManagerImpl.executeOpsTogether(FragmentManagerImpl.java:1884)
at FragmentManagerImpl.removeRedundantOperationsAndExecute
at FragmentManagerImpl.execPendingActions(FragmentManagerImpl.java:1727)
at FragmentManagerImpl$2.run(FragmentManagerImpl.java:150)
at android.os.Handler.handleCallback(Handler.java:873)
at android.os.Handler.dispatchMessage(Handler.java:99)
at android.os.Looper.loop(Looper.java:193)
at android.app.ActivityThread.main(ActivityThread.java:6669)
at RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:493)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:858)