Блок ThreadPoolExecutor, когда очередь заполнена? - PullRequest
52 голосов
/ 10 августа 2010

Я пытаюсь выполнить множество задач, используя ThreadPoolExecutor.Ниже приведен гипотетический пример:

def workQueue = new ArrayBlockingQueue<Runnable>(3, false)
def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue)
for(int i = 0; i < 100000; i++)
    threadPoolExecutor.execute(runnable)

Проблема в том, что я быстро получаю java.util.concurrent.RejectedExecutionException, поскольку количество задач превышает размер рабочей очереди.Однако желаемое поведение, которое я ищу, - это иметь блок основного потока, пока в очереди не будет места.Каков наилучший способ сделать это?

Ответы [ 6 ]

59 голосов
/ 19 августа 2010

В некоторых очень узких обстоятельствах вы можете реализовать java.util.concurrent.RejectedExecutionHandler, который делает то, что вам нужно.

RejectedExecutionHandler block = new RejectedExecutionHandler() {
  rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
     executor.getQueue().put( r );
  }
};

ThreadPoolExecutor pool = new ...
pool.setRejectedExecutionHandler(block);

Сейчас.Это очень плохая идея по следующим причинам

  • Она склонна к взаимоблокировке, поскольку все потоки в пуле могут умереть до того, как вещь, которую вы положили в очередь, станет видимой.Смягчить это, установив разумное время поддержки.
  • Задача не выполнена так, как может ожидать ваш исполнитель.Многие реализации executor оборачивают свои задачи в какой-то объект отслеживания перед выполнением.Посмотрите на ваш источник.
  • Добавление через getQueue () настоятельно не рекомендуется API и может быть запрещено в какой-то момент.

Практически всегда лучшая стратегияустановить ThreadPoolExecutor.CallerRunsPolicy, который будет ограничивать ваше приложение, выполняя задачу в потоке, который вызывает execute ().

Однако иногда стратегия блокировки со всеми присущими ей рисками действительно является тем, что вам нужно.Я бы сказал, что в этих условиях

  • У вас есть только один поток, вызывающий execute ()
  • Вы должны (или хотите) иметь очень маленькую длину очереди
  • Вам абсолютно необходимо ограничить количество потоков, выполняющих эту работу (обычно по внешним причинам), и стратегия запуска вызывающих программ может нарушить это.
  • Ваши задачи имеют непредсказуемый размер, поэтому при выполнении вызывающих вызовов может возникнуть голодесли пул на мгновение был занят четырьмя короткими задачами и ваш однопотоковый вызов execute застрял с большим.

Итак, как я уже сказал.Это редко нужно и может быть опасно, но вот, пожалуйста.

Удачи.

5 голосов
/ 16 мая 2018

Что вам нужно сделать, это обернуть ваш ThreadPoolExecutor в Executor, который явно ограничивает количество одновременно выполняемых операций внутри него:

 private static class BlockingExecutor implements Executor {

    final Semaphore semaphore;
    final Executor delegate;

    private BlockingExecutor(final int concurrentTasksLimit, final Executor delegate) {
        semaphore = new Semaphore(concurrentTasksLimit);
        this.delegate = delegate;
    }

    @Override
    public void execute(final Runnable command) {
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            e.printStackTrace();
            return;
        }

        final Runnable wrapped = () -> {
            try {
                command.run();
            } finally {
                semaphore.release();
            }
        };

        delegate.execute(wrapped);

    }
}

Вы можете настроить concurrentTasksLimit на threadPoolSize + queueSize вашего исполнителя-делегата, и это в значительной степени решит вашу проблему

5 голосов
/ 26 марта 2018

Вы можете использовать semaphore, чтобы заблокировать попадание потоков в пул.

ExecutorService service = new ThreadPoolExecutor(
    3, 
    3, 
    1, 
    TimeUnit.HOURS, 
    new ArrayBlockingQueue<>(6, false)
);

Semaphore lock = new Semaphore(6); // equal to queue capacity

