Примечание: эти реализации несколько ошибочны и недетерминированы.Пожалуйста, прочитайте весь ответ и комментарии, прежде чем использовать этот код.
Как насчет создания рабочей очереди, которая отклоняет элементы, когда исполнитель меньше максимального размера пула, и начинает принимать их, когда максимумдостигнут?
Это зависит от задокументированного поведения:
"Если запрос не может быть поставлен в очередь, создается новый поток, если он не превысит MaximumPoolSize, и в этом случае задача будетбыть отклоненным. "
public class ExecutorTest
{
private static final int CORE_POOL_SIZE = 2;
private static final int MAXIMUM_POOL_SIZE = 4;
private static final int KEEP_ALIVE_TIME_MS = 5000;
public static void main(String[] args)
{
final SaturateExecutorBlockingQueue workQueue =
new SaturateExecutorBlockingQueue();
final ThreadPoolExecutor executor =
new ThreadPoolExecutor(CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE,
KEEP_ALIVE_TIME_MS,
TimeUnit.MILLISECONDS,
workQueue);
workQueue.setExecutor(executor);
for (int i = 0; i < 6; i++)
{
final int index = i;
executor.submit(new Runnable()
{
public void run()
{
try
{
Thread.sleep(1000);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
System.out.println("Runnable " + index
+ " on thread: " + Thread.currentThread());
}
});
}
}
public static class SaturateExecutorBlockingQueue
extends LinkedBlockingQueue<Runnable>
{
private ThreadPoolExecutor executor;
public void setExecutor(ThreadPoolExecutor executor)
{
this.executor = executor;
}
public boolean offer(Runnable e)
{
if (executor.getPoolSize() < executor.getMaximumPoolSize())
{
return false;
}
return super.offer(e);
}
}
}
Примечание. Ваш вопрос удивил меня, поскольку я ожидал, что вашим желаемым поведением будет поведение по умолчанию для ThreadPoolExecutor, настроенного с corePoolSize
Идея # 2
Я думаю, что у меня есть несколько лучший подход.Он основан на поведении побочных эффектов, закодированном в методе setCorePoolSize
в ThreadPoolExecutor
.Идея состоит в том, чтобы временно и условно увеличить размер основного пула, когда рабочий элемент ставится в очередь.При увеличении размера основного пула ThreadPoolExecutor
немедленно создаст достаточно новых потоков для выполнения всех задач в очереди (queue.size ()).Затем мы немедленно уменьшаем размер основного пула, что позволяет естественному сокращению пула потоков в последующие периоды низкой активности.Этот подход все еще не является полностью детерминированным (например, размер пула может превышать максимальный размер пула), но я думаю, что почти во всех случаях он лучше, чем первая стратегия.
В частностиЯ считаю, что этот подход лучше первого, потому что:
- Он будет чаще использовать потоки
- Он не будет отклонять выполнение в результате гонки
- Я хотел бы еще раз упомянуть, что при первом подходе пул потоков увеличивается до максимального размера даже при очень небольшом использовании.Этот подход должен быть намного более эффективным в этом отношении.
-
public class ExecutorTest2
{
private static final int KEEP_ALIVE_TIME_MS = 5000;
private static final int CORE_POOL_SIZE = 2;
private static final int MAXIMUM_POOL_SIZE = 4;
public static void main(String[] args) throws InterruptedException
{
final SaturateExecutorBlockingQueue workQueue =
new SaturateExecutorBlockingQueue(CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE);
final ThreadPoolExecutor executor =
new ThreadPoolExecutor(CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE,
KEEP_ALIVE_TIME_MS,
TimeUnit.MILLISECONDS,
workQueue);
workQueue.setExecutor(executor);
for (int i = 0; i < 60; i++)
{
final int index = i;
executor.submit(new Runnable()
{
public void run()
{
try
{
Thread.sleep(1000);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
System.out.println("Runnable " + index
+ " on thread: " + Thread.currentThread()
+ " poolSize: " + executor.getPoolSize());
}
});
}
executor.shutdown();
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
}
public static class SaturateExecutorBlockingQueue
extends LinkedBlockingQueue<Runnable>
{
private final int corePoolSize;
private final int maximumPoolSize;
private ThreadPoolExecutor executor;
public SaturateExecutorBlockingQueue(int corePoolSize,
int maximumPoolSize)
{
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
}
public void setExecutor(ThreadPoolExecutor executor)
{
this.executor = executor;
}
public boolean offer(Runnable e)
{
if (super.offer(e) == false)
{
return false;
}
// Uncomment one or both of the below lines to increase
// the likelyhood of the threadpool reusing an existing thread
// vs. spawning a new one.
//Thread.yield();
//Thread.sleep(0);
int currentPoolSize = executor.getPoolSize();
if (currentPoolSize < maximumPoolSize
&& currentPoolSize >= corePoolSize)
{
executor.setCorePoolSize(currentPoolSize + 1);
executor.setCorePoolSize(corePoolSize);
}
return true;
}
}
}