ExecutorService, как ждать завершения всех задач - PullRequest
176 голосов
/ 17 июля 2010

Какой самый простой способ дождаться завершения всех заданий ExecutorService? Моя задача в основном вычислительная, поэтому я просто хочу запустить большое количество заданий - по одному на каждое ядро. Прямо сейчас моя установка выглядит так:

ExecutorService es = Executors.newFixedThreadPool(2);
for (DataTable singleTable : uniquePhrases) {   
    es.execute(new ComputeDTask(singleTable));
}
try{
    es.wait();
} 
catch (InterruptedException e){
    e.printStackTrace();
}

ComputeDTask реализует работоспособность. Это кажется для правильного выполнения задач, но код падает на wait() с IllegalMonitorStateException. Это странно, потому что я поиграл с некоторыми игрушечными примерами, и это сработало.

uniquePhrases содержит несколько десятков тысяч элементов. Должен ли я использовать другой метод? Я ищу что-то максимально простое

Ответы [ 14 ]

199 голосов
/ 17 июля 2010

Самый простой подход - использовать ExecutorService.invokeAll(), который делает то, что вы хотите, в одной строке. На вашем языке вам нужно изменить или обернуть ComputeDTask, чтобы реализовать Callable<>, что может дать вам немного больше гибкости. Возможно, в вашем приложении есть значимая реализация Callable.call(), но вот способ обернуть его, если не использовать Executors.callable().

ExecutorService es = Executors.newFixedThreadPool(2);
List<Callable<Object>> todo = new ArrayList<Callable<Object>>(singleTable.size());

for (DataTable singleTable: uniquePhrases) { 
    todo.add(Executors.callable(new ComputeDTask(singleTable))); 
}

List<Future<Object>> answers = es.invokeAll(todo);

Как уже отмечали другие, вы можете использовать версию invokeAll() для тайм-аута, если это необходимо. В этом примере answers будет содержать набор Future s, которые будут возвращать нули (см. Определение Executors.callable(). Вероятно, вы хотите сделать небольшой рефакторинг, чтобы вы могли получить полезный ответ, или ссылка на базовый ComputeDTask, но я не могу сказать из вашего примера.

Если неясно, обратите внимание, что invokeAll() не вернется, пока не будут выполнены все задачи. (т. е. все Future в вашей коллекции answers сообщат .isDone(), если их спросят.) Это позволяет избежать всего ручного выключения, awaitTermination и т. д. и позволяет вам аккуратно использовать этот ExecutorService для нескольких циклов при желании.

Есть несколько связанных вопросов по SO:

Ни один из этих вопросов не является строго актуальным для вашего вопроса, но они дают немного информации о том, как люди думают, что Executor / ExecutorService следует использовать.

55 голосов
/ 17 июля 2010

Если вы хотите дождаться завершения всех задач, используйте метод shutdown вместо wait.Затем следуйте за этим с awaitTermination.

Кроме того, вы можете использовать Runtime.availableProcessors, чтобы получить количество аппаратных потоков, чтобы вы могли правильно инициализировать пул потоков.

48 голосов
/ 17 июля 2010

Если ожидание завершения всех задач в ExecutorService - это не совсем ваша цель, а скорее ожидание, пока определенный пакет задач не будет завершен, вы можете использовать CompletionService & mdash; в частности, ExecutorCompletionService.

Идея состоит в том, чтобы создать ExecutorCompletionService, обертывающий вашу Executor, отправку некоторого известного числа задач через CompletionService, затем нарисовать то же число результатов из очереди завершения с использованием либо take() (который блокирует), либо poll() (который не делает). Как только вы нарисовали все ожидаемые результаты, соответствующие заданным вами задачам, вы знаете, что все они выполнены.

Позвольте мне заявить об этом еще раз, потому что это не очевидно из интерфейса: вы должны знать, сколько вещей вы положили в CompletionService, чтобы узнать, сколько вещей попытаться вытянуть. Это особенно важно для метода take(): вызывайте его один раз слишком много, и он заблокирует ваш вызывающий поток, пока какой-либо другой поток не отправит другое задание тому же CompletionService.

Есть несколько примеров, показывающих, как использовать CompletionService в книге Параллелизм Java на практике .

11 голосов
/ 17 июля 2010

Если вы хотите дождаться завершения работы службы исполнителя, вызовите shutdown(), а затем awaitTermination (units, unitType) , например awaitTermination(1, MINUTE).ExecutorService не блокирует на своем собственном мониторе, поэтому вы не можете использовать wait и т. Д.

7 голосов
/ 17 июля 2010

Вы можете подождать, пока задания завершатся через определенный интервал:

int maxSecondsPerComputeDTask = 20;
try {
    while (!es.awaitTermination(uniquePhrases.size() * maxSecondsPerComputeDTask, TimeUnit.SECONDS)) {
        // consider giving up with a 'break' statement under certain conditions
    }
} catch (InterruptedException e) {
    throw new RuntimeException(e);    
}

Или вы можете использовать ExecutorService . submit ( Runnable ) и собирать Future объекты, которые он возвращает, и вызывать get () для каждого по очереди, чтобы дождаться их завершения.

ExecutorService es = Executors.newFixedThreadPool(2);
Collection<Future<?>> futures = new LinkedList<<Future<?>>();
for (DataTable singleTable : uniquePhrases) {
    futures.add(es.submit(new ComputeDTask(singleTable)));
}
for (Future<?> future : futures) {
   try {
       future.get();
   } catch (InterruptedException e) {
       throw new RuntimeException(e);
   } catch (ExecutionException e) {
       throw new RuntimeException(e);
   }
}

InterruptedException крайне важно правильно обращаться.Это то, что позволяет вам или пользователям вашей библиотеки безопасно завершить долгий процесс.

6 голосов
/ 22 января 2015

Просто используйте

latch = new CountDownLatch(noThreads)

В каждой теме

latch.countDown();

и как барьер

latch.await();
5 голосов
/ 18 апреля 2016

Основная причина для IllegalMonitorStateException :

Брошенный, чтобы указать, что поток попытался ждать на мониторе объекта или уведомить другие потоки, ожидающие на мониторе объекта, не имея указанного монитора.

Из вашего кода вы только что вызвали wait () на ExecutorService, не имея блокировки.

Ниже приведен код исправления IllegalMonitorStateException

try 
{
    synchronized(es){
        es.wait(); // Add some condition before you call wait()
    }
} 

Следуйте одному из следующих подходов, чтобы дождаться завершения всех задач, которые были отправлены на ExecutorService.

  1. Выполните итерацию по всем Future задачам с submit по ExecutorService и проверьте состояние с помощью блокировки вызова get() по Future объекту

  2. Использование invokeAll on ExecutorService

  3. Использование CountDownLatch

  4. Использование ForkJoinPool или newWorkStealingPool из Executors (начиная с Java 8)

  5. Выключите пул, как рекомендовано в документации оракула страница

    void shutdownAndAwaitTermination(ExecutorService pool) {
       pool.shutdown(); // Disable new tasks from being submitted
       try {
       // Wait a while for existing tasks to terminate
       if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
           pool.shutdownNow(); // Cancel currently executing tasks
           // Wait a while for tasks to respond to being cancelled
           if (!pool.awaitTermination(60, TimeUnit.SECONDS))
           System.err.println("Pool did not terminate");
       }
    } catch (InterruptedException ie) {
         // (Re-)Cancel if current thread also interrupted
         pool.shutdownNow();
         // Preserve interrupt status
         Thread.currentThread().interrupt();
    }
    

    Если вы хотите изящно ждать завершения всех задач, когда вы используете опцию 5 вместо опций с 1 по 4, измените

    if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
    

    до

    a while(condition), который проверяет каждую 1 минуту.

4 голосов
/ 10 апреля 2018

Вы можете использовать метод ExecutorService.invokeAll, он выполнит все задачи и будет ждать, пока все потоки не завершат свою задачу.

Здесь завершено Javadoc

Вы также можете использовать перегруженную версию этого метода для указания времени ожидания.

Вот пример кода с ExecutorService.invokeAll

public class Test {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService service = Executors.newFixedThreadPool(3);
        List<Callable<String>> taskList = new ArrayList<>();
        taskList.add(new Task1());
        taskList.add(new Task2());
        List<Future<String>> results = service.invokeAll(taskList);
        for (Future<String> f : results) {
            System.out.println(f.get());
        }
    }

}

