Как я могу сделать Java PriorityBlockingQueue, которая сохраняет поведение FIFO? - PullRequest
7 голосов
/ 08 июля 2011

Я пытаюсь создать очередь блокировки приоритетов в Java, которая поддерживает порядок FIFO для элементов с таким же приоритетом.В этом вам поможет документ Oracle, но я все еще запутался.

Следует отметить, что следующие темы для меня очень новы: Обобщения , Интерфейсы как типы и статические вложенные классы .Все они вступают в игру в следующем определении класса.Обобщения, в частности, сбивают с толку, и я уверен, что я полностью напутал с ними здесь.

Я включил комментарии, чтобы идентифицировать ошибки компилятора, которые я получаю в настоящее время.вопросы:

  1. Можно ли, чтобы класс представлял объект события в очереди, а фактическая очередь была статическим членом класса?

  2. Разумно ли включать Oracle FIFO-событие «обертку» в качестве статического вложенного класса?

  3. Я хотя бы на правильном пути, делая все это в одном внешнем классе?

Вот класс, который я написал:

import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public class FIFOPBQEvent {

/**
 * First we define a static nested class which, when instantiated,
 * encapsulates the "guts" of the event - a FIFOPBQEvent - along with
 * a sequence number that assures FIFO behavior of events of like priority.
 *  
 * The following is lifted ALMOST verbatim (I added "static" and some 
 * comments) from Oracle documentation on adding FIFO-ness to a Priority
 * Blocking Queue: 
 * http://download.oracle.com/javase/6/docs/api/java/util/concurrent/PriorityBlockingQueue.html
 * As the Oracle doc points out:
 * 
 * "A static nested class interacts with the instance members of its outer 
 * class (and other classes) just like any other top-level class. In 
 * effect, a static nested class is behaviorally a top-level class that 
 * has been nested in another top-level class for packaging convenience."
 *
 */
static class FIFOEntry<E extends Comparable<? super E>> implements
        Comparable<FIFOEntry<E>> {
    final static AtomicLong seq = new AtomicLong();
    final long seqNum;  // instance
    final E entry;

    public FIFOEntry(E entry) {
        seqNum = seq.getAndIncrement();
        this.entry = entry;
    }

    public E getEntry() {
        return entry;
    }
    /** Here is implementation of Comparable */
    public int compareTo(FIFOEntry<E> other) {
        int res = entry.compareTo(other.entry);
        if (res == 0 && other.entry != this.entry)
            res = (seqNum < other.seqNum ? -1 : 1);
        return res;
    }
}

/**
 * Now we declare a single (static) PBQ of FIFO entries into which 
 * PBQFIFOEvents will be added and removed.
 */

/** FLAGGED AS ERROR BY COMPILER */
// Bound mismatch: The type FIFOPBQEvent is not a valid substitute for the
// bounded parameter <E extends Comparable<? super E>> of the type 
// FIFOPBQEvent.FIFOEntry<E>

private static PriorityBlockingQueue<FIFOEntry<FIFOPBQEvent>> theQueue =
    PriorityBlockingQueue<FIFOEntry<FIFOPBQEvent>>();

/** 
 * And here are the "guts" of our event: the i.d. and state of the GUI widget 
 */
private ConsoleObject obj = ConsoleObject.UNDEFINED_OBJ; // widget that was affected
private ObjectState state = ObjectState.UNDEFINED_STATE; // the widget's new state

/** 
 * Constructor specifying the class variables 
 */
public FIFOPBQEvent(ConsoleObject theObj, ObjectState theState) {
    obj = theObj;
    state = theState;
}

/**
 * Event queuing ("sending") and dequeuing ("receiving")
 */
public void sendEvent() {

    /** FLAGGED AS ERROR BY COMPILER */
    // The method put(FIFOPBQEvent.FIFOEntry<FIFOPBQEvent>) in the type 
    // PriorityBlockingQueue<FIFOPBQEvent.FIFOEntry<FIFOPBQEvent>> is not 
    // applicable for the arguments (FIFOPBQEvent)

    theQueue.put(this);
}

public static FIFOPBQEvent receiveEvent() {

    /** FLAGGED AS ERROR BY COMPILER */
    // Type mismatch: cannot convert from FIFOPBQEvent.FIFOEntry<FIFOPBQEvent> 
    // to FIFOPBQEvent

    FIFOPBQEvent event = theQueue.take();
    return event;
}

/**
 * ConsoleEvent accessors
 */
public ConsoleObject getObj() {
    return this.obj;
}

public ObjectState getState() {
    return this.state;
}

/**
 * And for the first time, enums instead of public static final ints.
 */
public enum ConsoleObject {
    UNDEFINED_OBJ,
    RESERVED,

    /** Console keys */
    RESET,
    DISPLAY_MAR,
    SAVE,
    INSERT,
    RELEASE,
    START,
    SIE,
    SCE,

    /** Console toggle switches */
    POWER,
    PARITY_CHECK,
    IO_CHECK,
    OVERFLOW_CHECK,
    SENSE_SWITCH_1,
    SENSE_SWITCH_2,
    SENSE_SWITCH_3,
    SENSE_SWITCH_4
}

public enum ObjectState {
    UNDEFINED_STATE,

    /** Toggle switches */
    OFF,
    ON,

    /** Console keys */
    PRESSED,
}
}

