ExecutorService, стандартный способ избежать переполнения очереди задач - PullRequest
28 голосов
/ 12 февраля 2010

Я использую ExecutorService для удобства параллельной многопоточной программы. Возьмите следующий код:

while(xxx) {
    ExecutorService exService = Executors.newFixedThreadPool(NUMBER_THREADS);
    ...  
    Future<..> ... = exService.submit(..);
    ...
}

В моем случае проблема в том, что submit() не блокируется, если все NUMBER_THREADS заняты. В результате очередь задач заполняется многими задачами. Следствием этого является то, что отключение службы выполнения с помощью ExecutorService.shutdown() занимает много времени (ExecutorService.isTerminated() будет ложным в течение длительного времени). Причина в том, что очередь задач все еще достаточно заполнена.

Пока мой обходной путь - это работать с семафорами, чтобы запретить иметь много записей в очереди задач ExecutorService:

...
Semaphore semaphore=new Semaphore(NUMBER_THREADS);

while(xxx) {
    ExecutorService exService = Executors.newFixedThreadPool(NUMBER_THREADS); 
    ...
    semaphore.aquire();  
    // internally the task calls a finish callback, which invokes semaphore.release()
    // -> now another task is added to queue
    Future<..> ... = exService.submit(..); 
    ...
}

Я уверен, что есть лучшее, более инкапсулированное решение?

Ответы [ 5 ]

27 голосов
/ 21 июня 2012

Хитрость заключается в использовании фиксированного размера очереди и:

new ThreadPoolExecutor.CallerRunsPolicy()

Я также рекомендую использовать Guava's ListeningExecutorService . Вот пример очередей потребителей / производителей.

private ListeningExecutorService producerExecutorService = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
private ListeningExecutorService consumerExecutorService = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));

private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  5000L, TimeUnit.MILLISECONDS,
                                  new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
}

Что-нибудь лучше, и вы можете рассмотреть MQ, такой как RabbitMQ или ActiveMQ, поскольку они имеют технологию QoS.

6 голосов
/ 12 февраля 2010

Вы можете позвонить ThreadPoolExecutor.getQueue().size(), чтобы узнать размер очереди ожидания. Вы можете принять меры, если очередь слишком длинная. Я предлагаю запустить задачу в текущем потоке, если очередь слишком длинная, чтобы замедлить производителя (если это уместно).

5 голосов
/ 04 июня 2010

Истинная блокировка ThreadPoolExecutor была в списке пожеланий многих, на ней даже открыта ошибка JDC. Я столкнулся с той же проблемой, и столкнулся с этим: http://today.java.net/pub/a/today/2008/10/23/creating-a-notifying-blocking-thread-pool-executor.html

Это реализация BlockingThreadPoolExecutor, реализованная с использованием RejectionPolicy, которая использует предложение для добавления задачи в очередь, ожидая, пока в очереди будет место. Выглядит хорошо.

5 голосов
/ 12 февраля 2010

Вам лучше создать ThreadPoolExecutor самостоятельно (что в любом случае делает Executors.newXXX ()).

В конструкторе вы можете передать BlockingQueue, чтобы Исполнитель использовал его в качестве очереди задач. Если вы передаете ограниченный по размеру BlockingQueue (например, LinkedBlockingQueue ), он должен достичь желаемого эффекта.

ExecutorService exService = new ThreadPoolExecutor(NUMBER_THREADS, NUMBER_THREADS, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(workQueueSize));
1 голос
/ 12 июня 2011

вы можете добавить еще одну очередь bloquing, которая имеет ограниченный размер для контроля размера внутренней очереди в executorService, некоторые считают, что семафор, но очень легко перед исполнителем ставишь () и когда задача достигает take (). take () должен быть внутри кода задачи

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...