Нет необходимости в CompletableFuture
, так как способ использования ExecutorService
достаточен, однако, есть некоторые аспекты потока кода, которые можно улучшить. Вы выбираете первый элемент, даже когда он не нужен, и приводите его к CallableService
без причины, так как вы уже можете вызывать метод через интерфейс Callable
. В другой ветке вы перехватываете InterruptedException
и продолжаете работу, поэтому вызывающая сторона никогда не узнает, что не все задания были выполнены. И в прямом потоке кода вам не нужно проверять список на null
:
public static void execute(List<Callable<Boolean>> threadList) throws Exception {
if(1 == threadList.size()) {
Callable<Boolean> singleService = threadList.get(0);
singleService.call();
}
else {
List<Future<Boolean>> futureList = myThreadPoolExecutor.invokeAll(threadList);
for(Future<Boolean> future : futureList) {
try {
future.get();
}
catch(Exception e) {
//do some calculations here and then throw exception
throw new Exception(e.getMessage(), e);
}
}
}
}
Вы можете сократить его до
public static void execute(List<Callable<Boolean>> threadList) throws Exception {
if(1 == threadList.size()) {
threadList.get(0).call();
}
else {
for(Future<Boolean> future : myThreadPoolExecutor.invokeAll(threadList)) {
try {
future.get();
}
catch(Exception e) {
//do some calculations here and then throw exception
throw new Exception(e.getMessage(), e);
}
}
}
}
Но это вопрос предпочтительного стиля кодирования. Но обратите внимание, что мне показалось, что в случае с одним элементом вы не выполняете одинаковую обработку исключений.
Чтобы использовать CompletableFuture
, нам нужен метод адаптера, как метод удобства supplyAsync
требует Supplier
вместо Callable
. Используя модифицированный вариант этого ответа , мы получаем
public static void execute(List<Callable<Boolean>> threadList) throws Exception {
if(1 == threadList.size()) {
threadList.get(0).call();
}
else {
CompletableFuture<?> all = CompletableFuture.allOf(
threadList.stream()
.map(c -> callAsync(c, myThreadPoolExecutor))
.toArray(CompletableFuture<?>[]::new));
try {
all.get();
}
catch(Exception e) {
//do some calculations here and then throw exception
throw new Exception(e.getMessage(), e);
}
}
}
public static <R> CompletableFuture<R> callAsync(Callable<R> callable, Executor e) {
CompletableFuture<R> cf = new CompletableFuture<>();
CompletableFuture.runAsync(() -> {
try { cf.complete(callable.call()); }
catch(Throwable ex) { cf.completeExceptionally(ex); }
}, e);
return cf;
}
Таким образом, у нас нет invokeAll
, который заботится о представлении всех заданий. Мы должны сделать это вручную, либо с помощью oop, либо с помощью потоковой операции. С другой стороны, мы получаем единственное будущее через allOf
, представляющее статус завершения, в исключительном случае, если по крайней мере одно задание не выполнено.
В отличие от invokeAll
, ожидающего завершения, allOf
возвращает только будущее, так что это all.get()
вызов, который ожидает завершения. Мы могли бы сделать что-то еще до этого или даже использовать это свойство, чтобы всегда выполнять первую работу в потоке вызывающего:
public static void execute(List<Callable<Boolean>> threadList) throws Exception {
CompletableFuture<?> tail = CompletableFuture.allOf(
threadList.stream().skip(1)
.map(c -> callAsync(c, myThreadPoolExecutor))
.toArray(CompletableFuture<?>[]::new)),
head = callAsync(threadList.get(0), Runnable::run);
try {
head.get();
tail.get();
}
catch(Exception e) {
//do some calculations here and then throw exception
throw new Exception(e.getMessage(), e);
}
}
Это всегда будет вызывать первый вызываемый объект в текущем потоке, так как Runnable::run
используется как Executor
немедленно выполнит действие в вызывающем потоке. Но это рассматривается одинаково во всех других аспектах, особенно в обработке исключений. Когда есть только одно задание, allOf
вызов с пустым массивом ничего не сделает и вернет уже завершенное будущее, что даст желаемый эффект.