Политика ThreadPoolExecutor - PullRequest
       6

Политика ThreadPoolExecutor

13 голосов
/ 06 августа 2010

Я пытаюсь использовать ThreadPoolExecutor для планирования задач, но сталкиваюсь с некоторыми проблемами с его политиками.Вот его заявленное поведение:

  1. Если работает меньше потоков corePoolSize, Исполнитель всегда предпочитает добавлять новый поток, а не ставить в очередь.
  2. Если работает corePoolSize или несколько потоков, Исполнительвсегда предпочитает ставить запрос в очередь, а не добавлять новый поток.
  3. Если запрос не может быть поставлен в очередь, создается новый поток, если только он не превысит MaximumPoolSize, и в этом случае задача будет отклонена.

Мне нужно следующее поведение:

  1. то же, что и выше
  2. Если выполняется больше, чем corePoolSize, но меньше, чем MaximumPoolSize, предпочитает добавлять новый поток вместо очередейи использование свободного потока поверх добавления нового потока.
  3. то же, что и выше

По сути, я не хочу отклонять какие-либо задачи;Я хочу, чтобы они стояли в очереди в неограниченной очереди.Но я хочу иметь до максимума потоки PoolSize.Если я использую неограниченную очередь, она никогда не генерирует потоки после попадания в coreSize.Если я использую ограниченную очередь, она отклоняет задачи.Есть ли способ обойти это?

То, о чем я сейчас думаю, - это запустить ThreadPoolExecutor в SynchronousQueue, но не передавать задачи непосредственно ему, вместо этого направляя их в отдельный неограниченный LinkedBlockingQueue.Затем другой поток передает из LinkedBlockingQueue в Executor, и если один из них отклоняется, он просто пытается снова, пока он не будет отклонен.Это похоже на боль и немного хакерство - есть ли более чистый способ сделать это?

Ответы [ 4 ]

4 голосов
/ 06 августа 2010

Вероятно, нет необходимости микроуправлять пулом потоков в соответствии с запросом.

Кэшированный пул потоков будет повторно использовать незанятые потоки, а также разрешать потенциально неограниченное количество одновременных потоков.Это, конечно, может привести к снижению производительности разгона из-за издержек переключения контекста в периоды пакетной обработки.

Executors.newCachedThreadPool();

Лучшим вариантом является ограничение общего количества потоков при одновременном отказе от идеи обеспечения использования свободных потоковпервый.Изменения конфигурации будут следующими:

corePoolSize = maximumPoolSize = N;
allowCoreThreadTimeOut(true);
setKeepAliveTime(aReasonableTimeDuration, TimeUnit.SECONDS);

По этому сценарию, если у исполнителя меньше чем corePoolSize потоков, он не должен быть слишком занят.Если система не очень занята, то создание нового потока приносит мало вреда.Это приведет к тому, что ваш ThreadPoolExecutor всегда будет создавать нового работника, если он не превышает максимально допустимое количество работников.Только когда максимальное количество рабочих «работает», рабочие будут бездействовать, ожидая выполнения задач.Если работник ждет aReasonableTimeDuration без задачи, то он может прекратить работу.Используя разумные ограничения для размера пула (в конце концов, таких процессоров очень много) и достаточно большой тайм-аут (чтобы не допустить ненужного завершения потоков), вероятно, будут достигнуты желаемые преимущества.

Последний вариант -хак.По сути, ThreadPoolExecutor внутренне использует BlockingQueue.offer, чтобы определить, имеет ли очередь емкость.Пользовательская реализация BlockingQueue всегда может отклонить попытку offer.Когда ThreadPoolExecutor не удается offer выполнить задачу в очереди, он попытается сделать нового работника.Если новый работник не может быть создан, будет вызван RejectedExecutionHandler.В этот момент пользовательский RejectedExecutionHandler может заставить put ввести пользовательский BlockingQueue.

/** Hackish BlockingQueue Implementation tightly coupled to ThreadPoolexecutor implementation details. */
class ThreadPoolHackyBlockingQueue<T> implements BlockingQueue<T>, RejectedExecutionHandler {
    BlockingQueue<T> delegate;

    public boolean offer(T item) {
        return false;
    }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        delegate.put(r);
    }

    //.... delegate methods
}
.
1 голос
/ 06 августа 2010

Ваш вариант использования является распространенным, вполне законным и, к сожалению, более сложным, чем можно было ожидать.Для справочной информации вы можете прочитать это обсуждение и найти указатель на решение (также упомянутое в теме) здесь .Решение Шэя работает отлично.

В общем, я бы немного опасался неограниченных очередей;обычно лучше иметь явное управление входящим потоком, которое изящно ухудшается и регулирует соотношение текущей / оставшейся работы, чтобы не перегружать ни производителя, ни потребителя.

1 голос
/ 06 августа 2010

Вы бы искали что-то более похожее на пул кэшированных потоков?

http://download.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/Executors.html#newCachedThreadPool()

1 голос
/ 06 августа 2010

Просто установите corePoolsize = maximumPoolSize и используйте неограниченную очередь?

В вашем списке точек 1 исключает 2, поскольку corePoolSize всегда будет меньше или равно maximumPoolSize.

Редактировать

Между тем, что вы хотите, и тем, что вам предложит TPE, есть что-то несовместимое.

Если у вас неограниченная очередь, maximumPoolSize игнорируется, поэтомукак вы заметили, не будет создано и использовано не более corePoolSize потоков.

Итак, опять же, если вы берете corePoolsize = maximumPoolSize с неограниченной очередью, у вас есть то, что вы хотите, нет?

...