ThreadPoolExecutor's getActiveCount () - PullRequest
       17

ThreadPoolExecutor's getActiveCount ()

5 голосов
/ 01 сентября 2011

У меня есть ThreadPoolExecutor, который, кажется, лжет мне, когда я вызываю getActiveCount ().Однако я не занимался многопоточным программированием, поэтому, возможно, я делаю что-то неправильно.

Вот мой TPE

@Override
public void afterPropertiesSet() throws Exception {

    BlockingQueue<Runnable> workQueue;
    int maxQueueLength = threadPoolConfiguration.getMaximumQueueLength();
    if (maxQueueLength == 0) {
        workQueue = new LinkedBlockingQueue<Runnable>();
    } else {
        workQueue = new LinkedBlockingQueue<Runnable>(maxQueueLength);
    }

    pool = new ThreadPoolExecutor(
                   threadPoolConfiguration.getCorePoolSize(),
                   threadPoolConfiguration.getMaximumPoolSize(),
                   threadPoolConfiguration.getKeepAliveTime(),
                   TimeUnit.valueOf(threadPoolConfiguration.getTimeUnit()),
                   workQueue,
                   // Default thread factory creates normal-priority,
                   // non-daemon threads.
                   Executors.defaultThreadFactory(),
                   // Run any rejected task directly in the calling thread.
                   // In this way no records will be lost due to rejection
                   // however, no records will be added to the workQueue
                   // while the calling thread is processing a Task, so set
                   // your queue-size appropriately.
                   //
                   // This also means MaxThreadCount+1 tasks may run
                   // concurrently. If you REALLY want a max of MaxThreadCount
                   // threads don't use this.
                   new ThreadPoolExecutor.CallerRunsPolicy());
}

В этом классе у меня также есть DAO, который я передаюв мой Runnable (FooWorker), например, так:

@Override
public void addTask(FooRecord record) {
    if (pool == null) {
        throw new FooException(ERROR_THREAD_POOL_CONFIGURATION_NOT_SET);
    }
    pool.execute(new FooWorker(context, calculator, dao, record));
}

FooWorker запускает record (единственный non-singleton) через конечный автомат через calculator, затем отправляет переходы в базу данныхчерез dao, вот так:

public void run() {
    calculator.calculate(record);
    dao.save(record);
}

Как только мой главный поток завершил создание новых задач, я пытаюсь дождаться успешного завершения всех потоков:

while (pool.getActiveCount() > 0) {
    recordHandler.awaitTermination(terminationTimeout, 
                                   terminationTimeoutUnit);
}

What I 'я вижу из выходных журналов (которые предположительно ненадежны из-за потоков), что getActiveCount () возвращает ноль слишком рано, и цикл while () завершается, в то время как мои последние потоки все еще печатают вывод из calculator.

Примечание. Я также пытался дозвониться до pool.shutdown(), затем использовать awaitTermination, но затем при следующем запуске моей работы пул все еще будет закрыт.

Моя единственная догадка заключается в том, чтовнутри потока, когда я отправляю данные в dao (так как это синглтон, созданный Spring в основном потоке ...), java считает этот поток неактивным, поскольку ( я предполагаю ) он обрабатывает / ожидает в основном потоке.

Интуитивноосновываясь только на том, что я вижу, это мое предположение.Но ... Это действительно то, что происходит?Есть ли способ «сделать все правильно», не помещая вручную увеличенную переменную в верхнюю часть run() и уменьшенную в конце, чтобы отслеживать количество потоков?

Если ответ «не проходи через дао», то не придется ли мне «новый» DAO для каждого потока?Мой процесс уже (красивый, эффективный) зверь, но это действительно отстой.

Ответы [ 2 ]

8 голосов
/ 01 сентября 2011

Поскольку JavaDoc getActiveCount заявляет , это приблизительное значение: вы не должны основывать на этом какие-либо важные решения бизнес-логики.

Если вы хотите дождаться завершения всех запланированных задач, вам просто нужно использовать

pool.shutdown();
pool.awaitTermination(terminationTimeout, terminationTimeoutUnit);

Если вам нужно дождаться завершения задачи , определенной , вам следует использовать submit() вместо execute(), а затем проверить Future объект для завершения (либо используя isDone(), если вы хотите сделать это неблокирующим, либо просто вызывая get(), который блокирует, пока задача не будет выполнена).

6 голосов
/ 01 сентября 2011

Документация предполагает, что метод getActiveCount() для ThreadPoolExecutor не является точным числом:

getActiveCount

public int getActiveCount ()

Возвращает приблизительное количество потоков, которые активно выполняют задачи.

Возвращает: количество потоков

Лично, когда я выполняю многопоточную работу, такую ​​как эта,Я использую переменную, которую я увеличиваю при добавлении задач, и уменьшаю при получении их результатов.

...