Используйте CompletableFuture для выполнения одного или нескольких Callables и избежать блокировки - PullRequest
1 голос
/ 14 января 2020

Я выполняю несколько вызовов через ThreadPoolExecutor. Если список потоков содержит только 1 вызываемый объект, то я напрямую вызываю метод call моего CallableService. Если список содержит более 1 вызываемых элементов, тогда я выполняю все эти потоки параллельно через исполнителя пула потоков.

Как я могу добиться этого с Java 8 CompletableFuture? И если future.get() улучшено, чтобы избежать блокировки, это будет плюсом.

private static ThreadPoolExecutor myThreadPoolExecutor = new ThreadPoolExecutor(0, 100, 5L, TimeUnit.SECONDS, new SynchronousQueue<>());

public static void execute(List<Callable<Boolean>> threadList) throws Exception {

    List<Future<Boolean>> futureList = null;
    CallableService singleService = (CallableService) threadList.get(0);
    if (1 == threadList.size()) {
        singleService.call();
    }
    else {
        try {
            futureList = myThreadPoolExecutor.invokeAll(threadList);
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    if (null != futureList) {
        for (Future<Boolean> future : futureList) {
            try {
                future.get();
            }
            catch (Exception e) {
                //do some calculations here and then throw exception
                throw new Exception(e.getMessage(), e);
            }
        }
    }
}

Ответы [ 3 ]

4 голосов
/ 14 января 2020

Нет необходимости в CompletableFuture, так как способ использования ExecutorService достаточен, однако, есть некоторые аспекты потока кода, которые можно улучшить. Вы выбираете первый элемент, даже когда он не нужен, и приводите его к CallableService без причины, так как вы уже можете вызывать метод через интерфейс Callable. В другой ветке вы перехватываете InterruptedException и продолжаете работу, поэтому вызывающая сторона никогда не узнает, что не все задания были выполнены. И в прямом потоке кода вам не нужно проверять список на null:

public static void execute(List<Callable<Boolean>> threadList) throws Exception {
    if(1 == threadList.size()) {
        Callable<Boolean> singleService = threadList.get(0);
        singleService.call();
    }
    else {
        List<Future<Boolean>> futureList = myThreadPoolExecutor.invokeAll(threadList);
        for(Future<Boolean> future : futureList) {
            try {
                future.get();
            }
            catch(Exception e) {
                //do some calculations here and then throw exception
                throw new Exception(e.getMessage(), e);
            }
        }
    }
}

Вы можете сократить его до

public static void execute(List<Callable<Boolean>> threadList) throws Exception {
    if(1 == threadList.size()) {
        threadList.get(0).call();
    }
    else {
        for(Future<Boolean> future : myThreadPoolExecutor.invokeAll(threadList)) {
            try {
                future.get();
            }
            catch(Exception e) {
                //do some calculations here and then throw exception
                throw new Exception(e.getMessage(), e);
            }
        }
    }
}

Но это вопрос предпочтительного стиля кодирования. Но обратите внимание, что мне показалось, что в случае с одним элементом вы не выполняете одинаковую обработку исключений.


Чтобы использовать CompletableFuture, нам нужен метод адаптера, как метод удобства supplyAsync требует Supplier вместо Callable. Используя модифицированный вариант этого ответа , мы получаем

public static void execute(List<Callable<Boolean>> threadList) throws Exception {
    if(1 == threadList.size()) {
        threadList.get(0).call();
    }
    else {
        CompletableFuture<?> all = CompletableFuture.allOf(
            threadList.stream()
                .map(c -> callAsync(c, myThreadPoolExecutor))
                .toArray(CompletableFuture<?>[]::new));
        try {
            all.get();
        }
        catch(Exception e) {
            //do some calculations here and then throw exception
            throw new Exception(e.getMessage(), e);
        }
    }
}
public static <R> CompletableFuture<R> callAsync(Callable<R> callable, Executor e) {
    CompletableFuture<R> cf = new CompletableFuture<>();
    CompletableFuture.runAsync(() -> {
        try { cf.complete(callable.call()); }
        catch(Throwable ex) { cf.completeExceptionally(ex); }
    }, e);
    return cf;
}

Таким образом, у нас нет invokeAll, который заботится о представлении всех заданий. Мы должны сделать это вручную, либо с помощью oop, либо с помощью потоковой операции. С другой стороны, мы получаем единственное будущее через allOf, представляющее статус завершения, в исключительном случае, если по крайней мере одно задание не выполнено.

В отличие от invokeAll, ожидающего завершения, allOf возвращает только будущее, так что это all.get() вызов, который ожидает завершения. Мы могли бы сделать что-то еще до этого или даже использовать это свойство, чтобы всегда выполнять первую работу в потоке вызывающего:

public static void execute(List<Callable<Boolean>> threadList) throws Exception {
    CompletableFuture<?> tail = CompletableFuture.allOf(
        threadList.stream().skip(1)
            .map(c -> callAsync(c, myThreadPoolExecutor))
            .toArray(CompletableFuture<?>[]::new)),
        head = callAsync(threadList.get(0), Runnable::run);
    try {
        head.get();
        tail.get();
    }
    catch(Exception e) {
        //do some calculations here and then throw exception
        throw new Exception(e.getMessage(), e);
    }
}

Это всегда будет вызывать первый вызываемый объект в текущем потоке, так как Runnable::run используется как Executor немедленно выполнит действие в вызывающем потоке. Но это рассматривается одинаково во всех других аспектах, особенно в обработке исключений. Когда есть только одно задание, allOf вызов с пустым массивом ничего не сделает и вернет уже завершенное будущее, что даст желаемый эффект.

0 голосов
/ 14 января 2020

Я выполняю несколько вызовов через ThreadPoolExecutor. Если список потоков содержит только 1 вызываемый объект, то я напрямую вызываю метод вызова моего CallableService. Если список содержит более 1 вызываемых, то я выполняю все эти потоки параллельно через исполнителя пула потоков.

Я полагаю, вы уже реализовали эту часть. (Вы можете столкнуться с проблемами использования памяти, если ваши задания тяжелые, и у вас 100 запущенных потоков, как настроено. Но это другая проблема.)

И если в будущем. блокировка, это будет плюсом.

Для этого вы можете воспользоваться следующим подходом:

  1. Создать еще один ExecutorService, задачей которого будет просто запустить Future.get() звонки.
  2. Отправьте Future.get() в эту службу, как показано ниже.
  3. Завершите работу и дождитесь завершения.

    if (null != futureList) {
        ExecutorService waitSvc = Executors.newCachedThreadPool();
        for (Future<Boolean> future : futureList) {
            try {
                waitSvc.submit( () -> future.get() );
            }
            catch (Exception e) {
                //do some calculations here and then throw exception
                throw new Exception(e.getMessage(), e);
            }
        }
        waitSvc.shutdown(); //This may take some time. You may want to call awaitTermination() after this.
    }
    

Однако я считаю, что вам следует изменить общий подход к использованию такого количества потоков, если только это не приложение для обучения.

0 голосов
/ 14 января 2020

Future.isDone () сообщает нам, завершил ли исполнитель обработку задачи. Если задача завершена, в противном случае она вернет true, но вернет false.

 for (Future<Boolean> future : futureList) {
   while(!future.isDone()) 
   {
          doSOmethingElse();
          Thread.sleep(300);//Optional
    }
	try {
                future.get();
        }
    catch (Exception e) 
	{
                //do some calculations here and then throw exception
                throw new Exception(e.getMessage(), e);
    }
}

Но нам не нужно об этом беспокоиться, поскольку мы добираемся до точки, в которой вызывается get () после проверки того, что задача выполнена.

...