Ожидание ответов от нескольких наблюдаемых в Vertx - PullRequest
0 голосов
/ 20 сентября 2018

Я использую vertx-rx-java

У меня есть обработчик, в котором мне нужно сделать 2 разных запроса через EventBus и создать ответ, используя ответы на эти 2 запроса.

public handle(RoutingContext context) {
....some code...    

    Single<Message<Object>> firstRequest = eb.rxSend("address1", "message1");
    Single<Message<Object>> secondRequest = eb.rxSend("address2", "message2");
    ... TODO ...
}

В основном мне нужно объединить два результата запроса и поместить их в RoutingContext ответ.Проблема в том, что я не совсем понимаю, как это сделать в стиле rxjava.Единственный способ, которым я мог придумать, это что-то вроде этого:

firstRequest.doOnSuccess(resp1 -> {
  secondRequest.doOnSuccess(resp2 -> {

  });
});

Но я думаю, что это плохой способ, потому что, если есть 10 запросов вместо 2?этот код будет иметь 10 вложенных вызовов.

Есть ли более эффективные способы объединения результатов нескольких запросов?

1 Ответ

0 голосов
/ 21 сентября 2018

оператор zip может использоваться для связанных выбросов из нескольких источников, с той разницей, что он излучает только тогда, когда излучает каждый из его базовых источников.так что ...

  • в случае, если есть два базовых источника, zip будет излучать в парах.
  • в случае наличия трех базовых источников zip будет излучать триплетами.
  • ... и т. Д.

, чтобы получить практический смыслиз того, что я имею в виду, вы можете обратиться к странице RxMarbles и поиграть с выбросами в двух верхних потоках, наблюдая за нижним потоком.

с этим пониманием вы можете использовать *Оператор 1018 * zip для объединения результатов ответов Message, например:

Single.zip(firstRequest, secondRequest, (firstReply, secondReply) -> {
    // ...do stuff with the replies and compose some result
    //    to be handled in onSuccess()
    return firstReply.body().toString() + secondReply.body().toString();
})
.subscribe(
    result -> {
        System.out.println("## onSuccess(" + result + ")");
    },
    error -> {
        System.err.println("## onError(" + error.getMessage() + ")");
    }
);

, если при любой из доставок произошел сбой, то сработает обработчик onError.onSuccess будет запущено в противном случае.

, если, как вы упомянули, у вас есть большое количество запросов, которые вы хотели бы обработать одновременно, существует перегруженный вариант zip который принимает Iterable источников.в вашем случае это может выглядеть примерно так:

final List<Single<Message<Object>>> requests = asList(firstRequest, secondRequest, ...);

Single.zip(requests, replies -> {
    // ...do stuff with the array of replies
    return null;
})
.subscribe(...);

надеюсь, это поможет!

...