Как сделать блок метода submit () ThreadPoolExecutor, если он насыщен? - PullRequest
96 голосов
/ 04 января 2010

Я хочу создать ThreadPoolExecutor таким образом, чтобы при достижении максимального размера и заполнении очереди метод submit() блокировал при попытке добавления новых задач. Нужно ли реализовывать пользовательский RejectedExecutionHandler для этого или существует существующий способ сделать это с помощью стандартной библиотеки Java?

Ответы [ 19 ]

44 голосов
/ 04 января 2010

Одно из возможных решений, которые я только что нашел:

public class BoundedExecutor {
    private final Executor exec;
    private final Semaphore semaphore;

    public BoundedExecutor(Executor exec, int bound) {
        this.exec = exec;
        this.semaphore = new Semaphore(bound);
    }

    public void submitTask(final Runnable command)
            throws InterruptedException, RejectedExecutionException {
        semaphore.acquire();
        try {
            exec.execute(new Runnable() {
                public void run() {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            semaphore.release();
            throw e;
        }
    }
}

Есть ли другие решения? Я бы предпочел что-то на основе RejectedExecutionHandler, так как это кажется стандартным способом справиться с такими ситуациями.

28 голосов
/ 14 апреля 2011

Вы можете использовать ThreadPoolExecutor и блокировкуQueue:

public class ImageManager {
    BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(blockQueueSize);
    RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
    private ExecutorService executorService =  new ThreadPoolExecutor(numOfThread, numOfThread, 
        0L, TimeUnit.MILLISECONDS, blockingQueue, rejectedExecutionHandler);

    private int downloadThumbnail(String fileListPath){
        executorService.submit(new yourRunnable());
    }
}
12 голосов
/ 08 февраля 2011

Проверьте четыре варианта для этого: Создание NotifyingBlockingThreadPoolExecutor

12 голосов
/ 04 января 2010

Вы должны использовать CallerRunsPolicy, который выполняет отклоненную задачу в вызывающем потоке. Таким образом, он не может отправлять какие-либо новые задачи исполнителю до тех пор, пока эта задача не будет выполнена, и в этот момент появятся свободные потоки пула, или процесс повторится.

http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ThreadPoolExecutor.CallerRunsPolicy.html

Из документов:

Отклоненные задания

Новые задачи, представленные в методе execute (java.lang.Runnable), будут отклонено, когда Исполнитель выключить, а также когда исполнитель использует конечные границы как для максимума потоки и объем рабочей очереди, и насыщен. В любом случае, метод execute вызывает RejectedExecutionHandler.rejectedExecution (java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) метод его RejectedExecutionHandler. четыре предопределенные политики обработчиков при условии:

  1. В стандартном ThreadPoolExecutor.AbortPolicy обработчик выбрасывает среду выполнения RejectedExecutionException при отказ.
  2. В ThreadPoolExecutor.CallerRunsPolicy, поток, который вызывает выполнение себя запускает задачу Это обеспечивает простой механизм обратной связи, который будет замедлить темпы, что новые задачи представленный.
  3. В ThreadPoolExecutor.DiscardPolicy, a задача, которая не может быть выполнена, просто отброшен.
  4. В ThreadPoolExecutor.DiscardOldestPolicy, если исполнитель не выключен, Задача во главе рабочей очереди упал, а затем повторная попытка (который может снова потерпеть неудачу, вызывая это повторить.)

Кроме того, обязательно используйте ограниченную очередь, такую ​​как ArrayBlockingQueue, при вызове конструктора ThreadPoolExecutor. В противном случае ничто не будет отклонено.

Редактировать: в ответ на ваш комментарий установите размер ArrayBlockingQueue равным максимальному размеру пула потоков и используйте AbortPolicy.

Редактировать 2: Хорошо, я вижу, к чему вы клоните. Как насчет этого: переопределить метод beforeExecute(), чтобы проверить, что getActiveCount() не превышает getMaximumPoolSize(), и если это произойдет, поспать и повторить попытку?

6 голосов
/ 24 ноября 2011

Ответ BoundedExecutor, цитируемый выше от Параллелизм Java на практике работает правильно, только если вы используете неограниченную очередь для Исполнителя или граница семафора не превышает размер очереди. Семафор является состоянием, разделяемым между отправляющим потоком и потоками в пуле, что позволяет насыщать исполнителя, даже если размер очереди

Использование CallerRunsPolicy допустимо только в том случае, если ваши задачи не выполняются вечно, и в этом случае ваш отправляющий поток останется в rejectedExecution навсегда, и плохая идея, если выполнение ваших задач занимает много времени, поскольку Поток не может отправлять новые задачи или делать что-либо еще, если он сам выполняет задачу.

Если это не приемлемо, тогда я предлагаю проверить размер ограниченной очереди исполнителя перед отправкой задачи. Если очередь заполнена, подождите немного, прежде чем пытаться отправить снова. Пропускная способность пострадает, но я полагаю, что это более простое решение, чем многие другие предлагаемые решения, и вам гарантировано, что ни одна задача не будет отклонена.

6 голосов
/ 04 января 2011

Hibernate имеет BlockPolicy, который прост и может делать то, что вы хотите:

См .: Executors.java

/**
 * A handler for rejected tasks that will have the caller block until
 * space is available.
 */
public static class BlockPolicy implements RejectedExecutionHandler {

    /**
     * Creates a <tt>BlockPolicy</tt>.
     */
    public BlockPolicy() { }

    /**
     * Puts the Runnable to the blocking queue, effectively blocking
     * the delegating thread until space is available.
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        try {
            e.getQueue().put( r );
        }
        catch (InterruptedException e1) {
            log.error( "Work discarded, thread was interrupted while waiting for space to schedule: {}", r );
        }
    }
}
5 голосов
/ 10 марта 2015

Я знаю, это взлом, но, на мой взгляд, самый чистый взлом среди предложенных здесь; -)

Поскольку ThreadPoolExecutor использует очередь предложения «предложение» вместо «положить», позволяет переопределить поведение «предложения» очереди блокировки:

class BlockingQueueHack<T> extends ArrayBlockingQueue<T> {

    BlockingQueueHack(int size) {
        super(size);
    }

    public boolean offer(T task) {
        try {
            this.put(task);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return true;
    }
}

ThreadPoolExecutor tp = new ThreadPoolExecutor(1, 2, 1, TimeUnit.MINUTES, new BlockingQueueHack(5));

Я проверил это, и, кажется, работает. Реализация некоторой политики тайм-аута оставлена ​​в качестве упражнения для читателя.

3 голосов
/ 27 августа 2015

Следующий класс обтекает ThreadPoolExecutor и использует семафор для блокировки, после чего рабочая очередь заполнена:

public final class BlockingExecutor { 

    private final Executor executor;
    private final Semaphore semaphore;

    public BlockingExecutor(int queueSize, int corePoolSize, int maxPoolSize, int keepAliveTime, TimeUnit unit, ThreadFactory factory) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
        this.executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit, queue, factory);
        this.semaphore = new Semaphore(queueSize + maxPoolSize);
    }

    private void execImpl (final Runnable command) throws InterruptedException {
        semaphore.acquire();
        try {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            // will never be thrown with an unbounded buffer (LinkedBlockingQueue)
            semaphore.release();
            throw e;
        }
    }

    public void execute (Runnable command) throws InterruptedException {
        execImpl(command);
    }
}

Этот класс-обертка основан на решении, приведенном Брайаном Гетцем в книге «Параллелизм Java на практике». Решение в книге принимает только два параметра конструктора: Executor и границу, используемую для семафора. Это показано в ответе от Fixpoint. Существует проблема с этим подходом: он может войти в состояние, когда потоки пула заняты, очередь заполнена, но семафор только что выпустил разрешение. (semaphore.release() в блоке finally). В этом состоянии новая задача может получить только что выпущенное разрешение, но она отклоняется, поскольку очередь задач заполнена. Конечно, это не то, что вы хотите; Вы хотите заблокировать в этом случае.

Чтобы решить эту проблему, мы должны использовать неограниченную очередь, как ясно упоминает JCiP. Семафор выступает в роли охранника, создавая эффект размера виртуальной очереди. Это имеет побочный эффект, заключающийся в том, что устройство может содержать maxPoolSize + virtualQueueSize + maxPoolSize задач. Это почему? Из-за semaphore.release() в блоке finally. Если все потоки пула вызывают этот оператор в одно и то же время, то освобождаются разрешения maxPoolSize, позволяющие войти в устройство одинаковому количеству задач. Если бы мы использовали ограниченную очередь, она все равно была бы заполнена, что приводило к отклонению задачи. Теперь, поскольку мы знаем, что это происходит только тогда, когда поток пула почти завершен, это не проблема. Мы знаем, что поток пула не будет блокироваться, поэтому вскоре задача будет взята из очереди.

Вы можете использовать ограниченную очередь, хотя. Просто убедитесь, что его размер равен virtualQueueSize + maxPoolSize. Большие размеры бесполезны, семафор будет препятствовать пропуску большего количества элементов. Меньшие размеры приведут к отклонению задач. Вероятность отклонения заданий увеличивается с уменьшением размера. Например, скажем, вы хотите ограниченного исполнителя с maxPoolSize = 2 и virtualQueueSize = 5. Затем возьмите семафор с 5 + 2 = 7 разрешениями и фактическим размером очереди 5 + 2 = 7. Реальное количество задач, которые могут быть в блоке, составляет 2 + 5 + 2 = 9. Когда исполнитель заполнен (5 задач в очереди, 2 в пуле потоков, поэтому доступно 0 разрешений) и ВСЕ потоки пула освобождают свои разрешения, тогда входящие задачи могут получить ровно 2 разрешения.

Теперь решение от JCiP несколько громоздко в использовании, поскольку оно не обеспечивает выполнение всех этих ограничений (неограниченная очередь, или ограничение с этими математическими ограничениями и т. Д.). Я думаю, что это служит хорошим примером для демонстрации того, как вы можете создавать новые классы, ориентированные на многопотоковое исполнение, на основе уже доступных частей, но не как полноценный, повторно используемый класс. Я не думаю, что последний был намерением автора.

2 голосов
/ 14 января 2016

вы можете использовать пользовательский RejectedExecutionHandler, как этот

ThreadPoolExecutor tp= new ThreadPoolExecutor(core_size, // core size
                max_handlers, // max size 
                timeout_in_seconds, // idle timeout 
                TimeUnit.SECONDS, queue, new RejectedExecutionHandler() {
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        // This will block if the queue is full
                        try {
                            executor.getQueue().put(r);
                        } catch (InterruptedException e) {
                            System.err.println(e.getMessage());
                        }

                    }
                });
0 голосов
/ 31 июля 2015

В прошлом у меня была такая же потребность: своего рода блокировка очереди с фиксированным размером для каждого клиента, поддерживаемого общим пулом потоков.В итоге я написал собственный вид ThreadPoolExecutor:

UserThreadPoolExecutor (блокирующая очередь (для каждого клиента) + пул потоков (общий для всех клиентов))

См .: https://github.com/d4rxh4wx/UserThreadPoolExecutor

Каждому UserThreadPoolExecutor предоставляется максимальное количество потоков от общего ThreadPoolExecutor

Каждый UserThreadPoolExecutor может:

  • отправить задачу исполнителю общего пула потоков, если его квота не достигнута.Если его квота достигнута, задание ставится в очередь (непотребительная блокировка, ожидающая ЦП).Как только одна из отправленных задач завершена, квота уменьшается, позволяя другой задаче, ожидающей отправки, ThreadPoolExecutor
  • дождаться завершения оставшихся задач
...