Рефакторинг RxJava Observable concat code и обработка ошибок функциональным способом - PullRequest
0 голосов
/ 29 июня 2018

Как я могу улучшить приведенный ниже код и улучшить обработку ошибок, чтобы код был более функциональным по своей природе.

final Observable<A> aResponse =a.update(52, 33, "759", obj);

final Observable<B> bResponse =b.fetch(52, 759);
Map map1 = new HashMap();
Map map2 = new HashMap();

Два остальных вызова API, которые возвращают наблюдаемые. Результаты объединяются под map1 и map2.


.concat Я использую concat здесь, чтобы объединить результаты двух API. Как я могу использовать .zip () здесь или любую другую функцию. Может ли flatMap() использоваться здесь?

.onErrorResumeNext используется для продолжения выполнения bResponse, если aresponse выдает ошибку. Как я могу регистрировать ошибки лучше, если aResponse терпит неудачу. Я должен сделать это внутри onErrorResumeNext

.subscribe используется для того, чтобы наблюдатель мог видеть объекты, испущенные в результате объединения двух наблюдаемых.
onError Я использую onError, чтобы регистрировать ошибки, если bResponse не удается. Как я могу сделать это лучше?

onNext В onNext я получаю тип объекта, присваивая его B, если это instanceOf B, а затем заполняю элементы в map1 и map2. Как я могу исправить это лучше?

Observable.concat(aResponse, bResponse)
            .onErrorResumeNext(new Func1<Throwable, Observable<B>>() {
                @Override
                public Observable<B> call(Throwable throwable) {
                    return bResponse;
                }
            })
            .subscribe(new Subscriber<Object>() {
                @Override
                public void onCompleted() {

                 System.out.println("The response list after the fututres");
               }
                @Override
                public void onError(Throwable e) {

                    System.out.println("ERROR IN BRESPONSE");
                    e.printStackTrace();
                }

                @Override
                public void onNext(Object o) {
                    if(o instanceof B)
                    {
                        ((B)o).getSomething().stream().forEach( s ->  {
                                   map1.put(s.getId(),s.getNumber());
                                   map2.put(s.getId(), s.getList());
                                });
                    }

                }
            });
}

1 Ответ

0 голосов
/ 29 июня 2018

Если aResponse нужно закончить, прежде чем вы начнете наблюдать bResponse, лучше не объединять цепочки наблюдателей в Observable<Object>.

aResponse
  .doOnError( error -> log.error( "Error on A stream", error )
  .onErrorResumeNext( Observable.empty() )
  .toCompletable()
  .andThen( bResponse )
  .subscribe( ... );

По существу, вышеприведенная цепочка контролирует aResponse до ее завершения, игнорируя все элементы. Когда aResponse закончится, вы начнете наблюдать bResponse, делая то, что вам нужно для B элементов.

Если вам нужно что-то сделать с элементами A, вы можете добавить doOnNext() в цепочку наблюдателей до toCompletable().

...