Желательно ли добавлять задачи в BlockingQueue ThreadPoolExecutor? - PullRequest
18 голосов
/ 07 апреля 2011

JavaDoc для ThreadPoolExecutor неясно, допустимо ли добавлять задачи непосредственно к BlockingQueue, поддерживающему исполнителя. В документах говорится, что вызов executor.getQueue() "предназначен в основном для отладки и мониторинга".

Я создаю ThreadPoolExecutor со своим собственным BlockingQueue.Я сохраняю ссылку на очередь, чтобы я мог добавлять задачи к ней напрямую.Эта же очередь возвращается getQueue(), поэтому я предполагаю, что указание в getQueue() относится к ссылке на резервную очередь, полученную с помощью моих средств.

Пример

Общая схема кода:

int n = ...; // number of threads
queue = new ArrayBlockingQueue<Runnable>(queueSize);
executor = new ThreadPoolExecutor(n, n, 1, TimeUnit.HOURS, queue);
executor.prestartAllCoreThreads();
// ...
while (...) {
    Runnable job = ...;
    queue.offer(job, 1, TimeUnit.HOURS);
}
while (jobsOutstanding.get() != 0) {
    try {
        Thread.sleep(...);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
executor.shutdownNow();

queue.offer() против executor.execute()

Насколько я понимаю, типичным использованием является добавление задач через executor.execute().Подход в моем примере выше имеет преимущество блокирования очереди, тогда как execute() завершается неудачно, если очередь заполнена и отклоняет мою задачу.Мне также нравится, что отправка заданий взаимодействует с очередью блокировки;мне кажется, это более «чистый» производитель-потребитель.

Подразумевается непосредственное добавление задач в очередь: я должен вызвать prestartAllCoreThreads(), иначе рабочие потоки не выполняются.При условии отсутствия других взаимодействий с исполнителем, ничто не будет контролировать очередь (проверка источника ThreadPoolExecutor подтверждает это).Это также подразумевает для прямого постановки в очередь, что ThreadPoolExecutor должен быть дополнительно сконфигурирован для> 0 основных потоков и не должен быть настроен для тайм-аута основных потоков.

tl; dr

УчитываяThreadPoolExecutor настроен следующим образом:

  • потоки ядра> 0
  • потокам ядра не разрешено время ожидания
  • потоки ядра предварительно запущены
  • хранить ссылку на BlockingQueue, поддерживающую исполнителя

Допустимо ли добавлять задачи непосредственно в очередь вместо вызова executor.execute()?

Related

Этот вопрос ( рабочие очереди производителя / потребителя ) аналогичен, но конкретно не касается добавления в очередь напрямую.

Ответы [ 5 ]

11 голосов
/ 07 апреля 2011

Один трюк состоит в том, чтобы реализовать собственный подкласс ArrayBlockingQueue и переопределить метод offer () для вызова вашей блокирующей версии, тогда вы все равно можете использовать обычный путь к коду.

queue = new ArrayBlockingQueue<Runnable>(queueSize) {
  @Override public boolean offer(Runnable runnable) {
    try {
      return offer(runnable, 1, TimeUnit.HOURS);
    } catch(InterruptedException e) {
      // return interrupt status to caller
      Thread.currentThread().interrupt();
    }
    return false;
  }
};

(как вы можетенаверное, думаю, я думаю, что вызывать предложение прямо в очереди, так как ваш обычный путь к коду, вероятно, плохая идея).

9 голосов
/ 07 апреля 2011

Если бы это был я, я бы предпочел использовать Executor#execute() вместо Queue#offer(), просто потому, что я уже использую все остальное, начиная с java.util.concurrent.

Ваш вопрос хороший, и он задетменя заинтересовало, поэтому я взглянул на источник для ThreadPoolExecutor#execute():

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        if (runState == RUNNING && workQueue.offer(command)) {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
    }
}

Мы можем видеть, что выполняются сами вызовы offer() в очереди работы, но не раньше, чем делаются некоторые приятные, вкусные манипуляции с пуломесли необходимо.По этой причине я думаю, что было бы целесообразно использовать execute();отказ от его использования может (хотя я не знаю наверняка) привести к тому, что пул будет работать неоптимальным образом.Тем не менее, я не думаю, что использование offer() будет прерывать исполнителя - похоже, что задачи удаляются из очереди с помощью следующего (также из ThreadPoolExecutor):

Runnable getTask() {
    for (;;) {
        try {
            int state = runState;
            if (state > SHUTDOWN)
                return null;
            Runnable r;
            if (state == SHUTDOWN)  // Help drain queue
                r = workQueue.poll();
            else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
            else
                r = workQueue.take();
            if (r != null)
                return r;
            if (workerCanExit()) {
                if (runState >= SHUTDOWN) // Wake up others
                    interruptIdleWorkers();
                return null;
            }
            // Else retry
        } catch (InterruptedException ie) {
            // On interruption, re-check runState
        }
    }
}

Этот getTask() метод вызывается только из цикла, поэтому, если исполнитель не завершает работу, он блокируется до тех пор, пока в очередь не будет передано новое задание (независимо от того, откуда оно пришло).

Примечание : Несмотря на то, что я разместил здесь фрагменты кода из источника, мы не можем полагаться на них для окончательного ответа - мы должны только кодировать API.Мы не знаем, как реализация execute() будет меняться со временем.

4 голосов
/ 30 августа 2013

Можно фактически настроить поведение пула, когда очередь заполнена, указав RejectedExecutionHandler при создании экземпляра. ThreadPoolExecutor определяет четыре политики как внутренние классы, включая AbortPolicy, DiscardOldestPolicy, DiscardPolicy, а также мой личный фаворит CallerRunsPolicy, который запускает новое задание в управляющем потоке.

Например:

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
        nproc, // core size
        nproc, // max size
        60, // idle timeout
        TimeUnit.SECONDS,
        new ArrayBlockingQueue<Runnable>(4096, true), // Fairness = true guarantees FIFO
        new ThreadPoolExecutor.CallerRunsPolicy() ); // If we have to reject a task, run it in the calling thread.

