получить метод на будущее CompletableFutures - PullRequest
1 голос
/ 06 ноября 2019

Я выполняю многопоточную работу в Java, и у меня есть вопрос о методе Future get.

У меня есть ExecutorSevice, который отправляет n Callable. Подтверждение возвращает Future, который добавляется в список (позже - некоторый код). Метод Callable call проходит по списку, и для каждого элемента списка вызывается someMethod и возвращает CompletableFuture<T>. Затем Завершаемое будущее добавляется на карту.

В конце цикла call возвращает карту, содержащую группу CompletableFuture.

Итак, начальный список (uglyList) содержит n карт <String, CompletableFuture<String>>.

Мой вопрос: когда я вызываю get для элемента uglyList, выполнение блокируется до тех пор, пока не будет просмотрен весь список (переданный в качестве параметра для вызываемого объекта) и все CompletableFutures не будут вставлены вкарта, верно?

Потому что, у меня есть сомнения, возможно ли, что get в этом примере также ожидает завершения CompletableFutures на карте? Я не знаю, как это проверить


public class A {

    private class CallableC implements Callable<Map<String, CompletableFuture<String>>> {
        private List<String> listString;

        Map<String, CompletableFuture<String>> futureMap;

        public CallableC(List<String> listString) throws Exception {        
            this.listString = listString;
            this.futureMap = new HashMap<>();
        }

        @Override
        public Map<String, CompletableFuture<String>> call() throws Exception {

            for (int j = 0; j < listString.size(); ++j) {
                CompletableFuture<String> future = someMethod();        
                futureMap.put(listString.get(i), future);
            }

            return futureMap;
        }
    }


    public void multiThreadMethod(List<String> items) throws Exception {

        List<Future<Map<String, CompletableFuture<String>>>> uglyList = new ArrayList<>();

        int coresNumber = (Runtime.getRuntime().availableProcessors());

                // This method split a List in n subList
        List<List<String>> splittedList = split(items, coresNumber);

        ExecutorService executor = Executors.newFixedThreadPool(coresNumber);

        try {

            for (int i = 0; i < coresNumber; ++i) {
                threads.add(executor.submit(new CallableC(splittedList.get(i),)));
            }

            for (int i = 0; i < coresNumber; ++i) {
                uglyList.get(i).get(); //here
            }


        } catch (Exception e) {
                     // YAY errors
        }
        finally {
            executor.shutdown();
        }
    }
}

1 Ответ

0 голосов
/ 06 ноября 2019

когда я вызываю get для элемента uglyList, выполнение блокируется до тех пор, пока не будет просмотрен весь список (переданный в качестве параметра для вызываемого объекта) и все CompletableFutures будут вставлены в карту, верно?

Да, это правильно. get() будет ожидать завершения Callable, что в вашем случае означает конец метода CallableC#call(), который будет достигнут в конце цикла for, когда все CompletableFutures созданы ивставлен в futureMap, независимо от состояния / завершения этих CompletableFuture с.

возможно, что get в этом примере также ожидает завершения CompletableFutures на карте?

Этого можно достичь, но не без дополнительного кода.

Минимальный способ сделать это в вашем коде - добавить вызов соединения внутри CompletableFuture s. Задачи.

@Override
    public Map<String, CompletableFuture<String>> call() throws Exception {

        for (int j = 0; j < listString.size(); ++j) {
            CompletableFuture<String> future = someMethod();        
            futureMap.put(listString.get(j), future);
        }
        // This will wait for all the futures inside the map to terminate
        try {
            CompletableFuture.allOf(futureMap.values().toArray(new CompletableFuture[0])).join();
        } catch (CompletionException e) {
            // ignored - the status will be checked later on
        }

        return futureMap;
    }

Другим способом было бы подождать в самом конце (я пропускаю обработку исключений для краткости):

for (int i = 0; i < coresNumber; ++i) {
    threads.add(executor.submit(new CallableC(splittedList.get(i),)));
}
// This will wait for all the futures to terminate (thanks to f.get())
// And then, all the completable futures will be added to an array of futures that will also be waited upon
CompletableFuture.allOf(
    uglyList.stream().flatMap(f -> f.get().values().stream()).toArray(CompletableFuture[]::new)
).join();

Но если это возможно для вас, я бы упростилвесь код, используя только завершаемые фьючерсы, если только у вас нет веских оснований ломать задачи так, как вы делаете (используя список, выделенного исполнителя и т. д.). Это может выглядеть так (возможно, несколько опечаток)

List<String> items = ...
ConcurrentMap<String, String> resultsByItem = new ConcurrentHashMap<>();

Future<Void> allWork = CompletableFuture.allOf(
    items.stream()
        .map(item -> someWork(item) // Get a completable future for this item
                     .andThen(result -> resultsByItem.put(item, result)) // Append the result to the map
        ).toArray(CompletableFuture[]::new)
);
allWork.join();
// Your futureMap is completed.

Это заменит ваш весь код в вопросе.

...