У меня есть 2 Observable, которые могут выполняться параллельно. Я пытаюсь объединить 2 наблюдаемых, но получаю только результат из одного наблюдаемого.
Observable<AsyncN1qlQueryResult> queryResult = bucket.async().query(n1qlQuery)
.timeout(5000, TimeUnit.MILLISECONDS)
.retryWhen(anyOf(BackpressureException.class).max(3)
.delay(Delay.exponential(TimeUnit.MILLISECONDS, 32, 2)).build())
.retryWhen(anyOf(TemporaryFailureException.class)
.max(2)
.delay(Delay.fixed(500, TimeUnit.MILLISECONDS)).build())
.onErrorResumeNext(exp -> {
return Observable.error(exp);
});
Observable<Map<String, String>> metricsMap = queryResult.flatMap(qr ->
qr.info()).map(n1qlMetrics -> {
resultMap.put("metrics",n1qlMetrics.asJsonObject().toString());
return resultMap;
});
Observable<Map<String, String>> resultObsMap = queryResult.flatMap(asyncN1qlQueryResult ->
asyncN1qlQueryResult.errors().flatMap(error -> {
return Observable
.error(new Exception("unable to execute n1ql query " + error.toString()));
})
.switchIfEmpty(asyncN1qlQueryResult.rows())
).map(row -> {
JsonObject json = ((AsyncN1qlQueryRow) row).value();
return json.toString();
}).reduce("",(result,next) -> result + (result.equals("") ? "" :
",") + next).map(results -> {resultMap.put("results", results); return resultMap;});
, когда я запускаю следующее:
Observable<Map<String, String>> retObservable = Observable.concat(resultObsMap, metricsMap);
Я получаю результат только от resultObsMap назад.
Каким-либо образом я могу выполнить две указанные выше наблюдаемые resultObsMap и metricsMap параллельно и объединить их результаты вместе?