Также была поднята следующая опция (не уверен, что это должно быть ответом для меня, но не мог найти лучшего места, чтобы написать это):
Создать упаковщик для очереди.Эта оболочка будет иметь монитор, который будет ожидаться при чтении потребителем, и производитель будет получать уведомления при каждом добавлении нового объекта или поднятии флага isDone.
Когда потребитель читает объекты изВ очереди эти объекты будут обернуты чем-то похожим на то, что @ yann-ramin предлагал выше.Однако, чтобы уменьшить накладные расходы, потребитель будет предоставлять один, повторно используемый экземпляр QueueMessage при каждом вызове чтения (это всегда будет один и тот же экземпляр).Оболочка очереди обновит поля соответствующим образом перед возвратом экземпляра потребителю.
Это позволяет избежать использования тайм-аутов, снов и т. Д.
EDITED Это предлагаемыйреализация:
/**
* This work queue is designed to be used by ONE producer and ONE consumer
* (no more, no less of neither). The work queue has certain added features, such
* as the ability to signal that the workload generation is done and nothing will be
* added to the queue.
*
* @param <E>
*/
public class DefiniteWorkQueue<E> {
private final E[] EMPTY_E_ARRAY;
private LinkedBlockingQueue<E> underlyingQueue = new LinkedBlockingQueue<E>();
private boolean isDone = false;
// This monitor allows for flagging when a change was done.
private Object changeMonitor = new Object();
public DefiniteWorkQueue(Class<E> clazz) {
// Reuse this instance, makes calling toArray easier
EMPTY_E_ARRAY = (E[]) Array.newInstance(clazz, 0);
}
public boolean isDone() {
return isDone;
}
public void setIsDone() {
synchronized (changeMonitor) {
isDone = true;
changeMonitor.notifyAll();
}
}
public int size() {
return underlyingQueue.size();
}
public boolean isEmpty() {
return underlyingQueue.isEmpty();
}
public boolean contains(E o) {
return underlyingQueue.contains(o);
}
public Iterator<E> iterator() {
return underlyingQueue.iterator();
}
public E[] toArray() {
// The array we create is too small on purpose, the underlying
// queue will extend it as needed under a lock
return underlyingQueue.toArray(EMPTY_E_ARRAY);
}
public boolean add(E o) {
boolean retval;
synchronized (changeMonitor) {
retval = underlyingQueue.add(o);
if (retval)
changeMonitor.notifyAll();
}
return retval;
}
public boolean addAll(Collection<? extends E> c) {
boolean retval;
synchronized (changeMonitor) {
retval = underlyingQueue.addAll(c);
if (retval)
changeMonitor.notifyAll();
}
return retval;
}
public void remove(RemovalResponse<E> responseWrapper) throws InterruptedException {
synchronized (changeMonitor) {
// If there's nothing in the queue but it has not
// ended yet, wait for someone to add something.
if (isEmpty() && !isDone())
changeMonitor.wait();
// When we get here, we've been notified or
// the current underlying queue's state is already something
// we can respond about.
if (!isEmpty()) {
responseWrapper.type = ResponseType.ITEM;
responseWrapper.item = underlyingQueue.remove();
} else if (isDone()) {
responseWrapper.type = ResponseType.IS_DONE;
responseWrapper.item = null;
} else {
// This should not happen
throw new IllegalStateException(
"Unexpected state where a notification of change was made but " +
"nothing is in the queue and work is not done.");
}
}
}
public static class RemovalResponse<E> {
public enum ResponseType {
/**
* Used when the response contains the first item of the queue.
*/
ITEM,
/**
* Used when the work load is done and nothing new will arrive.
*/
IS_DONE
};
private ResponseType type;
private E item;
public ResponseType getType() {
return type;
}
public void setType(ResponseType type) {
this.type = type;
}
public E getItem() {
return item;
}
public void setItem(E item) {
this.item = item;
}
}
}