Нет реальной проблемы, так как ваше предложение « Я не могу использовать CompletableFuture.allOf (), потому что остановка при первом сбое » неверно. Будущее, возвращаемое allOf
, будет завершено в исключительном порядке, если хотя бы один из входных фьючерсов был завершен в исключительном порядке, но он все равно будет завершен только после того, как все фьючерсы будут завершены.
Как можно легко продемонстрировать:
CompletableFuture<?> f1 = new CompletableFuture<>();
f1.completeExceptionally(new Throwable("fail immediately"));
CompletableFuture<?> f2
= CompletableFuture.runAsync(() -> LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2)));
CompletableFuture<?> all = CompletableFuture.allOf(f1, f2);
long t0 = System.nanoTime();
try {
all.join();
} finally {
System.err.println("Completed: "+f1.isDone()+", "+f2.isDone());
System.err.printf("%.2fs%n", (System.nanoTime()-t0)*1e-9);
}
Completed: true, true
2,00s
Exception in thread "main" java.util.concurrent.CompletionException: java.lang.Throwable: fail immediately
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.biRelay(CompletableFuture.java:1284)
at java.util.concurrent.CompletableFuture$BiRelay.tryFire(CompletableFuture.java:1270)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1632)
at java.util.concurrent.CompletableFuture$AsyncRun.exec(CompletableFuture.java:1618)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.Throwable: fail immediately
at Demo.main(Demo.java:16)
Таким образом, вы можете использовать allOf
, чтобы проверить состояние завершения всех заданий, даже если некоторые из них не выполнены:
ExecutorService e = Executors.newFixedThreadPool(20);
Random r = ThreadLocalRandom.current();
CompletableFuture<?>[] workerJobs = IntStream.range(0, 20)
.mapToObj(i -> {
long time = TimeUnit.MILLISECONDS.toNanos(r.nextInt(4000));
boolean fail = r.nextBoolean();
return CompletableFuture.runAsync(() -> {
LockSupport.parkNanos(time);
if(fail) throw new RuntimeException();
}, e);
})
.toArray(CompletableFuture<?>[]::new);
e.shutdown();
try {
CompletableFuture.allOf(workerJobs).get(2, TimeUnit.SECONDS);
System.out.println("All completed within 2 seconds or less without failures");
}
catch(InterruptedException ex) {
throw new AssertionError(ex);
}
catch(ExecutionException ex) {
System.out.println("All completed within 2 seconds or less, at least one failed");
}
catch(TimeoutException ex) {
System.out.println("At least one did not complete within 2 seconds");
}
for(CompletableFuture<?> f: workerJobs) {
System.out.println(f.isDone()? "completed"
+(f.isCompletedExceptionally()? " exceptionally": ""): "not completed");
}