Как ждать завершения всех потоков, используя ExecutorService? - PullRequest
343 голосов
/ 09 августа 2009

Мне нужно выполнить некоторое количество задач 4 одновременно, что-то вроде этого:

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
    taskExecutor.execute(new MyTask());
}
//...wait for completion somehow

Как я могу получить уведомление, когда все они будут завершены? Пока я не могу думать о чем-то лучше, чем установить какой-либо глобальный счетчик задач и уменьшить его в конце каждой задачи, а затем отслеживать в бесконечном цикле этот счетчик, чтобы он стал 0; или получить список фьючерсов и в бесконечном цикле монитора isDone для всех из них. Каковы лучшие решения без бесконечных циклов?

Спасибо.

Ответы [ 24 ]

412 голосов
/ 09 августа 2009

В основном на ExecutorService вы звоните shutdown(), а затем awaitTermination():

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
  taskExecutor.execute(new MyTask());
}
taskExecutor.shutdown();
try {
  taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
  ...
}
164 голосов
/ 09 августа 2009

Использовать CountDownLatch :

CountDownLatch latch = new CountDownLatch(totalNumberOfTasks);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
  taskExecutor.execute(new MyTask());
}

try {
  latch.await();
} catch (InterruptedException E) {
   // handle
}

и в пределах вашей задачи (приложите в try / finally)

latch.countDown();
80 голосов
/ 09 августа 2009

ExecutorService.invokeAll() делает это за вас.

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
List<Callable<?>> tasks; // your tasks
// invokeAll() returns when all tasks are complete
List<Future<?>> futures = taskExecutor.invokeAll(tasks);
43 голосов
/ 02 ноября 2012

Вы также можете использовать списки фьючерсов:

List<Future> futures = new ArrayList<Future>();
// now add to it:
futures.add(executorInstance.submit(new Callable<Void>() {
  public Void call() throws IOException {
     // do something
    return null;
  }
}));

затем, когда вы хотите присоединиться ко всем из них, это, по сути, эквивалентно присоединению к каждому (с дополнительным преимуществом в том, что оно возбуждает исключения из дочерних потоков в основную):

for(Future f: this.futures) { f.get(); }

По сути, уловка заключается в том, чтобы вызывать .get () для каждого Future по одному, вместо бесконечного зацикливания, вызывая isDone () on (все или каждый). Таким образом, вы гарантированно «продвинетесь» через и через этот блок, как только закончится последний поток. Предостережение заключается в том, что, поскольку вызов .get () повторно вызывает исключения, если один из потоков умирает, вы могли бы подняться из этого, возможно, до того, как другие потоки закончат работу до завершения [чтобы избежать этого, вы можете добавить catch ExecutionException вокруг получить вызов]. Другое предостережение заключается в том, что он сохраняет ссылку на все потоки, поэтому, если у них есть локальные переменные потока, они не будут собираться до тех пор, пока вы не пройдете этот блок (хотя, возможно, вы сможете обойти это, если это станет проблемой, удалив Будущее вне ArrayList). Если вы хотите знать, какое будущее «заканчивается первым», вы можете использовать что-то вроде https://stackoverflow.com/a/31885029/32453

28 голосов
/ 29 апреля 2016

В Java8 вы можете сделать это с CompletableFuture :

ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
                               .map(task -> CompletableFuture.runAsync(task, es))
                               .toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();    
es.shutdown();
24 голосов
/ 19 января 2012

Только мои два цента. Чтобы преодолеть требование CountDownLatch заранее знать количество задач, вы можете сделать это по старинке, используя простой Semaphore.

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
int numberOfTasks=0;
Semaphore s=new Semaphore(0);
while(...) {
    taskExecutor.execute(new MyTask());
    numberOfTasks++;
}

try {
    s.aquire(numberOfTasks);
...

В вашей задаче просто позвоните s.release(), как если бы вы latch.countDown();

12 голосов
/ 07 ноября 2014

Немного опоздал к игре, но ради завершения ...

Вместо того, чтобы «ждать» выполнения всех заданий, вы можете думать с точки зрения голливудского принципа «не звони мне, я позвоню тебе» - когда я закончу. Я думаю, что полученный код более элегантный ...

Гуава предлагает несколько интересных инструментов для достижения этой цели.

Пример ::

Обернуть ExecutorService в ListeningExecutorService ::

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));

Представить коллекцию вызываемых для исполнения ::

for (Callable<Integer> callable : callables) {
  ListenableFuture<Integer> lf = service.submit(callable);
  // listenableFutures is a collection
  listenableFutures.add(lf)
});

Теперь важная часть:

ListenableFuture<List<Integer>> lf = Futures.successfulAsList(listenableFutures);

Прикрепите функцию обратного вызова к ListenableFuture, которую вы можете использовать, чтобы получать уведомления о завершении всего будущего ::

        Futures.addCallback(lf, new FutureCallback<List<Integer>>() {
        @Override
        public void onSuccess(List<Integer> result) {
            log.info("@@ finished processing {} elements", Iterables.size(result));
            // do something with all the results
        }

        @Override
        public void onFailure(Throwable t) {
            log.info("@@ failed because of :: {}", t);
        }
    });

Это также дает то преимущество, что вы можете собрать все результаты в одном месте после завершения обработки ...

Подробнее здесь

12 голосов
/ 09 августа 2009

Класс CyclicBarrier в Java 5 и более поздних версиях предназначен для такого рода вещей.

7 голосов
/ 19 апреля 2016

Следуйте одному из следующих подходов.

  1. Выполните итерацию по всем будущим задачам, возвращенным из submit в ExecutorService, и проверьте состояние с помощью блокирующего вызова get() в Future объекте, как предложено Kiran
  2. Использование invokeAll() на ExecutorService
  3. CountDownLatch
  4. ForkJoinPool или Executors.html # newWorkStealingPool
  5. Использовать shutdown, awaitTermination, shutdownNow API ThreadPoolExecutor в правильной последовательности

Связанные вопросы SE:

Как CountDownLatch используется в многопоточности Java?

Как правильно отключить Java ExecutorService

5 голосов
/ 02 сентября 2018

здесь два варианта, просто запутайте, какой из них лучше выбрать.

Вариант 1:

ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
                               .map(task -> CompletableFuture.runAsync(task, es))
                               .toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();    
es.shutdown();

Вариант 2:

ExecutorService es = Executors.newFixedThreadPool(4);
List< Future<?>> futures = new ArrayList<>();
for(Runnable task : taskList) {
    futures.add(es.submit(task));
}

for(Future<?> future : futures) {
    try {
        future.get();
    }catch(Exception e){
        // do logging and nothing else
    }
}
es.shutdown();

Здесь выкладываю future.get (); в try catch это хорошая идея, верно?

...