Невозможно сделать пул кэшированных потоков с ограничением размера? - PullRequest
117 голосов
/ 26 ноября 2009

Кажется невозможным создать пул кэшированных потоков с ограничением количества потоков, которые он может создать.

Вот как статический Executors.newCachedThreadPool реализован в стандартной библиотеке Java:

 public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

Итак, используя этот шаблон для создания пула потоков кэширования фиксированного размера:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new SynchronusQueue<Runable>());

Теперь, если вы используете это и отправляете 3 задания, все будет хорошо. Отправка любых дальнейших задач приведет к отклоненным исключениям исполнения.

Попробуем это:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runable>());

В результате все потоки будут выполняться последовательно. То есть пул потоков никогда не будет создавать более одного потока для выполнения ваших задач.

Это ошибка в методе выполнения ThreadPoolExecutor? Или, может быть, это намеренно? Или есть другой способ?

Edit: я хочу что-то в точности как пул кэшированных потоков (он создает потоки по требованию, а затем убивает их по истечении некоторого времени ожидания), но с ограничением на количество потоков, которые он может создать, и возможностью продолжать ставить в очередь дополнительные задачи как только он достиг своего предела потока. Согласно ответу Сджли это невозможно. Глядя на метод execute () ThreadPoolExecutor, это действительно невозможно. Мне нужно было бы создать подкласс ThreadPoolExecutor и переопределить execute () в некоторой степени, как это делает SwingWorker, но то, что SwingWorker делает в своем execute (), является полным взломом.

Ответы [ 11 ]

220 голосов
/ 26 ноября 2009

ThreadPoolExecutor имеет следующие несколько ключевых вариантов поведения, и эти проблемы могут быть объяснены вашими проблемами.

Когда задание отправлено,

  1. Если пул потоков не достиг размера ядра, создаются новые потоки.
  2. Если достигнут размер ядра и нет свободных потоков, он ставит задачи в очередь.
  3. Если достигнут размер ядра, нет свободных потоков, и очередь заполняется, создаются новые потоки (до достижения максимального размера).
  4. Если достигнут максимальный размер, нет свободных потоков, и очередь заполняется, включается политика отклонения.

В первом примере обратите внимание, что SynchronousQueue по существу имеет размер 0. Поэтому, как только вы достигнете максимального размера (3), политика отклонения вступит в действие (# 4).

Во втором примере предпочтительной очередью является LinkedBlockingQueue, размер которой неограничен. Таким образом, вы застряли с поведением № 2.

Вы не можете сильно повозиться с кэшированным типом или фиксированным типом, поскольку их поведение почти полностью определено.

Если вы хотите иметь ограниченный и динамический пул потоков, вам нужно использовать положительный размер ядра и максимальный размер в сочетании с очередью конечного размера. Например,

new ThreadPoolExecutor(10, // core size
    50, // max size
    10*60, // idle timeout
    TimeUnit.SECONDS,
    new ArrayBlockingQueue<Runnable>(20)); // queue with a size

Добавление : это довольно старый ответ, и похоже, что JDK изменил свое поведение, когда дело доходит до размера ядра 0. Начиная с JDK 1.6, если размер ядра равен 0, а пул не Если у вас есть потоки, ThreadPoolExecutor добавит поток для выполнения этой задачи. Таким образом, размер ядра 0 является исключением из правила выше. Спасибо Стиву за за привлечение этого внимания.

58 голосов
/ 14 ноября 2011

Если я ничего не пропустил, решение исходного вопроса простое. Следующий код реализует желаемое поведение, описанное оригинальным постером. Он создаст до 5 потоков для работы в неограниченной очереди, и свободные потоки прекратят работу через 60 секунд.

tp = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>());
tp.allowCoreThreadTimeOut(true);
7 голосов
/ 10 августа 2013

Была такая же проблема. Поскольку ни один другой ответ не объединяет все вопросы, я добавляю свои:

Теперь оно четко записано в документах : если вы используете очередь, которая не блокирует (LinkedBlockingQueue), настройка максимального количества потоков не действует, используются только основные потоки.

так:

public class MyExecutor extends ThreadPoolExecutor {

    public MyExecutor() {
        super(4, 4, 5,TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        allowCoreThreadTimeOut(true);
    }

    public void setThreads(int n){
        setMaximumPoolSize(Math.max(1, n));
        setCorePoolSize(Math.max(1, n));
    }

}

У этого исполнителя:

  1. Нет понятия о максимальных потоках, так как мы используем неограниченную очередь. Это хорошо, потому что такая очередь может привести к тому, что исполнитель создаст огромное количество неосновных, дополнительных потоков, если он будет следовать своей обычной политике.

  2. очередь максимального размера Integer.MAX_VALUE. Submit() выдаст RejectedExecutionException, если количество ожидающих заданий превысит Integer.MAX_VALUE. Не уверен, что сначала у нас закончится память, иначе это произойдет.

