Последовательная композиция для произвольного количества звонков в Vertx с фьючерсами - PullRequest
0 голосов
/ 08 ноября 2018

Мы используем Futures в vertx в примерах вроде:

Future<JsonObject> fetchVehicle = getUserBookedVehicle(routingContext, client);

        fetchVehicle.compose(vehicleJson -> vehicleDoor(routingContext, client, vehicleJson, lock)).setHandler(
                asyncResult -> {
                    if (asyncResult.succeeded()) {
                    LOG.info("Door operation succeeded with result {}", asyncResult.result().encode());
                    handler.handle(Future.succeededFuture(new AsyncReply(200, "OK")));
                }
                else {
                    handler.handle(Future.failedFuture(asyncResult.cause()));
                }
        });

где мы обрабатываем 2 вызова, например.

ИЛИ У меня есть другой фрагмент, где я могу обработать любое количество методов:

List<Future> futures = new ArrayList<>();
        conversation.getRequestList().forEach(req -> {
            Future<Message<Object>> senderFuture = Future.future();
            vertx.eventBus().send(AbstractOEMClientVerticle.ADDRESS, JsonObject.mapFrom(req), deliveryOptions, senderFuture.completer());

            // sent successfully. save the replyAddress and the conversation for later/callback
            log.info("Saving the conversation for the request.", conversation.getReplyAddress());
            pendingCommands.put(req.getBody().getString(MSG_ID), conversation);

            futures.add(senderFuture);
        });

        CompositeFuture.all(futures).setHandler(ar -> {
            if (ar.succeeded()) {
                handler.handle(Future.succeededFuture());
            } else {
                log.error("forwardToVWClient VW got result : {}", ar.cause());
                handler.handle(Future.failedFuture(ar.cause()));
            }
        });

Здесь мы объединяем все запросы в conversation.getRequestList(), не зная заранее их количества.

Но недостатком метода .all() является то, что у нас нет контроля над заказом.

Как я могу связать любое количество методов с Vertx Futures (не зная точного количества вызовов)?

РЕДАКТИРОВАТЬ:

Официальное руководство говорит о последовательной композиции, но в приведенном примере есть 3 вызова. Это не объясняет, как это сделать для произвольного количества звонков.

См. «Последовательная композиция» в http://vertx.io/docs/vertx-core/java/

Надеюсь, это понятно.

Ответы [ 3 ]

0 голосов
/ 08 ноября 2018

Вот решение, использующее map & reduce, которое выполняет метод упорядоченным образом и возвращает накопленный результат в виде Future<String>

 public static <T> Future<String> chainCall(List<T> list, Function<T, Future<String>> method){
        return list.stream().reduce(Future.succeededFuture(),// the initial "future"
                (acc, item) -> acc.compose(v -> method.apply(item)), // we return the compose of the previous "future" with "future" returned by next item processing
                (a,b) -> Future.future()); // not used! only useful for parallel stream.
    }

можно использовать как в примере ниже:

 chainCall(conversation.getRequestList(), this::sendApiRequestViaBus);

, где sendApiRequestViaBus:

/**
     * @param request The request to process
     * @return The result of the request processing. 
     */
    Future<String> sendApiRequestViaBus(ApiRequest request) {
        Future<String> future = Future.future();
        String address = CommandUtilsFactory.getInstance(request.getImplementation()).getApiClientAddress();
        log.debug("Chain call start msgId {}", request.getId());

        vertx.eventBus().send(address, JsonObject.mapFrom(request), deliveryOptions, res -> {
            log.debug("Chain call returns {}", request.getId());
            if (res.succeeded()) {
                future.complete("OK");
            } else {
                future.fail("KO");
            }
        });
        return future;
    }

Надеюсь, это поможет.

0 голосов
/ 10 ноября 2018

Вот кое-что удобное. Надеюсь, поможет.

public static <R> Future<List<R>> allOfFutures(List<Future<R>> futures) {
    return CompositeFutureImpl.all(futures.toArray(new Future[futures.size()]))
            .map(v -> futures.stream()
                    .map(Future::result)
                    .collect(Collectors.toList())
            );
}
0 голосов
/ 08 ноября 2018

Если вы хотите передать ответ от предыдущего запроса к следующему запросу и предположить, что у вас есть разные обработчики для каждого ответа. Вы можете добавить вспомогательный метод

private <T> Future<T> chain(Future<T> init, List<Function<T, Future<T>>> handlers) {
    Future<T> result = init;
    for (Function<T, Future<T>> handler : handlers) {
        result = result.compose(handler);
    }
    return result;
}

А потом измени свой код следующим образом

    Future<JsonObject> fetchVehicle = getUserBookedVehicle(routingContext, client);

    Function<JsonObject, Future<JsonObject>> vehicleResponseHandler = vehicleJson ->
        vehicleDoor(routingContext, client, vehicleJson, lock);

    Function<JsonObject, Future<JsonObject>> anotherTrivialHandler = someJsonObj -> {
        // add here new request by using information from someJsonObj
        LOG.info("Hello from trivial handler {} ", someJsonObj);
        return Future.succeededFuture(someJsonObj);
    };

    List<Function<JsonObject, Future<JsonObject>>> handlers = new ArrayList<>();

    handlers.add(vehicleResponseHandler);
    handlers.add(anotherTrivialHandler);

    chain(fetchVehicle, handlers).setHandler( asyncResult -> {
        if (asyncResult.succeeded()) {
            handler.handle(Future.succeededFuture(new AsyncReply(200, "OK")));
        } else {
            handler.handle(Future.failedFuture(asyncResult.cause()));
        }
    });

Но есть ограничение для этой реализации, которое требует, чтобы каждая цепочка Future имела одинаковый параметр типа T.

...