Очередь с уведомлениями об изменениях isEmpty () - PullRequest
1 голос
/ 31 января 2011

У меня есть BlockingQueue<Runnable> (взято из ScheduledThreadPoolExecutor) в среде производителя-потребителя. Существует один поток, добавляющий задачи в очередь, и пул потоков, выполняющий их.

Мне нужны уведомления о двух событиях:

  1. Первый элемент добавлен в пустую очередь
  2. Последний элемент удален из очереди

Уведомление = запись сообщения в базу данных.

Есть ли какой-нибудь разумный способ реализовать это?

Ответы [ 2 ]

1 голос
/ 01 февраля 2011

Простой и наивный подход состоит в том, чтобы украсить вашу BlockingQueue реализацией, которая просто проверяет базовую очередь и затем отправляет задачу, чтобы сделать уведомление.

NotifyingQueue<T> extends ForwardingBlockingQueue<T> implements BlockingQueue<T> {
  private final Notifier notifier; // injected not null

  …

  @Override public void put(T element) {
    if (getDelegate().isEmpty()) {
      notifier.notEmptyAnymore();
    }
    super.put(element);
  }

  @Override public T poll() {
    final T result = super.poll();
    if ((result != null) && getDelegate().isEmpty())
      notifier.nowEmpty();
  }
  … etc
}

У этого подхода есть пара проблем. Несмотря на то, что empty -> notEmpty довольно прост - особенно для случая одного производителя, двум потребителям было бы легко работать одновременно, и оба увидели бы, что очередь идет из непустого -> empty.

Если, однако, все, что вам нужно, это получить уведомление о том, что очередь стала пустой в какое-то время , тогда этого будет достаточно, если ваш уведомитель - ваш конечный автомат, отслеживающий пустоту и незаполненность уведомление, когда оно меняется от одного к другому:

AtomicStateNotifier implements Notifier {
  private final AtomicBoolean empty = new AtomicBoolean(true); // assume it starts empty
  private final Notifier delegate; // injected not null

  public void notEmptyAnymore() {
    if (empty.get() && empty.compareAndSet(true, false))
      delegate.notEmptyAnymore();
  }

  public void nowEmpty() {
    if (!empty.get() && empty.compareAndSet(false, true))
      delegate.nowEmpty();
  }
}

Теперь это потокобезопасная защита вокруг реальной реализации Notifier, которая, возможно, отправляет задачи Исполнителю для асинхронной записи событий в базу данных.

0 голосов
/ 31 января 2011

Скорее всего, дизайн ошибочен, но вы можете сделать это относительно просто: у вас есть добавление одного потока, так что вы можете проверить перед добавлением.то есть pool.getQueue().isEmpty() - с одним производителем, это безопасно.

Последний удаленный элемент не может быть гарантирован, но вы можете переопределить beforeExecute и снова проверить очередь.Возможно, с небольшим таймаутом после того, как isEmpty() вернет истину.Возможно, приведенный ниже код будет лучше выполнять в afterExecute.

protected void beforeExecute(Thread t, Runnable r) { 
  if (getQueue().isEmpty()){
    try{
      Runnable r = getQueue().poll(200, TimeUnit.MILLISECONDS);
      if (r!=null){ 
        execute(r);
      }  else{
        //last message - or on after execute by Setting a threadLocal and check it there
        //alternatively you may need to do so ONLY in after execute, depending on your needs
     }
    }catch(InterruptedException _ie){
      Thread.currentThread().interrupt();
    }
  }
}

иногда так

Я могу объяснить, почему выполнение уведомлений без самой очереди не будет работать: представьте, что выдобавьте задачу, которая будет выполняться пулом, задача запланирована немедленно, очередь снова пуста, и вам потребуется уведомление.

...