Как можно сделать асинхронные вызовы, используя для L oop в Vertx - PullRequest
0 голосов
/ 05 января 2020

Хотите выполнить запрос выборки для каждого запроса, а затем хотите добавить все в результаты, затем выполнить запрос обновления для каждого результата, и, в конце концов, просто хотите вернуть результаты в будущем ответе функции, но этого не происходит, потому что запрос выборки и обновления выполняется в отдельном событии l oop thread. Как я могу это сделать?

public Future<List<String>> method(List<Request> requests, SQLConnection connection) {
    List<String> results = new ArrayList<>();
    Future<List<String>> future = Future.future();
    try {
        Function<AsyncResult<ResultSet>, List<String>> buildFromResultSet = query -> {
            List<JsonObject> resulSetJson = query.result().getRows();
            return resulSetJson.stream().map(rs -> {
                return rs.getString("code");
            }).collect(Collectors.toList());
        };
        for (Request request : requests) {
            Future<List<String>> responseFuture = queryExecutor.queryWithParams(SELECT_ITEMS, buildFromResultSet, request.getQuantityChange());
            responseFuture.setHandler(handler -> {
                if (handler.succeeded()) {
                    results.addAll(handler.result());
                } else {
                    LOGGER.error("Error ", handler.cause());
                    future.fail(handler.cause());
                }
            });
            LOGGER.info("size :{}", results.size());
            connection.batchWithParams(UPDATE_STATUS, results.stream().map(result -> new JsonArray().add(result)).collect(Collectors.toList()), query -> {
                if (query.succeeded()) {
                    List<Integer> updated = query.result();
                    if (updated == null || updated.isEmpty() || updated.stream().anyMatch(u -> u == 0))
                        future.fail(new ValidationException(MessageDetail.builder().message("0 rows processed").messageType(MessageType.ERROR).build()));
                    else
                        future.complete(results);
                    LOGGER.info("Successfully executed : {}", results);
                } else {
                    future.fail(query.cause());
                }
            });
        }
    } catch (Exception e) {
        LOGGER.error("Error ", e);
        future.fail(e);
    }
    return future;
}
...