ThreadPoolExecutor с corePoolSize 0 не должен выполнять задачи, пока очередь задач не заполнится - PullRequest
0 голосов
/ 10 сентября 2018

Я проходил Java-параллелизм на практике и застрял в теме 8.3.1 Создание потока и разрыв . Следующая сноска предупреждает о сохранении corePoolSize на нуле.

Разработчики иногда испытывают желание установить нулевой размер ядра, чтобы рабочие потоки в конечном итоге будет снесен и, следовательно, не будет препятствовать выходу JVM, но это может вызвать некоторые странное поведение в пулах потоков, которые не используют SynchronousQueue для своей рабочей очереди (как это делает newCachedThreadPool). Если пул уже имеет размер ядра, ThreadPoolExecutor создает новый поток, только если рабочая очередь заполнена. Таким образом, задачи передаются в пул потоков с рабочей очередью которая имеет какую-либо емкость и нулевой размер ядра не будет выполняться, пока очередь не заполнится , что обычно не то, что нужно.

Поэтому, чтобы проверить это, я написал эту программу, которая не работает, как указано выше.

    final int corePoolSize = 0;
    ThreadPoolExecutor tp = new ThreadPoolExecutor(corePoolSize, 1, 5, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>());

    // If the pool is already at the core size
    if (tp.getPoolSize() == corePoolSize) {
        ExecutorService ex = tp;

        // So tasks submitted to a thread pool with a work queue that has any capacity
        // and a core size of zero will not execute until the queue fills up.
        // So, this should not execute until queue fills up.
        ex.execute(() -> System.out.println("Hello"));
    }

выход : Hello

Итак, предполагает ли поведение программы, что ThreadPoolExecutor создает хотя бы один поток, если задача отправлена ​​независимо от corePoolSize=0. Если да, то о чем предупреждение в учебнике.

РЕДАКТИРОВАТЬ: Протестировал код в jdk1.5.0_22 по предложению @ S.K. со следующим изменением:

ThreadPoolExecutor tp = new ThreadPoolExecutor(corePoolSize, 1, 5, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(1));//Queue size is set to 1.

Но с этим изменением программа завершает работу без вывода какого-либо вывода.

Значит, я неправильно истолковываю эти утверждения из книги?

РЕДАКТИРОВАТЬ (@sjlee): Трудно добавить код в комментарий, поэтому я добавлю его в качестве правки здесь ... Можете ли вы попробовать эту модификацию и запустить ее как для последних JDK и JDK 1,5?

final int corePoolSize = 0;
ThreadPoolExecutor tp = new ThreadPoolExecutor(corePoolSize, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

// If the pool is already at the core size
if (tp.getPoolSize() == corePoolSize) {
    ExecutorService ex = tp;

    // So tasks submitted to a thread pool with a work queue that has any capacity
    // and a core size of zero will not execute until the queue fills up.
    // So, this should not execute until queue fills up.
    ex.execute(() -> System.out.println("Hello"));
}
tp.shutdown();
if (tp.awaitTermination(1, TimeUnit.SECONDS)) {
    System.out.println("thread pool shut down. exiting.");
} else {
    System.out.println("shutdown timed out. exiting.");
}

@ sjlee Опубликовал результат в комментариях.

Ответы [ 2 ]

0 голосов
/ 15 сентября 2018

Запустив эту программу в jdk 1.5,1.6,1.7 и 1.8, я обнаружил различные реализации ThreadPoolExecutor#execute(Runnable) в 1.5,1.6 и 1.7+. Вот что я нашел:

Реализация JDK 1.5

 //Here poolSize is the number of core threads running.

 public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    for (;;) {
        if (runState != RUNNING) {
            reject(command);
            return;
        }
        if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
            return;
        if (workQueue.offer(command))
            return;
        Runnable r = addIfUnderMaximumPoolSize(command);
        if (r == command)
            return;
        if (r == null) {
            reject(command);
            return;
        }
        // else retry
    }
}

Эта реализация не создает поток, когда corePoolSize равен 0, поэтому поставляемая задача не выполняется.

Реализация JDK 1.6

//Here poolSize is the number of core threads running.

  public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        if (runState == RUNNING && workQueue.offer(command)) {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
    }
}

JDK 1.6 создает новый поток, даже если corePoolSize равен 0.

Реализация JDK 1.7+ (аналогично JDK 1.6, но с улучшенными блокировками и проверками состояния)

    public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

JDK 1.7 также создает новый поток, даже если corePoolSize равен 0.

Похоже, что corePoolSize=0 - это особый случай в каждой версии JDK 1.5 и JDK 1.6+.

Но странно, что объяснение книги не соответствует ни одному из результатов программы.

0 голосов
/ 10 сентября 2018

Похоже, что это была ошибка в старых версиях Java, но в Java 1.8 ее нет.

В соответствии с документацией Java 1.8 от ThreadPoolExecutor.execute():

     /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     * ....
     */

Во втором пункте после добавления работника в очередь происходит повторная проверка, что, если вместо постановки задачи в очередь можно запустить новый поток, откат постановки в очередь и запуск нового потока.

Это то, что происходит. Во время первой проверки задача ставится в очередь, но во время повторной проверки запускается новый поток, который выполняет вашу задачу.

...