Следующий класс обтекает ThreadPoolExecutor и использует семафор для блокировки, после чего рабочая очередь заполнена:
public final class BlockingExecutor {
private final Executor executor;
private final Semaphore semaphore;
public BlockingExecutor(int queueSize, int corePoolSize, int maxPoolSize, int keepAliveTime, TimeUnit unit, ThreadFactory factory) {
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
this.executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit, queue, factory);
this.semaphore = new Semaphore(queueSize + maxPoolSize);
}
private void execImpl (final Runnable command) throws InterruptedException {
semaphore.acquire();
try {
executor.execute(new Runnable() {
@Override
public void run() {
try {
command.run();
} finally {
semaphore.release();
}
}
});
} catch (RejectedExecutionException e) {
// will never be thrown with an unbounded buffer (LinkedBlockingQueue)
semaphore.release();
throw e;
}
}
public void execute (Runnable command) throws InterruptedException {
execImpl(command);
}
}
Этот класс-обертка основан на решении, приведенном Брайаном Гетцем в книге «Параллелизм Java на практике». Решение в книге принимает только два параметра конструктора: Executor
и границу, используемую для семафора. Это показано в ответе от Fixpoint. Существует проблема с этим подходом: он может войти в состояние, когда потоки пула заняты, очередь заполнена, но семафор только что выпустил разрешение. (semaphore.release()
в блоке finally). В этом состоянии новая задача может получить только что выпущенное разрешение, но она отклоняется, поскольку очередь задач заполнена. Конечно, это не то, что вы хотите; Вы хотите заблокировать в этом случае.
Чтобы решить эту проблему, мы должны использовать неограниченную очередь, как ясно упоминает JCiP. Семафор выступает в роли охранника, создавая эффект размера виртуальной очереди. Это имеет побочный эффект, заключающийся в том, что устройство может содержать maxPoolSize + virtualQueueSize + maxPoolSize
задач. Это почему? Из-за
semaphore.release()
в блоке finally. Если все потоки пула вызывают этот оператор в одно и то же время, то освобождаются разрешения maxPoolSize
, позволяющие войти в устройство одинаковому количеству задач. Если бы мы использовали ограниченную очередь, она все равно была бы заполнена, что приводило к отклонению задачи. Теперь, поскольку мы знаем, что это происходит только тогда, когда поток пула почти завершен, это не проблема. Мы знаем, что поток пула не будет блокироваться, поэтому вскоре задача будет взята из очереди.
Вы можете использовать ограниченную очередь, хотя. Просто убедитесь, что его размер равен virtualQueueSize + maxPoolSize
. Большие размеры бесполезны, семафор будет препятствовать пропуску большего количества элементов. Меньшие размеры приведут к отклонению задач. Вероятность отклонения заданий увеличивается с уменьшением размера. Например, скажем, вы хотите ограниченного исполнителя с maxPoolSize = 2 и virtualQueueSize = 5. Затем возьмите семафор с 5 + 2 = 7 разрешениями и фактическим размером очереди 5 + 2 = 7. Реальное количество задач, которые могут быть в блоке, составляет 2 + 5 + 2 = 9. Когда исполнитель заполнен (5 задач в очереди, 2 в пуле потоков, поэтому доступно 0 разрешений) и ВСЕ потоки пула освобождают свои разрешения, тогда входящие задачи могут получить ровно 2 разрешения.
Теперь решение от JCiP несколько громоздко в использовании, поскольку оно не обеспечивает выполнение всех этих ограничений (неограниченная очередь, или ограничение с этими математическими ограничениями и т. Д.). Я думаю, что это служит хорошим примером для демонстрации того, как вы можете создавать новые классы, ориентированные на многопотоковое исполнение, на основе уже доступных частей, но не как полноценный, повторно используемый класс. Я не думаю, что последний был намерением автора.