Ответы [ 4 ]

4 голосов
/ 08 июля 2011

Первая ошибка - более значимая ошибка. Это происходит потому, что класс FIFOPBQEvent не реализует Comparable, что должно рассматриваться как универсальный тип для вложенного класса FIFOEntry. Это потому, что вы ограничиваете E и говорите, что это extends Comparable<...>. По сути, ваш класс FIFOPBQEvent должен быть сопоставим, чтобы обеспечить приоритет для очереди (предположительно в зависимости от типа события).

Чтобы исправить ошибку, вам необходимо:

  1. Измените заголовок вашего класса на:

    public class FIFOPBQEvent implements Comparable<FIFOPBQEvent> {
    
  2. добавить метод compareTo в класс FIFOPBQEvent; что-то вроде:

    public int compareTo (FIFOPBQEvent other) {
        // TODO - compare this and other for priority
        return 0;
    }
    

Затем вам нужно обернуть вашу запись в ваш sendEvent метод:

public void sendEvent () {
    theQueue.put(new FIFOEntry<FIFOPBQEvent> (this));
}

Последняя незначительная ошибка заключается в том, что вы не развернули объект FIFOEntry. Чтобы это исправить, измените receiveEvent на:

public static FIFOPBQEvent receiveEvent () {
    FIFOPBQEvent event = theQueue.take ().getEntry ();
    return event;
}
2 голосов
/ 08 июля 2011

Давайте пройдемся по вашему коду.

static class FIFOEntry<E extends Comparable<? super E>> implements 
  Comparable<FIFOEntry<E>> {

Это определяет класс FIFOEntry, который принимает универсальный параметр.Вы ограничили тип универсального параметра «Любой объект, который реализует Comparable сам по себе».

private static PriorityBlockingQueue<FIFOEntry<FIFOPBQEvent>> theQueue =
  PriorityBlockingQueue<FIFOEntry<FIFOPBQEvent>>();

Ваше объявление PriorityBlockingQueue здесь не является неправильным, но ваше определение FIFOEntry<FIFOPBQEvent> неверно.Это из-за вышеупомянутого пункта - вы ограничили тип FIFOEntry всем, что реализует Comparable, то есть должно быть

class FIFOPBQEvent implements Comparable<FIFOPBQEvent> {

Ваша следующая проблема -

public void sendEvent() {
  theQueue.put(this);
}

Тип this равен FIFOPBQEvent, но очередь принимает только объекты FIFOEntry.Чтобы соответствовать вашей подписи в очереди, она должна быть:

public void sendEvent() {
  // Requires FIFOPBQEvent to be Comparable
  theQueue.put(new FIFOEntry<FIFOPBQEvent>(this));     
}

У вас такая же проблема и на receiveEvent() - ваша подпись в очереди говорит, что очередь содержит объекты FIFOEntry, и вы пытаетесь извлечь FIFOPBQEvents.

1 голос
/ 19 декабря 2012

Вот фактическая замена для PriorityBlockingQueue, которая поддерживает порядок FIFO для элементов с равным приоритетом. Это делает всю упаковку / распаковку прозрачной для пользователя.

Этот код был написан для 1.4 JVM и использует j.u.c. портировать. Использовать его в более новой JVM и добавлять дженерики должно быть просто.

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.NoSuchElementException;

import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.PriorityBlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicLong;

/**
 * A queue with all properties of a {@link PriorityBlockingQueue} but which additionally 
 * returns items with equal priority 
 * in a FIFO order.
 * In this respect, {@link PriorityBlockingQueue} explicitly gives no return order guarantees for equal priority elements.
 * 
 * This queue only accepts {@link Comparable} items. A custom {@link Comparator} cannot
 * be specified at construction time.
 * 
 *
 */
public final class FIFOPriorityBlockingQueue implements BlockingQueue {
    private final PriorityBlockingQueue q;

    public FIFOPriorityBlockingQueue() {
        q = new PriorityBlockingQueue();
    }

    public FIFOPriorityBlockingQueue(int initialCapacity) {
        q = new PriorityBlockingQueue(initialCapacity);
    }

    public boolean isEmpty() {
        return q.isEmpty();
    }

    public boolean addAll(Collection c) {
    if (c == null)
        throw new NullPointerException();
    if (c == this)
        throw new IllegalArgumentException();
    boolean modified = false;
    Iterator e = c.iterator();
    while (e.hasNext()) {
        if (add(e.next()))
        modified = true;
    }
    return modified;
    }

    /**
     * Always returns <code>null</code> as this {@link BlockingQueue} only accepts
     * {@link Comparable} objects and doesn't allow setting an own {@link Comparator} 
     * @return
     */
    public Comparator comparator() {
        return null;
    }

    public boolean containsAll(Collection c) {
        Iterator e = c.iterator();
        while (e.hasNext())
            if(!contains(e.next()))
            return false;

        return true;
    }

    public int size() {
        return q.size();
    }

    public int remainingCapacity() {
        return q.remainingCapacity();
    }

    public boolean removeAll(Collection c) {
        boolean modified = false;
        Iterator e = iterator();
        while (e.hasNext()) {
            if(c.contains(e.next())) {
            e.remove();
            modified = true;
            }
        }
        return modified;
    }

    public boolean retainAll(Collection c) {
        boolean modified = false;
        Iterator e = iterator();
        while (e.hasNext()) {
            if(!c.contains(e.next())) {
            e.remove();
            modified = true;
            }
        }
        return modified;
    }

    public Object remove() {
        return ((FIFOEntry)q.remove()).entry;
    }

    public Object element() {
        return ((FIFOEntry)q.element()).entry;
    }

    public boolean add(Object e) {
        return q.add(new FIFOEntry((Comparable)e));
    }

    public boolean offer(Object e) {
        return q.offer(new FIFOEntry((Comparable)e));
    }

    public void put(Object e) {
        q.put(new FIFOEntry((Comparable)e));
    }

    public boolean offer(Object e, long timeout, TimeUnit unit) {
        return q.offer(new FIFOEntry((Comparable)e), timeout, unit);
    }

    public Object poll() {
        return ((FIFOEntry)q.poll()).entry;
    }

    public Object take() throws InterruptedException {
        return ((FIFOEntry)q.take()).entry;
    }

    public Object poll(long timeout, TimeUnit unit) throws InterruptedException {
        return ((FIFOEntry)q.poll(timeout, unit)).entry;
    }

    public Object peek() {
        return ((FIFOEntry)q.peek()).entry;
    }

    /**
     * If more than one equal objects are held by the queue, remove() will
     * remove any one of them, not necessarily the first or last inserted.
     */
    public boolean remove(Object o) {
        return q.remove(new FIFOEntry((Comparable)o));
    }

    public boolean contains(Object o) {
        return q.contains(new FIFOEntry((Comparable)o));
    }

    public Object[] toArray() {
        Object[] a = q.toArray();
        for (int i = 0; i < a.length; i++) { // unpacking
            a[i] = ((FIFOEntry)a[i]).entry;
        }
        return a;
    }

    public String toString() {
        return q.toString(); // ok, because each FIFOEntry.toString returns the toString of the entry 
    }

    public int drainTo(Collection c) {
        ArrayList tmp = new ArrayList(size());
        int n = q.drainTo(tmp);
        for (Iterator it = tmp.iterator(); it.hasNext();) {
            FIFOEntry en = (FIFOEntry) it.next();
            c.add(en.entry);
        }
        return n;
    }



    public int drainTo(Collection c, int maxElements) {
        ArrayList tmp = new ArrayList(size());
        int n = q.drainTo(tmp, maxElements);
        for (Iterator it = tmp.iterator(); it.hasNext();) {
            FIFOEntry en = (FIFOEntry) it.next();
            c.add(en.entry);
        }
        return n;
    }

    public void clear() {
        q.clear();
    }



    public Object[] toArray(Object[] a) {
        Object[] res = q.toArray(a);
        for (int i = 0; i < res.length; i++) { // unpacking
            res[i] = ((FIFOEntry)res[i]).entry;
        }
        return res;
    }



    public Iterator iterator() {
        final Iterator it = q.iterator();
        return new Iterator() {
            public void remove() throws UnsupportedOperationException, IllegalStateException, ConcurrentModificationException {
                it.remove();
            }

            public Object next() throws NoSuchElementException, ConcurrentModificationException {
                return ((FIFOEntry)it.next()).entry;
            }

            public boolean hasNext() {
                return it.hasNext();
            }
        };
    }

    public int hashCode() {
        return q.hashCode();
    }

    public boolean equals(Object obj) {
        return q.equals(obj);
    }


    /**
     * Wrapping class which adds creation ordering to a {@link Comparable} object.
     * Based on the code in the javadoc for {@link PriorityBlockingQueue}
     * 
     *
     */
    private static class FIFOEntry implements Comparable {
        private final static AtomicLong seq = new AtomicLong();
        private final long seqNum;
        private final Comparable entry;
        public FIFOEntry(Comparable entry) {
            seqNum = seq.getAndIncrement();
            this.entry = entry;
        }

        public int compareTo(Object other) {
            FIFOEntry o = (FIFOEntry) other;
            int res = entry.compareTo(o.entry);
            if (res == 0 && o.entry != this.entry)
                res = (seqNum < o.seqNum ? -1 : 1);
            return res;
        }
        public int hashCode() {
            final int prime = 31;
            int result = 1;
            result = prime * result + ((entry == null) ? 0 : entry.hashCode());
            return result;
        }
        public boolean equals(Object obj) {
            if (this == obj)
                return true;
            if (obj == null)
                return false;
            if (getClass() != obj.getClass())
                return false;
            FIFOEntry other = (FIFOEntry) obj;
            if (entry == null) {
                if (other.entry != null)
                    return false;
            } else if (!entry.equals(other.entry))
                return false;
            return true;
        }

        public String toString() {
            return entry.toString();
        }
    }

}
1 голос
/ 09 июля 2011

Принимая рекомендацию @ 101100, я переработал дизайн, отделив очередь от событий. Это, кажется, делает его намного проще и легче для понимания (и повторного использования), но, к сожалению, мне все еще неясно некоторые концепции. Далее следует PriorityFIFOEventQueue (для краткости я опустил класс Event). И я отметил, где мне все еще нужна помощь:

package ibm1620;

import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

/**
 * This class represents a Priority Blocking Queue of FIFO entries.  Instantiating
 * it creates a queue.  It provides instance methods for sending and receiving
 * entries, a.k.a. events of type E, on the queue.
 */

Следующее помечено с помощью диагностики: "Тип PriorityFIFOEventQueue должен реализовывать унаследованный абстрактный метод Comparable> .compareTo (PriorityFIFOEventQueue)"

Я почти уверен, что не хочу сравнивать очереди !! Все еще не уверен, что мне здесь нужно.

public class PriorityFIFOEventQueue<E extends Comparable<? super E>> implements Comparable<PriorityFIFOEventQueue<E>> {

/**
 * FIFOEntry is a static nested class that wraps an Entry and provides support for
 * keeping the Entries in FIFO sequence.
 */
private static class FIFOEntry<E> implements Comparable<FIFOEntry<E>> {

    /**
     * There's going to be one atomic seq per ... queue?  runtime?
     */

    final static AtomicLong seq = new AtomicLong();

    /**
     * FIFOEntry is simply an entry of type E, and a sequence number
     */
    final long seqNum; 
    final E    entry;

    /**
     * FIFOEntry() constructor
     */
    FIFOEntry(E entry) {
        seqNum = seq.getAndIncrement();
        this.entry = entry;
    }

    /**
     * Accessor method for entry
     */
    E getEntry() {
        return entry;
    }

    /**
     * Here is implementation of Comparable that is called directly by PBQ.
     * We first invoke E's comparator which compares based on priority. 
     * If equal priority, order by sequence number (i.e. maintain FIFO).
     * */

В дальнейшем строка, содержащая «CompareTo» помечается, и диагностика Msgstr "Метод compareTo (E) не определен для типа E". Видимо, я не сказал компилятору что «другой» FIFOEntry реализует Comparable.

    public int compareTo(FIFOEntry<E> other) {
        int res = entry.compareTo(other.entry); // priority-based compare
        if (res == 0 && other.entry != this.entry)
            res = (seqNum < other.seqNum ? -1 : 1); // FIFO compare
        return res;
    }
}

/**
 * Here is the PriorityBlockingQueue of FIFOEntries
 */

private PriorityBlockingQueue<FIFOEntry<E>> theQueue = 
    new PriorityBlockingQueue<FIFOEntry<E>>();

/**
 * Event queuing ("sending") and dequeuing ("receiving")
 */
public void sendEvent(E event) {
    theQueue.put(new FIFOEntry<E>(event));
}

/**
 * pollEvent() 
 * Will remove and return a ConsoleEvent from the queue, or return
 * null if queue is empty.
 */
public E pollEvent() {
    E event = null;
    FIFOEntry<E> aFIFOEvent = theQueue.poll();
    if (aFIFOEvent != null) {
        event = aFIFOEvent.getEntry();
        say("pollEvent() -> "+event);
    }
    else {
    }
    return event;
}

/**
 * receiveEvent() 
 * Will block if queue is empty.  Takes a FIFOEvent off the queue,
 * unwraps it and returns the 'E' payload.
 * 
 * @return
 */
public E receiveEvent() {
    E event = null;
    try {
        FIFOEntry<E> aFIFOEvent = theQueue.take();
        if (aFIFOEvent != null) {
            event = aFIFOEvent.getEntry();
            say("receiveEvent() -> "+event);
        }

    } catch (InterruptedException e) {
        say("InterruptedException in receiveEvent()");
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    return event;
}

// for console debugging
static void say(String s) {
    System.out.println("ConsoleEvent: " + s);
}

}
...