Каков наилучший способ иметь ограниченную очередь с ScheduledThreadPoolExecutor? - PullRequest
14 голосов
/ 13 сентября 2011

Sun Java (1.6) ScheduledThreadPoolExecutor, которая является расширением ThreadPoolExecutor, внутренне использует реализацию DelayQueue, которая является неограниченной очередью .Что мне нужно, так это ScheduledThreadpoolExecutor с ограниченной очередью , то есть он имеет ограничение на задачи, накапливающиеся в очереди, так что, когда задачи в очереди превышают лимит, он начинает отклонять дальнейшие отправленные задачи и предотвращает JVMнехватка памяти.

Удивительно, но Google или stackoverflow не указали мне на какие-либо результаты, которые обсуждают эту проблему.Есть ли уже такая вещь, которую я пропускаю?Если нет, то как я могу реализовать ScheduledThreadpoolExecutor, чтобы обеспечить мне ожидаемые функциональные возможности наилучшим образом?

Ответы [ 5 ]

5 голосов
/ 14 сентября 2011

Как уже отмечали другие, не существует готового способа сделать это.Просто убедитесь, что вы пытаетесь использовать «состав» вместо «наследования».Создайте новый класс, который реализует необходимый интерфейс, и делегируйте базовый ScheduledThreadPoolExecutor, выполнив проверки согласно необходимым методам.

Вы также можете использовать технику, указанную в этом потоке с простой модификацией.Вместо использования Semaphore#acquire вы можете использовать Semaphore#tryAcquire и в зависимости от логического результата решить, нужно ли вам вызывать обработчик отклонения или нет.Если подумать, я лично чувствую, что авторы библиотеки упустили возможность напрямую создать подкласс для конкретного исполнителя, а не полагаться на композицию для создания «запланированной» оболочки над обычным исполнителем.

2 голосов
/ 14 сентября 2011

Как насчет обработки по-разному, то есть в зависимости от размера очереди задержать отправку задачи. Службы executor выставляют очередь через getQueue (). Вы можете вызывать size () для него, и в зависимости от того, какой предел вы планируете для размера очереди, вы можете либо начать отклонять задачи, либо начать откладывать выполнение задачи (увеличить запланированное время, сохраняя размер очереди как один из факторов ).

Все сказали, это опять-таки не лучшее решение; Просто к сведению, Java предоставляет очередь задержки для поддержки кражи работы.

1 голос
/ 25 января 2016

ScheduledThreadPoolExecutor не использует очередь как поле, а вместо этого вызывает getQueue.Но он вызывает super.getQueue, который является очередью из ThreadPoolExecutor.Вы можете использовать отражение, чтобы переопределить его так:

public class BoundedScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {
  public BoundedScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler, int queueCapacity) {
    super(corePoolSize, handler);
    setMaximumPoolSize(corePoolSize);
    setKeepAliveTime(0, TimeUnit.MILLISECONDS);
    LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(queueCapacity) {
            @Override
            public boolean add(Runnable r) {
                boolean added = offer(r);
                if (added) {
                    return added;
                } else {
                    getRejectedExecutionHandler().rejectedExecution(r, CrashingThreadPoolExecutor.this);
                    return false;
                }
            }
        };

    try {
        Field workQueueField = ThreadPoolExecutor.class.getDeclaredField("workQueue");
        workQueueField.setAccessible(true);
        workQueueField.set(this, queue);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
  }
}
1 голос
/ 11 января 2012

Самый простой обходной путь - использовать запланированный исполнитель только для планирования задач, а не для их фактического выполнения. Планировщик должен явно проверять размер очереди исполнителя и отменять задачу, если очередь исполнителя превышает пороговое значение.

Другой вариант - проверить размер очереди ScheduledThreadPoolExecutor прямо в запланированном задании. Если очередь превышает пороговое значение, просто вернитесь немедленно. В этом случае задача будет выполнена мгновенно и удалена из очереди. Так что переполнения не произойдет.

0 голосов
/ 14 сентября 2011

Если вы действительно, действительно не хотите повторно реализовывать ScheduledThreadPoolExecutor, тогда вы можете расширить его и переопределить все методы schedule* и реализовать собственное ограничение задач.Это было бы довольно неприятно, хотя:

private final Object scheduleMonitor = new Object();

@Override
public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) 
{
    if (command == null || unit == null)
        throw new NullPointerException();

    synchronized (scheduleMonitor)
    {                
        while (getQueue().size() >= MAX_QUEUE_SIZE)
        {
           scheduleMonitor.wait();
        }
        super.schedule(command, delay, unit);
    }
}

@Override
Runnable getTask() 
{
    final Runnable r = getTask();
    synchronized (scheduleMonitor)
    {
        scheduleMonitor.notify();
    }
    return r;
}

И повторить для:

  • public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
  • public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
  • public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)

Обратите внимание, что это не помешает повторяющимся задачам превышать лимит очереди, оно будет блокировать только новые запланированные задачи.

Еще одно предостережение: я не проверяллюбые проблемы взаимоблокировки путем вызова super.schedule при удержании блокировки на scheduleMonitor ...

...