У меня есть вызов API, который проверяет некоторый статус по «Id». API возвращает Single или error. У меня есть список таких идентификаторов, только один идентификатор может вернуть успех или ничего (ошибка возврата всех идентификаторов). Что мне нужно, так это перебирать каждый идентификатор и пропускать ошибки при вызове API до тех пор, пока не будет достигнут успех или не закончится список. Я могу добиться этого последовательно. Однако я пытаюсь сделать то же самое, используя ParallelFlowable. Он отлично работает, когда идентификатор возвращает успех, но когда нет идентификатора, который возвращает успех (все идентификаторы терпят неудачу), он просто пропускает все ошибки из API, но не уведомляет подписчика после того, как все идентификаторы проверены. Я не знаю, как с этим справиться.
// API call
fun getStatus(Id: String): Single<String> {
//... returns Single<String> or error
}
//Sequential flow, Working
fun getStatus(ids: List<String>): Single<String> {
Observable.fromIterable(ids)
.flatMapSingle { id ->
getStatus(id)
.onErrorResumeWith { singleSource ->
if (ids.last() == id)) { //If this is last item in list, return error
singleSource.onError(NoStatusFoundException())
} else {
// Skip errors until valid id is found or till the list reached end.
Flowable.empty<String>()
}
}
}.firstOrError()
}
// Parallel Flow, How to identify the list is completed and return NoStatusFoundException in case of all id's fail?
fun getStatus(ids: List<String>): Single<String> {
Flowable.fromIterable(ids)
.parallel()
.runOn(io())
.flatMap{ id -> getStatus(id).toFlowable()
.onErrorResumeWith { Flowable.empty<String>() }
}
.sequentialDelayError()
.firstOrError()
.onErrorResumeNext { Single.error(it) }
}
// Subscription
getStatus(listOf("1","2","3","4","5",))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscriber({ id->
// success
this is notified when an id is success
},
{ // error handler - Need help here
Never notified when all the id's fail?
})