Желаемое поведение в вопросе можно получить, используя что-то вроде:

public class BlockingPolicy implements RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        executor.getQueue.put(r); // Self contained, no queue reference needed.
    }

В какой-то момент очередь должна получить доступ . Лучшее место для этого - автономный RejectedExecutionHandler, который сохраняет любое дублирование кода или возможные ошибки, возникающие в результате прямого манипулирования очередью в области действия объекта пула. Обратите внимание, что обработчики, включенные в ThreadPoolExecutor, сами используют getQueue().

1 голос
/ 25 октября 2013

Это очень важный вопрос, если используемая вами очередь полностью отличается от стандартной встроенной памяти LinkedBlockingQueue или ArrayBlockingQueue.

Например, если вы реализуетешаблон потребителя, использующий несколько производителей на разных машинах и использующий механизм организации очередей, основанный на отдельной подсистеме персистентности (например, Redis), тогда вопрос становится актуальным сам по себе, даже если вы не хотите блокировать offer(), как OP.

Таким образом, данный ответ о том, что prestartAllCoreThreads() должен быть вызван (или достаточно раз prestartCoreThread()), чтобы рабочие потоки были доступны и работали, достаточно важен, чтобы его подчеркнуть.

0 голосов
/ 17 июня 2015

При необходимости мы также можем использовать парковку, которая отделяет основную обработку от отклоненных задач -

    final CountDownLatch taskCounter = new CountDownLatch(TASKCOUNT);
    final List<Runnable> taskParking = new LinkedList<Runnable>();
    BlockingQueue<Runnable> taskPool = new ArrayBlockingQueue<Runnable>(1);
    RejectedExecutionHandler rejectionHandler = new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.err.println(Thread.currentThread().getName() + " -->rejection reported - adding to parking lot " + r);
            taskCounter.countDown();
            taskParking.add(r);
        }
    };
    ThreadPoolExecutor threadPoolExecutor =  new ThreadPoolExecutor(5, 10, 1000, TimeUnit.SECONDS, taskPool, rejectionHandler);
    for(int i=0 ; i<TASKCOUNT; i++){
        //main 
        threadPoolExecutor.submit(getRandomTask());
    }
    taskCounter.await(TASKCOUNT * 5 , TimeUnit.SECONDS);
    System.out.println("Checking the parking lot..." + taskParking);
    while(taskParking.size() > 0){
        Runnable r = taskParking.remove(0);
        System.out.println("Running from parking lot..." + r);
        if(taskParking.size() > LIMIT){
          waitForSometime(...);
        }
        threadPoolExecutor.submit(r);
    }
    threadPoolExecutor.shutdown();
...