Пул потоков Java с ограниченной очередью - PullRequest
29 голосов
/ 10 июня 2011

Я использую java.util.concurrent Executors класс для создания фиксированного пула потоков для запуска обработчиков запросов для веб-сервера:

static ExecutorService  newFixedThreadPool(int nThreads) 

и описание:

Создает пул потоков, который повторно использует фиксированный набор потоков, работающих из общей неограниченной очереди.

Однако я ищу реализацию пула потоков, которая будет делать то же самое, за исключением ограниченной очереди. Есть ли такая реализация? Или мне нужно реализовать собственную оболочку для фиксированного пула потоков?

Ответы [ 4 ]

38 голосов
/ 10 июня 2011

То, что вы хотите сделать, это новый собственный ExecutorService, возможно, с использованием ThreadPoolExecutor . ThreadPoolExecutor имеет конструктор, который принимает BlockingQueue и для получения ограниченной очереди, которую вы используете, например ArrayBlockingQueue , правильно созданной для ограничения. Вы также можете включить RejectedExecutionHandler , чтобы определить, что делать, когда ваша очередь заполнена, или оставить ссылку на очередь блокировки и использовать методы предложения.

Вот небольшой пример:

BlockingQueue<Runnable> linkedBlockingDeque = new LinkedBlockingDeque<Runnable>(
    100);
ExecutorService executorService = new ThreadPoolExecutor(1, 10, 30,
    TimeUnit.SECONDS, linkedBlockingDeque,
    new ThreadPoolExecutor.CallerRunsPolicy());
6 голосов
/ 10 июня 2011

Создайте ThreadPoolexecutor и передайте в него подходящую реализацию BlockingQueue. например, вы можете передать ArrayBlockingQueue в конструкторе ThreadPoolExecutor, чтобы получить желаемый эффект.

3 голосов
/ 31 июля 2017

Я решил это с помощью семафора , который я использую для регулирования задач, отправляемых на ExecutorService.

Например:

int threadCount = 10;
ExecutorService consumerPool = Executors.newFixedThreadPool(threadCount);

// set the permit count greater than thread count so that we 
// build up a limited buffer of waiting consumers
Semaphore semaphore = new Semaphore(threadCount * 100); 

for (int i = 0; i < 1000000; ++i) {
    semaphore.acquire(); // this might block waiting for a permit 
    Runnable consumer = () -> {
       try {
          doSomeWork(i);
       } finally {
          semaphore.release(); // release a permit 
       }
    };
    consumerPool.submit(consumer);
}
3 голосов
/ 10 июня 2011

Когда вы создаете ThreadPoolExecutor, вы можете назначить ему ограниченный BlockingQueue и RejectedExecutionHandler, чтобы вы могли контролировать, что происходит при достижении предела.Поведение по умолчанию - генерировать исключение RejectedExecutionException.

Вы также можете определить собственную фабрику потоков, чтобы управлять именами потоков и делать их потоками демонов.

...