for (int i = 0; i < 100000; i++ ) {
    try {
        lock.acquire();
        service.submit(() -> {
            try {
              task.run();
            } finally {
              lock.release();
            }
        });
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

Некоторые ошибки :

  • Используйте только этот шаблонс фиксированным пулом потоков.Очередь вряд ли будет часто заполняться, поэтому новые потоки не будут создаваться.Посмотрите java-документацию на ThreadPoolExecutor для получения более подробной информации: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html Есть способ обойти это, но это выходит за рамки этого ответа.
  • Размер очереди должен быть больше, чем размерколичество основных потоков.Если бы мы делали размер очереди 3, то в конечном итоге произошло бы следующее:

    • T0: все три потока работают, очередь пуста, разрешения отсутствуют.
    • T1: поток 1 завершает работу, освобождает разрешение.
    • T2: поток 1 опрашивает очередь на наличие новой работы, не находит ничего и ожидает .
    • T3: основной потокотправляет работу в пул, поток 1 начинает работу.

    Приведенный выше пример транслирует основной поток , блокирующий поток 1. Это может показаться небольшим периодом, но теперь умножьтечастота по дням и месяцам.Внезапно, короткие промежутки времени приводят к большой потере времени.

1 голос
/ 30 мая 2018

Вот что я закончил:

int NUM_THREADS = 6;
Semaphore lock = new Semaphore(NUM_THREADS);
ExecutorService pool = Executors.newCachedThreadPool();

for (int i = 0; i < 100000; i++) {
    try {
        lock.acquire();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    pool.execute(() -> {
        try {
            // Task logic
        } finally {
            lock.release();
        }
    });
}
1 голос
/ 31 мая 2017

Вот мой фрагмент кода в этом случае:

public void executeBlocking( Runnable command ) {
    if ( threadPool == null ) {
        logger.error( "Thread pool '{}' not initialized.", threadPoolName );
        return;
    }
    ThreadPool threadPoolMonitor = this;
    boolean accepted = false;
    do {
        try {
            threadPool.execute( new Runnable() {
                @Override
                public void run() {
                    try {
                        command.run();
                    }
                    // to make sure that the monitor is freed on exit
                    finally {
                        // Notify all the threads waiting for the resource, if any.
                        synchronized ( threadPoolMonitor ) {
                            threadPoolMonitor.notifyAll();
                        }
                    }
                }
            } );
            accepted = true;
        }
        catch ( RejectedExecutionException e ) {
            // Thread pool is full
            try {
                // Block until one of the threads finishes its job and exits.
                synchronized ( threadPoolMonitor ) {
                    threadPoolMonitor.wait();
                }
            }
            catch ( InterruptedException ignored ) {
                // return immediately
                break;
            }
        }
    } while ( !accepted );
}

threadPool - это локальный экземпляр java.util.concurrent.ExecutorService, который уже был инициализирован.

0 голосов
/ 07 ноября 2017

Я решил эту проблему с помощью пользовательского RejectedExecutionHandler, который просто на некоторое время просто блокирует вызывающий поток, а затем снова пытается отправить задачу:

public class BlockWhenQueueFull implements RejectedExecutionHandler {

    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {

        // The pool is full. Wait, then try again.
        try {
            long waitMs = 250;
            Thread.sleep(waitMs);
        } catch (InterruptedException interruptedException) {}

        executor.execute(r);
    }
}

Этот класс можно просто использовать в потоке.Исполнитель пула как RejectedExecutionHandler, как и любой другой.В этом примере:

executorPool = new def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue, new BlockWhenQueueFull())

Единственным недостатком, который я вижу, является то, что вызывающий поток может быть заблокирован немного дольше, чем это строго необходимо (до 250 мс).Для многих краткосрочных задач, возможно, уменьшите время ожидания до 10 мс или около того.Более того, поскольку этот исполнитель фактически вызывается рекурсивно, очень долгое ожидание появления потока (часы) может привести к переполнению стека.

Тем не менее мне лично нравится этот метод.Он компактен, прост для понимания и хорошо работает.Я что-то упустил? 1009 *

...