Как дождаться завершения всех задач в ThreadPoolExecutor, не выключая Executor? - PullRequest
58 голосов
/ 14 октября 2010

Я не могу использовать shutdown() и awaitTermination(), поскольку возможно, что новые задачи будут добавлены в ThreadPoolExecutor, пока он ожидает.

Поэтому я ищу способ подождать, пока ThreadPoolExecutor опустошит свою очередь и завершит все свои задачи, не останавливая добавление новых задач до этого момента.

Если это имеет какое-то значение, это для Android.

Спасибо

Обновление : много недель спустя после повторного рассмотрения этого я обнаружил, что модифицированный CountDownLatch лучше работает для меня в этом случае. Я буду помечать ответ, потому что он больше относится к тому, что я спросил.

Ответы [ 6 ]

67 голосов
/ 14 октября 2010

Если вам интересно знать, когда завершается определенная задача или определенная партия задач, вы можете использовать ExecutorService.submit(Runnable).Вызов этого метода возвращает объект Future, который может быть помещен в Collection, который ваш основной поток затем будет повторять, вызывая Future.get() для каждого.Это заставит ваш основной поток остановить выполнение до тех пор, пока ExecutorService не обработает все задачи Runnable.

Collection<Future<?>> futures = new LinkedList<Future<?>>();
futures.add(executorService.submit(myRunnable));
for (Future<?> future:futures) {
    future.get();
}
7 голосов
/ 23 февраля 2012

Мой сценарий - это сканер, который извлекает некоторую информацию с веб-сайта и затем обрабатывает ее. ThreadPoolExecutor используется для ускорения процесса, потому что много страниц могут быть загружены за время. Таким образом, в существующей задаче будут создаваться новые задачи, поскольку сканер будет следовать гиперссылкам на каждой странице. Проблема та же: основной поток не знает, когда все задачи завершены, и он может начать обрабатывать результат. Я использую простой способ определить это. Это не очень элегантно, но работает в моем случае:

while (executor.getTaskCount()!=executor.getCompletedTaskCount()){
    System.err.println("count="+executor.getTaskCount()+","+executor.getCompletedTaskCount());
    Thread.sleep(5000);
}
executor.shutdown();
executor.awaitTermination(60, TimeUnit.SECONDS);
5 голосов
/ 14 октября 2010

Возможно, вы ищете CompletionService для управления пакетами задач, см. Также этот ответ .

3 голосов
/ 14 октября 2010

(Это попытка воспроизвести ранее удаленный ответ Тило с моими собственными изменениями.)

Я думаю, что вам, возможно, придется уточнить ваш вопрос, поскольку существует неявное бесконечное условие ... в какой-то момент вы должны решить закрыть своего исполнителя, и в этот момент он не примет больше никаких задач. Похоже, ваш вопрос подразумевает, что вы хотите подождать, пока вы не узнаете , что дальнейшие задачи не будут отправлены, что вы можете знать только в своем собственном коде приложения.

Следующий ответ позволит вам плавно перейти к новому TPE (по какой-либо причине), выполнить все представленные в настоящее время задачи и не отклонять новые задачи для нового TPE. Это может ответить на ваш вопрос. @ Тило тоже может.

Предполагая, что вы определили где-то видимый используемый TPE как таковой:

AtomicReference<ThreadPoolExecutor> publiclyAvailableTPE = ...;

Затем вы можете написать процедуру подкачки TPE как таковую. Это также может быть написано с использованием синхронизированного метода, но я думаю, что это проще:

void rotateTPE()
{
   ThreadPoolExecutor newTPE = createNewTPE();
   // atomic swap with publicly-visible TPE
   ThreadPoolExecutor oldTPE = publiclyAvailableTPE.getAndSet(newTPE);
   oldTPE.shutdown();

   // and if you want this method to block awaiting completion of old tasks in  
   // the previously visible TPE
   oldTPE.awaitTermination();
} 

В качестве альтернативы, если вы действительно не шутите, хотите уничтожить пул потоков, то вашей стороне-отправителю в какой-то момент потребуется справиться с отклоненными задачами, и вы можете использовать null для нового TPE:

void killTPE()
{
   ThreadPoolExecutor oldTPE = publiclyAvailableTPE.getAndSet(null);
   oldTPE.shutdown();

   // and if you want this method to block awaiting completion of old tasks in  
   // the previously visible TPE
   oldTPE.awaitTermination();
} 

Что может вызвать проблемы в восходящем направлении, вызывающий должен знать, что делать с null.

Вы также можете поменяться с фиктивным TPE, который просто отклоняет каждое новое выполнение, но это эквивалентно тому, что происходит, если вы вызываете shutdown() на TPE.

1 голос
/ 19 апреля 2016

Если вы не хотите использовать shutdown, выполните следующие подходы:

  1. Выполните итерацию по всем Future задачам с отправки на ExecutorService и проверьте статус с блокировкойвызовите get() для Future объекта, как предложено Tim Bender

  2. Используйте один из

    1. Использование invokeAll для ExecutorService
    2. Использование CountDownLatch
    3. Использование ForkJoinPool или newWorkStealingPool из Executors (начиная с Java 8)

invokeAll() на службе исполнителя также достигает той же цели, что и CountDownLatch

Связанный вопрос SE:

Как ждатьколичество нитей для завершения?

0 голосов
/ 29 апреля 2016

Вы можете вызвать waitTillDone () на Runner class:

Runner runner = Runner.runner(10);

runner.runIn(2, SECONDS, runnable);
runner.run(runnable); // each of this runnables could submit more tasks

runner.waitTillDone(); // blocks until all tasks are finished (or failed)

// and now reuse it

runner.runIn(500, MILLISECONDS, callable);

runner.waitTillDone();
runner.shutdown();

Чтобы использовать его, добавьте эту зависимость gradle / maven в ваш проект: 'com.github.matejtymes:javafixes:1.0'

Для более подробной информации смотрите здесь: https://github.com/MatejTymes/JavaFixes или здесь: http://matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html

...