Простой и наивный подход состоит в том, чтобы украсить вашу BlockingQueue реализацией, которая просто проверяет базовую очередь и затем отправляет задачу, чтобы сделать уведомление.
NotifyingQueue<T> extends ForwardingBlockingQueue<T> implements BlockingQueue<T> {
private final Notifier notifier; // injected not null
…
@Override public void put(T element) {
if (getDelegate().isEmpty()) {
notifier.notEmptyAnymore();
}
super.put(element);
}
@Override public T poll() {
final T result = super.poll();
if ((result != null) && getDelegate().isEmpty())
notifier.nowEmpty();
}
… etc
}
У этого подхода есть пара проблем. Несмотря на то, что empty -> notEmpty довольно прост - особенно для случая одного производителя, двум потребителям было бы легко работать одновременно, и оба увидели бы, что очередь идет из непустого -> empty.
Если, однако, все, что вам нужно, это получить уведомление о том, что очередь стала пустой в какое-то время , тогда этого будет достаточно, если ваш уведомитель - ваш конечный автомат, отслеживающий пустоту и незаполненность уведомление, когда оно меняется от одного к другому:
AtomicStateNotifier implements Notifier {
private final AtomicBoolean empty = new AtomicBoolean(true); // assume it starts empty
private final Notifier delegate; // injected not null
public void notEmptyAnymore() {
if (empty.get() && empty.compareAndSet(true, false))
delegate.notEmptyAnymore();
}
public void nowEmpty() {
if (!empty.get() && empty.compareAndSet(false, true))
delegate.nowEmpty();
}
}
Теперь это потокобезопасная защита вокруг реальной реализации Notifier, которая, возможно, отправляет задачи Исполнителю для асинхронной записи событий в базу данных.