Ограниченный приоритетBlockingQueue - PullRequest
16 голосов
/ 26 февраля 2010

PriorityBlockingQueue не ограничен, но мне нужно как-то его ограничить. Каков наилучший способ достичь этого?

Для информации, ограниченный PriorityBlockingQueue будет использоваться в ThreadPoolExecutor.

NB. По ограничению я не хочу выбрасывать Exception, если это произойдет, я хочу поместить объект в очередь, а затем вырезать его на основе значения приоритета. Есть ли какой-нибудь хороший способ сделать эту штуку?

Ответы [ 9 ]

12 голосов
/ 05 марта 2010

Я бы на самом деле не делал это.Хотя я не могу собрать пример кода прямо сейчас, я бы предложил версию шаблона декоратора.

Создайте новый класс и реализуйте интерфейсы, реализованные вашим классом интересов: PriorityBlockingQueue .Я обнаружил следующие интерфейсы, используемые этим классом:

Serializable, Iterable<E>, Collection<E>, BlockingQueue<E>, Queue<E>

В конструкторе для класса примите PriorityBlockingQueue в качестве параметра конструктора.

Затем реализуйте все методы, требуемые интерфейсами, через экземпляры PriorityblockingQueue.Добавьте любой код, необходимый, чтобы сделать его ограниченным.

Вот краткая диаграмма, которую я собрал в Фиолетовый UML :

Диаграмма классов BoundedPriorityblockingQueue http://theopensourceu.com/wp-content/uploads/2010/03/BoundedPriorityBlockingQueue.png

5 голосов
/ 29 сентября 2011

Существует реализация этого в библиотеке Google Collections / Guava : MinMaxPriorityQueue .

Для очереди с минимальным и максимальным приоритетом можно указать максимальный размер. Если так, каждый раз, когда размер очереди превышает это значение, очередь автоматически удаляет его самый большой элемент в соответствии с его компаратором (который может быть элементом, который был только что добавлен). Это разные из обычных ограниченных очередей, которые либо блокируют, либо отклоняют новые элементы при заполнении.

2 голосов
/ 30 июня 2011

После реализации BoundedPriorityBlockingQueue в соответствии с предложением Фрэнка V я понял, что это не совсем то, что я хотел. Основная проблема заключается в том, что элемент, который я должен вставить в очередь, может иметь более высокий приоритет, чем все, что уже есть в очереди. Таким образом, что мне действительно нужно, так это метод 'pivot', если я помещаю объект в очередь, когда очередь заполнена, я хочу вернуть объект с самым низким приоритетом, а не блокировать.

Чтобы конкретизировать предложения Фрэнка V, я использовал следующие фрагменты ...

