Дождитесь завершения всех задач (неизвестное число) - ThreadPoolExecutor - PullRequest
4 голосов
/ 30 марта 2012

Я столкнулся с этой проблемой параллелизма, которую я почесал в голове несколько дней.

По сути, я хочу, чтобы мой ThreadPoolExecutor ожидал завершения всех задач (количество неизвестных задач) перед тем, как завершить работу.

public class AutoShutdownThreadPoolExecutor extends ThreadPoolExecutor{
    private static final Logger logger = Logger.getLogger(AutoShutdownThreadPoolExecutor.class);
    private int executing = 0;
    private ReentrantLock lock = new ReentrantLock();
    private final Condition newTaskCondition = lock.newCondition(); 
    private final int WAIT_FOR_NEW_TASK = 120000;

    public AutoShutdownThreadPoolExecutor(int coorPoolSize, int maxPoolSize, long keepAliveTime,
        TimeUnit seconds, BlockingQueue<Runnable> queue) {
        super(coorPoolSize, maxPoolSize, keepAliveTime, seconds, queue);
    }


    @Override
    public void execute(Runnable command) {
        lock.lock();
        executing++;
        lock.unlock();
        super.execute(command);
    }


    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        try{
            lock.lock();
            int count = executing--;
            if(count == 0) {
                newTaskCondition.await(WAIT_FOR_NEW_TASK, TimeUnit.MILLISECONDS); 
                if(count == 0){
                    this.shutdown();
                    logger.info("Shutting down Executor...");
                }
            }
        }
        catch (InterruptedException e) {
            logger.error("Sleeping task interrupted", e);
        }
        finally{
            lock.unlock();
        }
    }
}

Предполагается, что задача проверяет счетчик задач (выполняет), и, если он равен 0, она на некоторое время блокируется, а затем снимает блокировку, чтобы другие задачи могли иметь возможностьвыполнить и не завершать работу исполнителя слишком рано.

Однако этого не происходит.Все 4 потока в исполнителе переходят в состояние ожидания:

"pool-1-thread-4" prio=6 tid=0x034a1000 nid=0x2d0 waiting on condition [0x039cf000]
"pool-1-thread-3" prio=6 tid=0x034d0400 nid=0x1328 waiting on condition [0x0397f000]
"pool-1-thread-2" prio=6 tid=0x03493400 nid=0x15ec waiting on condition [0x0392f000]
"pool-1-thread-1" prio=6 tid=0x034c3800 nid=0x1fe4 waiting on condition [0x038df000]

Если я добавлю инструкцию log (предполагается, что поток будет замедлен) в классе Runnable, проблема, похоже, исчезнет.

public void run() {
    //  logger.info("Starting task" + taskName);
        try {
            //doTask();
        }
        catch (Exception e){
            logger.error("task " + taskName + " failed", e);
        }
}

Вопрос похож на этот пост Java ExecutorService: awaitTermination всех рекурсивно созданных задач

Я принял первоначальное решение для постера и попытался обратиться к состоянию гонки в afterExecute () но это не работает.

Пожалуйста, помогите пролить свет на это.Спасибо.

Ответы [ 2 ]

2 голосов
/ 30 марта 2012

Ваши задачи ждут этого newTaskCondition, но ничто никогда не сигнализирует об этом условии. Таким образом, все ваши потоки накапливаются, все ожидают на newTaskCondition, пока не истечет время ожидания. Кроме того, блокировка в afterExecute задержит выполнение задачи, так что это, вероятно, не то, что вы хотите сделать.

Если вы хотите немного подождать, чтобы узнать, сколько еще работы придет, эта функциональность уже существует. Просто позвоните setKeepAliveTime, чтобы указать, как долго ждать, и установите allowCoreThreadTimeOut, чтобы убедиться, что все потоки (включая основные потоки) могут быть завершены.

0 голосов
/ 03 апреля 2012

Я нашел очень простое решение моей проблемы.

Хотя количество задач заранее неизвестно, и цель состоит в том, чтобы заставить главный поток ждать завершения всех задач в threadPool, время, в которое все задачи были отправлены, известно (после поиска и отправки - метод задачи сканировал все файлы).

Я могу просто вызвать ThreadPoolExecutor.shutdown () и awaitTermination (Long.MAX_VALUE), так что основной поток будет бесконечно ждать, пока ThreadPool завершит свои задачи.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...