Вызов списка <CompletableFuture>с таймаутом и сбором результатов - PullRequest
1 голос
/ 24 марта 2020

Я пытаюсь остановить множество экземпляров Worker с таймаутом в 2 секунды для каждого работника, и я ожидаю, что некоторые из них потерпят неудачу. Я хочу добиться чего-то вроде ниже, и я знаю, что это плохой подход. Я не могу использовать CompletableFuture.allOf (), потому что он останавливается при первом сбое. Также я никогда прежде не использовал CompletableFutures. Я попытался с Executors.newFixedThreadPool (3) .invokeAll () без успеха.

public interface Worker {

public String workerId();
public CompletableFuture<Worker> stop();
}

List<Worker> workers;

public stopAll() {

   workers.stream()
   .parallel()
   .map(worker -> 
       try {
           worker.stop().get(2, TimeUnit.SECONDS)
       } catch(InterruptedException | java.util.concurrent.ExecutionException | TimeoutException e){
           log.error("Worker {} failed to stop", worker.workerId())
       }
}

Моя цель:

  1. Метод stopAll () занимает около 2 секунд до завершения sh

  2. все рабочие, которые не остановились в это время, будут зарегистрированы по Id

Кто-нибудь получил какие-либо предложения? Спасибо за любую помощь.

Решение:

Я был неправ относительно CompletableFuture.AllOf (). Поскольку я хочу получить доступ к workerId () после завершения всех операций, мне нужно сохранить их на карте.

Map<CompletableFuture<Worker>, Worker> cfWorkerMap = workers.stream()
                    .parallel()
                    .flatMap(Collection::stream)
                    .collect(toMap(
                            ExecEnvVerticle::stop,
                            Function.identity()
                    ))
try {
    CompletableFuture.allOf(
        cfWorkerMap.keySet().toArray(new CompletableFuture[0]))
            .get(2, TimeUnit.SECONDS);
} catch (Exception e) {
    cfExecEnvMap.entrySet().parallelStream()
        .peek(entry -> {
            if (!entry.getKey().isDone()) {
                entry.getKey().completeExceptionally(new TimeoutException())
        .forEach(entry -> entry.getKey()
            .handle((execEnv, throwable) -> {
                if (Objects.nonNull(throwable)) {
                    log.error(entry.getValue().workerId())
                }
            return null;
            }
        ))

Ответы [ 2 ]

2 голосов
/ 25 марта 2020

Нет реальной проблемы, так как ваше предложение « Я не могу использовать CompletableFuture.allOf (), потому что остановка при первом сбое » неверно. Будущее, возвращаемое allOf, будет завершено в исключительном порядке, если хотя бы один из входных фьючерсов был завершен в исключительном порядке, но он все равно будет завершен только после того, как все фьючерсы будут завершены.
Как можно легко продемонстрировать:

CompletableFuture<?> f1 = new CompletableFuture<>();
f1.completeExceptionally(new Throwable("fail immediately"));
CompletableFuture<?> f2
  = CompletableFuture.runAsync(() -> LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2)));
CompletableFuture<?> all = CompletableFuture.allOf(f1, f2);

long t0 = System.nanoTime();
try {
    all.join();
} finally {
    System.err.println("Completed: "+f1.isDone()+", "+f2.isDone());
    System.err.printf("%.2fs%n", (System.nanoTime()-t0)*1e-9);
}
Completed: true, true
2,00s
Exception in thread "main" java.util.concurrent.CompletionException: java.lang.Throwable: fail immediately
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
    at java.util.concurrent.CompletableFuture.biRelay(CompletableFuture.java:1284)
    at java.util.concurrent.CompletableFuture$BiRelay.tryFire(CompletableFuture.java:1270)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1632)
    at java.util.concurrent.CompletableFuture$AsyncRun.exec(CompletableFuture.java:1618)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.Throwable: fail immediately
    at Demo.main(Demo.java:16)

Таким образом, вы можете использовать allOf, чтобы проверить состояние завершения всех заданий, даже если некоторые из них не выполнены:

ExecutorService e = Executors.newFixedThreadPool(20);
Random r = ThreadLocalRandom.current();
CompletableFuture<?>[] workerJobs = IntStream.range(0, 20)
    .mapToObj(i -> {
      long time = TimeUnit.MILLISECONDS.toNanos(r.nextInt(4000));
      boolean fail = r.nextBoolean();
      return CompletableFuture.runAsync(() -> {
        LockSupport.parkNanos(time);
        if(fail) throw new RuntimeException();
      }, e);
    })
    .toArray(CompletableFuture<?>[]::new);
e.shutdown();

try {
  CompletableFuture.allOf(workerJobs).get(2, TimeUnit.SECONDS);
  System.out.println("All completed within 2 seconds or less without failures");
}
catch(InterruptedException ex) {
  throw new AssertionError(ex);
}
catch(ExecutionException ex) {
  System.out.println("All completed within 2 seconds or less, at least one failed");
}
catch(TimeoutException ex) {
  System.out.println("At least one did not complete within 2 seconds");
}
for(CompletableFuture<?> f: workerJobs) {
  System.out.println(f.isDone()? "completed"
    +(f.isCompletedExceptionally()? " exceptionally": ""): "not completed");
}
0 голосов
/ 25 марта 2020

Java не может подождать кучу завершаемых фьючерсов с тайм-аутом (о котором я знаю):

public class TryWaitForCF {

private void run() throws Exception {

    List<Worker> workers = new ArrayList<>();
    for( int i=0; i<10; i++){
        workers.add(new Worker("Worker="+i));
    }

    List<CompletableFuture<Worker>> waitList = new ArrayList<>();
    workers.forEach(worker -> waitList.add(worker.stop()));

    long endTime = System.currentTimeMillis() + 2000;
    for( CompletableFuture<Worker> cf : waitList ){
        long timeout = endTime - System.currentTimeMillis();
        if( timeout < 0 ){
            timeout = 0;
        }
        System.out.println("Get result waiting at most: " + timeout + " ms");
        try {
            Worker result = cf.get(timeout, TimeUnit.MILLISECONDS);
            System.out.println(result.name  + ": finished" );
        }
        catch( Exception e ){
            System.out.println("Failed to get result: " + e.getMessage());
        }
    }

}

public static void main(String[] args) throws Exception {
    new TryWaitForCF().run();
}

static class Worker {
    private String name;
    private long timeToComplete;

    public Worker(String name){
        this.name = name;
        this.timeToComplete = (long) (Math.random() * (3000 - 1000 + 1) + 1000);
        System.out.println(name  + ": timeToComplete=" + timeToComplete);
    }

    public CompletableFuture<Worker> stop() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(timeToComplete);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return this;
        });
    }
}

}

результаты:

Worker=0: timeToComplete=1018
Worker=1: timeToComplete=1866
Worker=2: timeToComplete=1894
Worker=3: timeToComplete=2041
Worker=4: timeToComplete=1124
Worker=5: timeToComplete=1613
Worker=6: timeToComplete=2445
Worker=7: timeToComplete=2188
Worker=8: timeToComplete=2129
Worker=9: timeToComplete=2174
Get result waiting at most: 2000 ms
Worker=0: finished
Get result waiting at most: 980 ms
Worker=1: finished
Get result waiting at most: 130 ms
Worker=2: finished
Get result waiting at most: 100 ms
Failed to get result: null
Get result waiting at most: 0 ms
Worker=4: finished
Get result waiting at most: 0 ms
Worker=5: finished
Get result waiting at most: 0 ms
Failed to get result: null
Get result waiting at most: 0 ms
Failed to get result: null
Get result waiting at most: 0 ms
Failed to get result: null
Get result waiting at most: 0 ms
Failed to get result: null

Возможно, вы захотите изменить свой дизайн.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...