Rx Java: пропустить все ошибки в fromIterable () и уведомить подписчика, когда все элементы отправлены - Flowable.parallel выполнение - PullRequest
1 голос
/ 07 мая 2020

У меня есть вызов API, который проверяет некоторый статус по «Id». API возвращает Single или error. У меня есть список таких идентификаторов, только один идентификатор может вернуть успех или ничего (ошибка возврата всех идентификаторов). Что мне нужно, так это перебирать каждый идентификатор и пропускать ошибки при вызове API до тех пор, пока не будет достигнут успех или не закончится список. Я могу добиться этого последовательно. Однако я пытаюсь сделать то же самое, используя ParallelFlowable. Он отлично работает, когда идентификатор возвращает успех, но когда нет идентификатора, который возвращает успех (все идентификаторы терпят неудачу), он просто пропускает все ошибки из API, но не уведомляет подписчика после того, как все идентификаторы проверены. Я не знаю, как с этим справиться.

// API call
fun getStatus(Id: String): Single<String> {
  //... returns Single<String> or error
}

//Sequential flow, Working
fun getStatus(ids: List<String>): Single<String> {
  Observable.fromIterable(ids)
                .flatMapSingle { id ->
                    getStatus(id)
                        .onErrorResumeWith { singleSource ->
                            if (ids.last() == id)) { //If this is last item in list, return error
                                singleSource.onError(NoStatusFoundException())
                            } else {
                                // Skip errors until valid id is found or till the list reached end.
                                Flowable.empty<String>()
                            }
                        }
                }.firstOrError()
}

// Parallel Flow, How to identify the list is completed and return NoStatusFoundException in case of all id's fail?
fun getStatus(ids: List<String>): Single<String> { 
              Flowable.fromIterable(ids)
                .parallel()
                .runOn(io())
                .flatMap{ id -> getStatus(id).toFlowable()
                          .onErrorResumeWith { Flowable.empty<String>() }
                        }
                .sequentialDelayError()
                .firstOrError()
                .onErrorResumeNext { Single.error(it) }
}

// Subscription
getStatus(listOf("1","2","3","4","5",))
 .subscribeOn(Schedulers.io())
 .observeOn(AndroidSchedulers.mainThread())
 .subscriber({ id->
       // success
       this is notified when an id is success
    },
    { // error handler - Need help here
       Never notified when all the id's fail?
    })

Ответы [ 3 ]

0 голосов
/ 09 мая 2020

Проблема в этой строке:

.onErrorResumeWith { Flowable.empty<String>() }

Параметр onErrorResumeWith - это Publisher<out T>, а не () -> Publisher<out T>. Интерфейс Publisher имеет единственный метод, void subscribe​(Subscriber<? super T> s). Таким образом, он подходит для преобразования SAM.

Лямбда { Flowable.empty<String>() } является совершенно допустимым (Subscriber<String>) -> Unit, который игнорирует свой единственный параметр, вызывает метод и игнорирует результат. Это компилируется, но результат для всех практических целей такой же, как Flowable.never().

Вместо лямбды вам нужно передать Flowable.empty() непосредственно в onErrorResumeNext():

            .flatMap{ id -> getStatus(id).toFlowable()
                      .onErrorResumeWith(Flowable.empty<String>())
                    }
0 голосов
/ 13 мая 2020

Я могу решить эту проблему, удалив onErrorResumeWith { Flowable.empty<String>() } из flatMap и реализовав RxJavaPlugins.setErrorHandler{...}. sequentialDelayError() задерживает все ошибки до тех пор, пока все рельсы не завершат свою задачу.

fun getStatus(ids: List<String>): Single<String> { 
              Flowable.fromIterable(ids)
                .parallel()
                .runOn(io())
                .flatMap{ id -> getStatus(id).toFlowable()
                        }
                .sequentialDelayError()
                .firstOrError()
                .onErrorResumeNext { Single.error(it) }
}

///
RxJavaPlugins.setErrorHandler{ it: Throwable ->
  is UndeliverableException -> {...}
  .....
}
0 голосов
/ 07 мая 2020

Вы возвращаете Flowable.empty (), который немедленно завершает подписку. Взято из документации:

Возвращает Flowable, который не передает никаких элементов в {@link Subscriber}, и немедленно вызывает его метод {@link Subscriber # onComplete onComplete}.

Возможно, вы можете вернуть Flowable.just("") или предоставить ожидаемый аргумент в случае ошибки.

.onErrorResumeWith { Flowable.just("") }
...