  3. Имеет 4 основных резьбы. Свободные основные потоки автоматически завершаются, если они простаивают в течение 5 секунд. Так что да, потоки строго по требованию. Количество можно изменять с помощью метода setThreads().

  4. Гарантирует, что минимальное количество основных потоков никогда не будет меньше одного, иначе submit() отклонит каждую задачу. Поскольку потоки ядра должны быть> = max потоков, метод setThreads() также устанавливает max потоков, хотя настройка max thread бесполезна для неограниченной очереди.

6 голосов
/ 26 ноября 2009

В первом примере последующие задачи отклоняются, поскольку AbortPolicy является значением по умолчанию RejectedExecutionHandler. ThreadPoolExecutor содержит следующие политики, которые можно изменить с помощью метода setRejectedExecutionHandler:

CallerRunsPolicy
AbortPolicy
DiscardPolicy
DiscardOldestPolicy

Звучит так, будто вы хотите использовать кешированный пул потоков с CallerRunsPolicy.

5 голосов
/ 05 декабря 2010

Ни один из ответов здесь не устранил мою проблему, связанную с созданием ограниченного количества HTTP-соединений с использованием HTTP-клиента Apache (версия 3.x). Поскольку мне потребовалось несколько часов, чтобы выяснить правильную настройку, я поделюсь:

private ExecutorService executor = new ThreadPoolExecutor(5, 10, 60L,
  TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
  Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());

Это создает ThreadPoolExecutor, который начинается с пяти и содержит максимум десять одновременно работающих потоков, используя для выполнения CallerRunsPolicy.

3 голосов
/ 26 ноября 2009

Согласно Javadoc для ThreadPoolExecutor:

Если запущено больше чем corePoolSize, но меньше чем MaximumPoolSize потоков, новый поток будет создан , только если очередь заполнена . Установив одинаковые значения corePoolSize и MaximumPoolSize, вы создаете пул потоков фиксированного размера.

(Акцент мой.)

Ответ джиттера - это то, что вы хотите, хотя мой ответ на другой ваш вопрос. :)

2 голосов
/ 15 октября 2011

Не похоже, что какой-либо из ответов на самом деле отвечает на вопрос - на самом деле я не вижу способа сделать это - даже если вы создаете подкласс из PooledExecutorService, поскольку многие из методов / свойств являются частными, например, делая addIfUnderMaximumPoolSize защищенным, вы можете сделать следующее:

class MyThreadPoolService extends ThreadPoolService {
    public void execute(Runnable run) {
        if (poolSize() == 0) {
            if (addIfUnderMaximumPoolSize(run) != null)
                return;
        }
        super.execute(run);
    }
}

Самое близкое, что я получил, было это - но даже это не очень хорошее решение

new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()) {
    public void execute(Runnable command) {
        if (getPoolSize() == 0 && getActiveCount() < getMaximumPoolSize()) {        
            super.setCorePoolSize(super.getCorePoolSize() + 1);
        }
        super.execute(command);
    }

    protected void afterExecute(Runnable r, Throwable t) {
         // nothing in the queue
         if (getQueue().isEmpty() && getPoolSize() > min) {
             setCorePoolSize(getCorePoolSize() - 1);
         }
    };
 };

p.s. не проверено выше

2 голосов
/ 29 августа 2011

есть еще один вариант. Вместо использования новой SynchronousQueue вы также можете использовать любую другую очередь, но вы должны убедиться, что ее размер равен 1, что заставит executorservice создать новый поток.

2 голосов
/ 26 ноября 2009

Это то, что вы хотите (по крайней мере, я так думаю). Для объяснения проверьте Джонатан Фейнберг ответ

Executors.newFixedThreadPool(int n)

Создает пул потоков, который повторно использует фиксированное количество потоков, работающих в общей неограниченной очереди. В любой момент не более nThreads потоков будут активными задачами обработки. Если дополнительные задачи отправляются, когда все потоки активны, они будут ждать в очереди, пока поток не станет доступным. Если какой-либо поток завершается из-за сбоя во время выполнения до завершения работы, новый будет занимать его место, если это необходимо для выполнения последующих задач. Потоки в пуле будут существовать до тех пор, пока он не будет явно отключен.

1 голос
/ 15 октября 2011

Вот еще одно решение. Я думаю, что это решение ведет себя так, как вы хотите (хотя и не горжусь этим решением):

final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    public boolean offer(Runnable o) {
        if (size() > 1)
            return false;
        return super.offer(o);
    };

    public boolean add(Runnable o) {
        if (super.offer(o))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
};

RejectedExecutionHandler handler = new RejectedExecutionHandler() {         
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        queue.add(r);
    }
};

dbThreadExecutor =
        new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, queue, handler);
...