Не удалось получить результат завершаемого будущего в объекте ответа - PullRequest
0 голосов
/ 08 октября 2019

Я звоню четырем API, используя интерфейс CompletableFuture из java.util.concurrent с Java8. Я хочу сделать несколько вызовов покоя, объединить результаты и вернуть JSON.

Я пробовал с Future, это работало на меня. Тогда я тоже хотел попробовать CompletableFuture.

public Map<String, Map<String, Object>> getAllValuesInParallel2(RequestObj requestObj) {

    Map<String, Map<String, Object>> response = new HashMap<>();
    ExecutorService executor = Executors.newFixedThreadPool(6);
    CompletableFuture.supplyAsync(() -> {
        List<Template> keys = new ArrayList<>();
        keys.add("mani");
        return aClient.transform(keys, requestObj);
    }, executor).thenApply(s -> {
        putIntoResponse(response, s);
        return true;
    });

    CompletableFuture.supplyAsync(() -> {
        List<Template> keys = new ArrayList<>();
        keys.add("gani");
        return bClient.transform(keys, requestObj);
    }, executor).thenApply(s -> {
        putIntoResponse(response, s);
        return true;
    });

    CompletableFuture.supplyAsync(() -> {
        List<Template> keys = new ArrayList<>();
        keys.add("priya");
        return cClient.transform(keys, requestObj);
    }, executor).thenApply(s -> {
        putIntoResponse(response, s);
        return true;
    });

    CompletableFuture.supplyAsync(() -> {
        List<Template> keys = new ArrayList<>();
        keys.add("ravi");
        return dClient.transform(keys, requestObj);
    }, executor).thenApply(s -> {
        putIntoResponse(response, s);
        return true;
    });

    return response;
}

private void putIntoResponse(Map<String, Map<String, Object>> response, List<Map<String, Object>> s) {
    if(s.size() > 0) {
        for (Map<String, Object> maps : s) {
            if (maps != null && maps.containsKey("abcd")) {
                String abcd = maps.get("abcd").toString();
                if(!response.containsKey(abcd))
                    response.put(maps.get("abcd").toString(), maps);
                else {
                    for (Map.Entry<String, Object> entry: maps.entrySet()) {
                        response.get(abcd).put(entry.getKey(), entry.getValue());
                    }
                }
            }
        }
    }
}

Я звоню четырем Аписам. Я получаю список Hashmap в качестве ответа для каждого API. Теперь результат каждого API должен быть объединен в один ответ, а именно, Карты карт. Итак, я хочу, чтобы, получив ответ от API, я поместил этот результат в хэш-карту.

Но я получаю пустой ответ. Здесь завершаемое будущее вызывает API, но сервер не ждет ответа на вызов и возвращает. Как дать серверу подождать? Пожалуйста, предложите метод, чтобы я мог выполнить этот вариант использования с completetableFuture. Кроме того, предложите какой-нибудь более чистый способ сделать это.

Ответ, получаемый от API:

{ 
   {
    "abcd": 1,
    "cde": 2
   },
   { 
    "abcd": 2,
     "cde": 3
   }
}

Изучите приведенный выше ответ на:

{
   "1" : {
    "abcd": 1,
    "cde": 2
   },
   "2":{ 
    "abcd": 2,
     "cde": 3
   }

}

1 Ответ

0 голосов
/ 08 октября 2019

Я думаю, что ваша проблема заключается в том, что CompletableFuture.supplyAsync() не заблокирован, поэтому ваш код мгновенно движется вперед, вместо того, чтобы ждать применения асинхронной операции.

Код выполняется васинхронным образом, что в данном случае означает, что вы указываете операции внутри CompletableFuture.supplyAsync(), которые должны быть выполнены, и двигаетесь дальше. Часть thenApply() вызывается только тогда, когда код внутри supplyAsync завершает выполнение, что наиболее вероятно происходит после того, как вы уже вернули response.

Если вы хотите дождаться, пока все CompletableFuturesзавершите выполнение перед возвратом response, затем вам следует использовать метод CompletableFuture.join().

Сначала попытайтесь выполнить рефакторинг вашего кода, чтобы избавиться от части thenApply(), чтобы ваш результат ~ CompletableFutures в результате частичные ответы .

Затем присвойте все CompletableFutures некоторым переменным (myFirstFuture, mySecondFuture и т.д. в моем примере).

После этого используйте Stream примените метод join() ко всем CompletableFutures и примените метод putInResponse для каждого из результатов.

Например:

public Map<String, Map<String, Object>> getAllValuesInParallel2(RequestObj requestObj) {

    Map<String, Map<String, Object>> response = new HashMap<>();
    ExecutorService executor = Executors.newFixedThreadPool(6);
    CompletableFuture<List<Map<String, Object>>> myFuture1 = CompletableFuture.supplyAsync(() -> {
        List<Template> keys = new ArrayList<>();
        keys.add("mani");
        return aClient.transform(keys, requestObj);
    }, executor);

    CompletableFuture<List<Map<String, Object>>> myFuture2 = CompletableFuture.supplyAsync(() -> {
        List<Template> keys = new ArrayList<>();
        keys.add("gani");
        return bClient.transform(keys, requestObj);
    }, executor);

    CompletableFuture<List<Map<String, Object>>> myFuture3 = CompletableFuture.supplyAsync(() -> {
        List<Template> keys = new ArrayList<>();
        keys.add("priya");
        return cClient.transform(keys, requestObj);
    }, executor);

    CompletableFuture<List<Map<String, Object>>> myFuture4 = CompletableFuture.supplyAsync(() -> {
        List<Template> keys = new ArrayList<>();
        keys.add("ravi");
        return dClient.transform(keys, requestObj);
    }, executor);

    Stream.of(myFuture1, myFuture2, myFuture3, myFuture4)
        .map(CompletableFuture::join)
        .filter(Objects::nonNull)
        .forEachOrdered(s -> putIntoResponse(response, s));


    return response;
}
...