Элегантная реализация индикаторов длины очереди в ExecutorServices - PullRequest
32 голосов
/ 15 февраля 2010

Почему, ой, почему java.util.concurrent не предоставляет индикаторы длины очереди для ExecutorService с? Недавно я обнаружил, что делаю что-то вроде этого:

ExecutorService queue = Executors.newSingleThreadExecutor();
AtomicInteger queueLength = new AtomicInteger();
...

public void addTaskToQueue(Runnable runnable) {
    if (queueLength.get() < MAX_QUEUE_LENGTH) {
        queueLength.incrementAndGet(); // Increment queue when submitting task.
        queue.submit(new Runnable() {
            public void run() {
                runnable.run();
                queueLength.decrementAndGet(); // Decrement queue when task done.
            }
        });
    } else {
        // Trigger error: too long queue
    }
}

Что работает нормально, но ... Я думаю, это действительно должно быть реализовано как часть ExecutorService. Глупо и подвержено ошибкам носить с собой счетчик , отделенный от реальной очереди, длину которой должен указывать счетчик (напоминает мне о C-массивах). Но ExecutorService s получаются статическими фабричными методами, поэтому нет способа просто расширить отличный в другом случае однопоточный исполнитель и добавить счетчик очереди. Итак, что мне делать:

  1. Изобрести что-либо уже реализованное в JDK?
  2. Другое умное решение?

Ответы [ 2 ]

55 голосов
/ 15 февраля 2010

Есть более прямой путь:

ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newSingleThreadExecutor();
// add jobs
// ...
int size = executor.getQueue().size();

Хотя вы можете не использовать удобные методы создания Executor, а создавать непосредственно исполнителя, чтобы избавиться от приведения и, таким образом, быть уверенным, что исполнителем всегда будет ThreadPoolExecutor, даже если реализация Executors.newSingleThreadExecutor изменится однажды.

ThreadPoolExecutor executor = new ThreadPoolExecutor( 1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>() );

Это напрямую скопировано из Executors.newSingleThreadExecutor в JDK 1.6. LinkedBlockingQueue, который передается конструктору, на самом деле является тем самым объектом, который вы получите от getQueue.

3 голосов
/ 06 декабря 2016

Пока вы можете проверить размер очереди напрямую. Еще один способ обработки слишком длинной очереди - ограничить внутреннюю очередь.

public static
ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
  return new ThreadPoolExecutor(nThreads, nThreads,
                              5000L, TimeUnit.MILLISECONDS,
                              new ArrayBlockingQueue<Runnable>(queueSize, true));
}

Это приведет к RejectedExecutionExceptions (см. this ) при превышении лимита.

Если вы хотите избежать исключения, вызывающий поток может быть перехвачен для выполнения функции. См. этот SO вопрос для решения.

Ссылки

...