public class BoundedPriorityBlockingQueue<E> 
   implements 
     Serializable, 
     Iterable<E>, 
     Collection<E>, 
     BlockingQueue<E>, 
     Queue<E>, 
     InstrumentedQueue 
{

... закрытый финальный замок ReentrantLock; // = new ReentrantLock (); частный финал Условие notFull;

final private int capacity;
final private PriorityBlockingQueue<E> queue;

public BoundedPriorityBlockingQueue(int capacity) 
  throws IllegalArgumentException, 
         NoSuchFieldException, 
         IllegalAccessException 
{
   if (capacity < 1) throw 
       new IllegalArgumentException("capacity must be greater than zero");      
   this.capacity = capacity;
   this.queue = new PriorityBlockingQueue<E>();

   // gaining access to private field
   Field reqField;
   try {
    reqField = PriorityBlockingQueue.class.getDeclaredField("lock");
    reqField.setAccessible(true);
    this.lock = (ReentrantLock)reqField.get(ReentrantLock.class);
    this.notFull = this.lock.newCondition();

   } catch (SecurityException ex) {
    ex.printStackTrace();
    throw ex;
   } catch (NoSuchFieldException ex) {
    ex.printStackTrace();
    throw ex;
   } catch (IllegalAccessException ex) {
    ex.printStackTrace();
    throw ex;
   }

...

@Override
public boolean offer(E e) {
    this.lock.lock();
    try {
        while (this.size() == this.capacity)
            notFull.await();
        boolean success = this.queue.offer(e);
        return success;
    } catch (InterruptedException ie) {
        notFull.signal(); // propagate to a non-interrupted thread
        return false;

    } finally {
        this.lock.unlock();
    }
}
...

Здесь также есть некоторые инструменты, чтобы я мог проверить эффективность очереди. Я все еще работаю над PivotPriorityBlockingQueue, если кому-то интересно, я могу опубликовать его.

2 голосов
/ 26 февраля 2010

Наверху моей головы я бы разбил его на подклассы и переписал метод put для обеспечения этого. Если все закончится, бросьте исключение или сделайте то, что кажется подходящим.

Что-то вроде:

public class LimitedPBQ extends PriorityBlockingQueue {

    private int maxItems;
    public LimitedPBQ(int maxItems){
        this.maxItems = maxItems;
    }

    @Override
    public boolean offer(Object e) {
        boolean success = super.offer(e);
        if(!success){
            return false;
        } else if (this.size()>maxItems){
            // Need to drop last item in queue
            // The array is not guaranteed to be in order, 
            // so you should sort it to be sure, even though Sun's Java 6 
            // version will return it in order
            this.remove(this.toArray()[this.size()-1]);
        }
        return true;
    }
}

Редактировать: добавить и поставить предложение вызова, поэтому переопределения должно быть достаточно

Редактировать 2: теперь следует удалить последний элемент, если он превышает maxItems. Хотя может быть более элегантный способ сделать это.

0 голосов
/ 27 октября 2015

Существует реализация в репозитория ConcurrencyUtils .

0 голосов
/ 17 июля 2015

Ни один ответ пока не обладает всеми следующими свойствами:

  • Реализует интерфейс BlockingQueue.
  • Поддерживает удаление абсолютного наибольшего значения.
  • Нет условий гонки.

К сожалению, в стандартной библиотеке Java нет реализации BlockingQueue. Вам нужно будет либо найти реализацию, либо реализовать что-то самостоятельно. Реализация BlockingQueue потребует определенных знаний о правильной блокировке.

Вот что я предлагаю: посмотрите на https://gist.github.com/JensRantil/30f812dd237039257a3d и используйте его в качестве шаблона для реализации вашей собственной оболочки вокруг SortedSet. По сути, все блокировки есть, и есть несколько модульных тестов (которые требуют некоторой настройки).

0 голосов
/ 03 сентября 2012

Существует еще одна реализация здесь

Кажется, она выполняет то, что вы просите:

BoundedPriorityQueue реализует очередь с приоритетами с верхней границей числаэлементы.Если очередь не заполнена, добавленные элементы всегда добавляются.Если очередь заполнена, а добавленный элемент больше, чем самый маленький элемент в очереди, самый маленький элемент удаляется, а новый элемент добавляется.Если очередь заполнена и добавленный элемент не превышает наименьший элемент в очереди, новый элемент не добавляется.

0 голосов
/ 06 марта 2010

Посмотрите на ForwardingQueue из API коллекций Google. Для блокирования семантики вы можете использовать Семафор .

0 голосов
/ 05 марта 2010

Если порядок Runnables, который вы хотите выполнить, не является строгим (как: может случиться так, что некоторые задачи с более низким приоритетом выполняются, даже если существуют задачи с более высоким приоритетом), тогда я бы предложил следующее, которое сводится к периодическому урезание PriorityQueue в размере:

if (queue.size() > MIN_RETAIN * 2){
    ArrayList<T> toRetain = new ArrayList<T>(MIN_RETAIN);
    queue.drainTo(toRetain, MIN_RETAIN);
    queue.clear();
    for (T t : toRetain){
      queue.offer(t);
    }
}

Это, очевидно, не удастся, если порядок должен быть строгим, так как слив приведет к моменту, когда задача с низким приоритетом будет извлечена из очереди с использованием одновременного доступа.

Преимущества заключаются в том, что это поточно-ориентированный и, скорее всего, будет работать так же быстро, как вы можете с дизайном очереди с приоритетами.

...