Как обрабатывать результаты для каждого будущего списка и когда все фьючерсы завершены в vert.x? - PullRequest
0 голосов
/ 31 мая 2018

У меня есть List из Future и мне нужно обработать результаты по каждому Future, когда он завершится.Когда все фьючерсы завершены, мне нужно составить ответ.

Мой упрощенный код для создания списка фьючерсов и их обработчика:

    final List<Future<MyResponse1>> futureMyResponse1Lst = new ArrayList<>();
    for (final MyObject1 obj1: listOfObject1) {
        final Future<MyResponse1> resp1Future = myRestClient.getMyResponse1(obj1);

        resp1Future .setHandler(new Handler<AsyncResult<MyResponse1>>() {

            @Override
            public void handle(final AsyncResult<MyResponse1> event) {
                final MyResponse1 resp1= event.result();
                handleResponse1(obj1, resp1);
            }

        });
        futureMyResponse1Lst.add(resp1Future );
    }

На этом этапе все обработчики вызываются в каждом будущемрезультат.

Но если я добавлю CompositeFuture, то будет вызвано только Handler из CompositeFuture:

    final List<Future> futuresAll = new ArrayList<>();
    futuresAll.addAll(futureMyResponse1Lst );
    CompositeFuture.all(futuresAll).setHandler(new Handler<AsyncResult<CompositeFuture>>() {

        @Override
        public void handle(final AsyncResult<CompositeFuture> event) {
            // called when all future are completed
            logger.debug("handle end of CompositeFuture and create response");
        }
    });

Все найденные вопросы и ответы предлагают использовать CompositeFuture, но вВ этом случае Future Single Handler не называется.

Как я могу вызвать обработчик для каждого Future результата и дополнительно один, когда все Future завершено?Я довольно новичок в программировании Async и Vert.x, я на правильном пути, используя CompositeFuture?Должен ли я использовать RxJava вместо этого?Как?

1 Ответ

0 голосов
/ 31 мая 2018

Вы можете создать дополнительное Future, которое будет выполнено в исходном обработчике Futures, и использовать это Future для CompositeFuture.

final List<Future<MyResponse1>> futureMyResponse1Lst = new ArrayList<>();
for (final MyObject1 obj1: listOfObject1) {
    final Future<MyResponse1> resp1Future = myRestClient.getMyResponse1(obj1);

    Future<MyResponse1> resp1FutureComposite = Future.future();
    resp1Future.setHandler(event-> {
            final MyResponse1 resp1= event.result();
            handleResponse1(obj1, resp1);
            resp1FutureComposite.compelete(resp1);
    });
    futureMyResponse1Lst.add(resp1FutureComposite);
}

Если вы заботитесь только об успешном вызове и не хотите обрабатыватьколичество ошибок на будущее (а затем обрабатываете это в обработчике CompositeFuture), тогда вы можете использовать вместо этого метод compose:

final List<Future<MyResponse1>> futureMyResponse1Lst = new ArrayList<>();
for (final MyObject1 obj1: listOfObject1) {
    final Future<MyResponse1> resp1Future = myRestClient.getMyResponse1(obj1);

    Future<MyResponse1> resp1FutureComposite = resp1Future.compose(resp1 -> {
            handleResponse1(obj1, resp1);
            return Future.succeededFuture(resp1);
    });
    futureMyResponse1Lst.add(resp1FutureComposite);
}
...