Вероятно, нет необходимости микроуправлять пулом потоков в соответствии с запросом.
Кэшированный пул потоков будет повторно использовать незанятые потоки, а также разрешать потенциально неограниченное количество одновременных потоков.Это, конечно, может привести к снижению производительности разгона из-за издержек переключения контекста в периоды пакетной обработки.
Executors.newCachedThreadPool();
Лучшим вариантом является ограничение общего количества потоков при одновременном отказе от идеи обеспечения использования свободных потоковпервый.Изменения конфигурации будут следующими:
corePoolSize = maximumPoolSize = N;
allowCoreThreadTimeOut(true);
setKeepAliveTime(aReasonableTimeDuration, TimeUnit.SECONDS);
По этому сценарию, если у исполнителя меньше чем corePoolSize
потоков, он не должен быть слишком занят.Если система не очень занята, то создание нового потока приносит мало вреда.Это приведет к тому, что ваш ThreadPoolExecutor
всегда будет создавать нового работника, если он не превышает максимально допустимое количество работников.Только когда максимальное количество рабочих «работает», рабочие будут бездействовать, ожидая выполнения задач.Если работник ждет aReasonableTimeDuration
без задачи, то он может прекратить работу.Используя разумные ограничения для размера пула (в конце концов, таких процессоров очень много) и достаточно большой тайм-аут (чтобы не допустить ненужного завершения потоков), вероятно, будут достигнуты желаемые преимущества.
Последний вариант -хак.По сути, ThreadPoolExecutor
внутренне использует BlockingQueue.offer
, чтобы определить, имеет ли очередь емкость.Пользовательская реализация BlockingQueue
всегда может отклонить попытку offer
.Когда ThreadPoolExecutor
не удается offer
выполнить задачу в очереди, он попытается сделать нового работника.Если новый работник не может быть создан, будет вызван RejectedExecutionHandler
.В этот момент пользовательский RejectedExecutionHandler
может заставить put
ввести пользовательский BlockingQueue
.
/** Hackish BlockingQueue Implementation tightly coupled to ThreadPoolexecutor implementation details. */
class ThreadPoolHackyBlockingQueue<T> implements BlockingQueue<T>, RejectedExecutionHandler {
BlockingQueue<T> delegate;
public boolean offer(T item) {
return false;
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
delegate.put(r);
}
//.... delegate methods
}
.