Слияние / объединение двух наблюдаемых дает результаты только из одного - PullRequest
0 голосов
/ 28 мая 2020

У меня есть 2 Observable, которые могут выполняться параллельно. Я пытаюсь объединить 2 наблюдаемых, но получаю только результат из одного наблюдаемого.

Observable<AsyncN1qlQueryResult> queryResult = bucket.async().query(n1qlQuery)
    .timeout(5000, TimeUnit.MILLISECONDS)
    .retryWhen(anyOf(BackpressureException.class).max(3)
        .delay(Delay.exponential(TimeUnit.MILLISECONDS, 32, 2)).build())
    .retryWhen(anyOf(TemporaryFailureException.class)
        .max(2)
        .delay(Delay.fixed(500, TimeUnit.MILLISECONDS)).build())
    .onErrorResumeNext(exp -> {
      return Observable.error(exp);
    });


Observable<Map<String, String>> metricsMap = queryResult.flatMap(qr ->
    qr.info()).map(n1qlMetrics -> {
                resultMap.put("metrics",n1qlMetrics.asJsonObject().toString());
                return resultMap;
                          });

Observable<Map<String, String>> resultObsMap = queryResult.flatMap(asyncN1qlQueryResult ->
    asyncN1qlQueryResult.errors().flatMap(error -> {
      return Observable
          .error(new Exception("unable to execute n1ql query " + error.toString()));
    })
        .switchIfEmpty(asyncN1qlQueryResult.rows())
).map(row -> {
  JsonObject json = ((AsyncN1qlQueryRow) row).value();
  return json.toString();
}).reduce("",(result,next) -> result + (result.equals("") ? "" :
    ",") +  next).map(results -> {resultMap.put("results", results); return resultMap;});

, когда я запускаю следующее:

Observable<Map<String, String>> retObservable = Observable.concat(resultObsMap, metricsMap);

Я получаю результат только от resultObsMap назад.

Каким-либо образом я могу выполнить две указанные выше наблюдаемые resultObsMap и metricsMap параллельно и объединить их результаты вместе?

...