class Task1 implements Callable<String> {
    @Override
    public String call() throws Exception {
        try {
            Thread.sleep(2000);
            return "Task 1 done";
        } catch (Exception e) {
            e.printStackTrace();
            return " error in task1";
        }
    }
}

class Task2 implements Callable<String> {
    @Override
    public String call() throws Exception {
        try {
            Thread.sleep(3000);
            return "Task 2 done";
        } catch (Exception e) {
            e.printStackTrace();
            return " error in task2";
        }
    }
}
3 голосов
/ 14 октября 2010

У меня также есть ситуация, когда у меня есть набор документов для сканирования. Я начну с исходного «начального» документа, который должен быть обработан, этот документ содержит ссылки на другие документы, которые также должны быть обработаны, и т. Д.

В моей основной программе я просто хочу написать что-то вроде следующего, где Crawler контролирует кучу потоков.

Crawler c = new Crawler();
c.schedule(seedDocument); 
c.waitUntilCompletion()

Та же самая ситуация произошла бы, если бы я хотел перемещаться по дереву; я бы заглянул в корневой узел, процессор для каждого узла добавлял бы детей в очередь по мере необходимости, и группа потоков обрабатывала бы все узлы в дереве, пока их больше не было.

Я не смог найти ничего в JVM, что мне показалось немного удивительным. Поэтому я написал класс AutoStopThreadPool, который можно использовать напрямую или подклассом для добавления методов, подходящих для домена, например, schedule(Document). Надеюсь, это поможет!

AutoStopThreadPool Javadoc | Скачать

2 голосов
/ 06 июня 2014

Добавьте все темы в коллекцию и отправьте, используя invokeAll.Если вы можете использовать invokeAll метод ExecutorService, JVM не перейдет к следующей строке, пока все потоки не будут завершены.

Вот хороший пример: invokeAll via ExecutorService

...