Почему ThreadPoolExecutor уменьшает количество потоков ниже corePoolSize после keepAliveTime? - PullRequest
9 голосов
/ 28 марта 2012

Я возился с различными стратегиями для объединения потоков, используя ThreadPoolExecutor с JDK6.У меня работает очередь Priority, но я не был уверен, понравилось ли мне, что размер пула не изменился после keepAliveTime (что вы получаете с неограниченной очередью).Итак, я смотрю на ThreadPoolExecutor, использующего LinkedBlockingQueue и политику CallerRuns.

Проблема, с которой я сейчас сталкиваюсь, заключается в том, что пул наращивается, так как документы объясняют, что он должен, но послезадачи завершены, и keepAliveTime вступает в игру. getPoolSize показывает, что пул уменьшается до нуля.Приведенный ниже пример кода должен помочь вам увидеть основу для моего вопроса:

public class ThreadPoolingDemo {
    private final static Logger LOGGER =
         Logger.getLogger(ThreadPoolingDemo.class.getName());

    public static void main(String[] args) throws Exception {
        LOGGER.info("MAIN THREAD:starting");
        runCallerTestPlain();   
    }

    private static void runCallerTestPlain() throws InterruptedException {
        //10 core threads, 
        //50 max pool size, 
        //100 tasks in queue, 
        //at max pool and full queue - caller runs task
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(10, 50,
            5L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(100),
            new ThreadPoolExecutor.CallerRunsPolicy());

        //dump 5000 tasks on the queue
        for (int i = 0; i < 5000; i++) {
            tpe.submit(new Runnable() {
                @Override
                public void run() {
                    //just to eat some time and give a little feedback
                    for (int j = 0; j < 20; j++) {
                        LOGGER.info("First-batch Task, looping:" + j + "["
                               + Thread.currentThread().getId() + "]");
                    }
                }
            }, null);
        }
        LOGGER.info("MAIN THREAD:!!Done queueing!!");

        //check tpe statistics forever
        while (true) {
            LOGGER.info("Active count: " + tpe.getActiveCount() + " Pool size: "
                 + tpe.getPoolSize() + " Largest Pool: " + tpe.getLargestPoolSize());
            Thread.sleep(1000);
        }
    }
}

Я обнаружил старую ошибку, которая, по-видимому, является этой проблемой, но она была закрыта: http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6458662. Может ли она все еще присутствоватьв 1.6 или я что-то упустил?

Похоже, что я резиновая утка это (http://www.codinghorror.com/blog/2012/03/rubber-duck-problem-solving.html). Ошибка, которую я связал выше, связана с этим: http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6576792,, где проблема кажетсядолжен быть решен в 1.7 (я загрузил 1.7 и подтвердил - исправил ...). Я думаю, моя главная проблема заключалась в том, что эта фундаментальная ошибка оставалась почти десятилетие. Я потратил слишком много времени на написание этого, чтобы не публиковать его сейчас,надеюсь, это кому-нибудь поможет.

1 Ответ

6 голосов
/ 28 марта 2012

... после того, как задачи завершены и keepAliveTime вступает в игру, getPoolSize показывает, что пул уменьшается до нуля.

Так что это выглядит как условие гонки в ThreadPoolExecutor,Я думаю, что это работает в соответствии с дизайном, хотя и не ожидал.В методе getTask(), по которому рабочие потоки перебирают циклы для получения задач из очереди блокировки, вы видите этот код:

if (state == SHUTDOWN)  // Help drain queue
    r = workQueue.poll();
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
    r = workQueue.take();
if (r != null)
    return r;
if (workerCanExit()) {
    if (runState >= SHUTDOWN) // Wake up others
        interruptIdleWorkers();
    return null;
}

Если poolSize превышает значение corePoolSize, тогда, если время опросапосле keepAliveTime код падает до workerCanExit(), поскольку r равно null.Все потоки могут возвращать true из этого метода, поскольку он просто проверяет состояние poolSize:

    mainLock.lock();
    boolean canExit;
    try {
        canExit = runState >= STOP ||
            workQueue.isEmpty() ||
            (allowCoreThreadTimeOut &&
             poolSize > Math.max(1, corePoolSize)); << test poolSize here
    } finally {
        mainLock.unlock();                         << race to workerDone() begins
    }

Как только это вернет true, рабочий поток завершится и затем poolSize уменьшается.Если все рабочие потоки проходят этот тест одновременно, то все они завершаются из-за гонки между тестированием poolSize и остановкой рабочего, когда происходит --poolSize.

Что меня удивляетнасколько последовательны эти условия гонки.Если вы добавите некоторую рандомизацию к sleep() внутри run() ниже, то вы можете заставить некоторые основные потоки не выходить из игры, но я бы подумал, что из-за условия гонки было бы труднее попасть.


Вы можете увидеть это поведение в следующем тесте:

@Test
public void test() throws Exception {
    int before = Thread.activeCount();
    int core = 10;
    int max = 50;
    int queueSize = 100;
    ThreadPoolExecutor tpe =
            new ThreadPoolExecutor(core, max, 1L, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(queueSize),
                    new ThreadPoolExecutor.CallerRunsPolicy());
    tpe.allowCoreThreadTimeOut(false);
    assertEquals(0, tpe.getActiveCount());
    // if we start 1 more than can go into core or queue, poolSize goes to 0
    int startN = core + queueSize + 1;
    // if we only start jobs the core can take care of, then it won't go to 0
    // int startN = core + queueSize;
    for (int i = 0; i < startN; i++) {
        tpe.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
    while (true) {
        System.out.println("active = " + tpe.getActiveCount() + ", poolSize = " + tpe.getPoolSize()
                + ", largest = " + tpe.getLargestPoolSize() + ", threads = " + (Thread.activeCount() - before));
        Thread.sleep(1000);
    }
}

Если вы измените строку sleep внутри метода run() на что-то вроде этого:

private final Random random = new Random();
...
    Thread.sleep(100 + random.nextInt(100));

Thisзатруднит попадание в состояние гонки, поэтому некоторые основные нити все еще будут вокруг.

...