Как я могу убедиться, что пул потоков завершен? - PullRequest
2 голосов
/ 08 сентября 2011

Настройка:

Я нахожусь в процессе изменения способа работы программы под капотом. В текущей версии работает так:

public void threadWork( List<MyCallable> workQueue )
{
    ExecutorService pool = Executors.newFixedThreadPool(someConst);
    List<Future<myOutput>> returnValues = new ArrayList<Future<myOutput>>();
    List<myOutput> finishedStuff = new ArrayList<myOutput>();

    for( int i = 0; i < workQueue.size(); i++ )
    {
        returnValues.add( pool.submit( workQueue.get(i) ) );
    }

    while( !returnValues.isEmpty() )
    {
        try
        {
            // Future.get() waits for a value from the callable
            finishedStuff.add( returnValues.remove(0).get(0) );
        }
        catch(Throwable iknowthisisbaditisjustanexample){}
    }

    doLotsOfThings(finsihedStuff);
}

Но новая система собирается использовать закрытый внутренний Runnable для вызова синхронизированного метода, который записывает данные в глобальную переменную. Моя основная настройка:

public void threadReports( List<String> workQueue )
{
    ExecutorService pool = Executors.newFixedThreadPool(someConst); 
    List<MyRunnable> runnables = new ArrayList<MyRunnable>()

    for ( int i = 0; i < modules.size(); i++ )
    {
        runnables.add( new MyRunnable( workQueue.get(i) );
        pool.submit(threads.get(i));
    }

    while( !runnables.isEmpty() )
    {
        try 
        {
            runnables.remove(0).wait(); // I realized that this wouldn't work
        } 
        catch(Throwable iknowthisisbaditisjustanexample){}
    }
    doLotsOfThings(finsihedStuff); // finishedStuff is the global the Runnables write to
}

Если вы прочитаете мой комментарий во второй части кода, вы заметите, что я не знаю, как использовать wait (). Я думал, что это в основном похоже на thread.join (), но после прочтения документации я вижу, что это не так.

Я согласен с изменением некоторой структуры по мере необходимости, но базовая система работы, использование runnables, запись runnables в глобальную переменную и использование пула потоков являются требованиями.


Вопрос

Как я могу дождаться полного завершения пула потоков перед тем, как делать doLotsOfThings ()?

Ответы [ 5 ]

2 голосов
/ 08 сентября 2011

Вы должны вызвать ExecutorService.shutdown () , а затем ExecutorService.awaitTermination .

...
pool.shutdown();
if (pool.awaitTermination(<long>,<TimeUnit>)) {
    // finished before timeout
    doLotsOfThings(finsihedStuff);
} else {
    // Timeout occured.
}
2 голосов
/ 08 сентября 2011

Попробуйте это:

pool.shutdown();
pool.awaitTermination(WHATEVER_TIMEOUT, TimeUnit.SECONDS);
1 голос
/ 10 сентября 2011

shutdown является методом жизненного цикла ExecutorService и делает исполнителя непригодным для использования после вызова. Создание и уничтожение ThreadPools в методе так же плохо, как и создание / уничтожение потоков: это в значительной степени сводит на нет цель использования пула потоков, которая заключается в уменьшении накладных расходов на создание потоков, обеспечивая прозрачное повторное использование.

Если возможно, вы должны поддерживать жизненный цикл ExecutorService в синхронизации с вашим приложением. - создавать при первой необходимости, выключать при закрытии приложения.

Чтобы достичь цели по выполнению множества задач и их ожиданию, ExecutorService предоставляет метод invokeAll(Collection<? extends Callable<T>> tasks) (и версию с таймаутом, если вы хотите подождать определенный период времени.)

Используя этот метод и некоторые из упомянутых выше пунктов, рассматриваемый код становится:

public void threadReports( List<String> workQueue ) {

    List<MyRunnable> runnables = new ArrayList<MyRunnable>(workQueue.size());
    for (String work:workQueue) {
        runnables.add(new MyRunnable(work));
    }
    // Executor is obtained from some applicationContext that takes care of lifecycle mgnt
    // invokeAll(...) will block and return when all callables are executed 
    List<Future<MyRunnable>> results = applicationContext.getExecutor().invokeAll(runnables); 

    // I wouldn't use a global variable unless you have a VERY GOOD reason for that.  
    // b/c all the threads of the pool doing work will be contending for the lock on that variable.
    // doLotsOfThings(finishedStuff);  

    // Note that the List of Futures holds the individual results of each execution. 
    // That said, the preferred way to harvest your results would be:
    doLotsOfThings(results);
}

PS: Не уверен, почему threadReports равен void. Он может / должен вернуть расчет doLotsOfThings для достижения более функционального дизайна.

1 голос
/ 08 сентября 2011

Рассматривали ли вы возможность использования Fork / Join Framework, который теперь включен в Java 7. Если вы не хотите использовать Java 7, вы можете получить jar для него здесь ,

1 голос
/ 08 сентября 2011
public void threadReports( List<String> workQueue )
{
    ExecutorService pool = Executors.newFixedThreadPool(someConst); 
    Set<Future<?>> futures = new HashSet<Future<?>>();

    for ( int i = 0; i < modules.size(); i++ )
    {
        futures.add(pool.submit(threads.get(i)));
    }

    while( !futures.isEmpty() )
    {
        Set<Future<?>> removed = new Set<Future<?>>();
        for(Future<?> f : futures) {
            f.get(100, TimeUnit.MILLISECONDS);
            if(f.isDone()) removed.add(f);
        }
        for(Future<?> f : removed) futures.remove(f);
    }
    doLotsOfThings(finsihedStuff); // finishedStuff is the global the Runnables write to
}
...