Очередь одновременного набора - PullRequest
33 голосов
/ 25 июня 2010

Возможно, это глупый вопрос, но я не могу найти очевидного ответа.

Мне нужна параллельная очередь FIFO, которая содержит только уникальные значения.Попытка добавить значение, которое уже существует в очереди, просто игнорирует это значение.Что, если бы не поток, безопасность была бы тривиальной.Есть ли в Java структура данных или, возможно, фрагмент кода на межплетениях, демонстрирующий такое поведение?

Ответы [ 7 ]

6 голосов
/ 26 июня 2010

Если вам нужен лучший параллелизм, чем полная синхронизация, я знаю один способ сделать это, используя ConcurrentHashMap в качестве вспомогательной карты.Ниже приведен только набросок.

public final class ConcurrentHashSet<E> extends ForwardingSet<E>
    implements Set<E>, Queue<E> {
  private enum Dummy { VALUE }

  private final ConcurrentMap<E, Dummy> map;

  ConcurrentHashSet(ConcurrentMap<E, Dummy> map) {
    super(map.keySet());
    this.map = Preconditions.checkNotNull(map);
  }

  @Override public boolean add(E element) {
    return map.put(element, Dummy.VALUE) == null;
  }

  @Override public boolean addAll(Collection<? extends E> newElements) {
    // just the standard implementation
    boolean modified = false;
    for (E element : newElements) {
      modified |= add(element);
    }
    return modified;
  }

  @Override public boolean offer(E element) {
    return add(element);
  }

  @Override public E remove() {
    E polled = poll();
    if (polled == null) {
      throw new NoSuchElementException();
    }
    return polled;
  }

  @Override public E poll() {
    for (E element : this) {
      // Not convinced that removing via iterator is viable (check this?)
      if (map.remove(element) != null) {
        return element;
      }
    }
    return null;
  }

  @Override public E element() {
    return iterator().next();
  }

  @Override public E peek() {
    Iterator<E> iterator = iterator();
    return iterator.hasNext() ? iterator.next() : null;
  }
}

При таком подходе не все хорошо.У нас нет приличного способа выбрать элемент головы, кроме использования entrySet().iterator().next() карты поддержки, в результате чего со временем карта становится все более и более несбалансированной.Этот дисбаланс является проблемой как из-за больших коллизий сегментов памяти, так и из-за большей конкуренции за сегменты.

Примечание: этот код использует Guava в нескольких местах.

5 голосов
/ 25 июня 2010

Нет встроенной коллекции, которая делает это.Существует несколько одновременных Set реализаций, которые можно использовать вместе с одновременными Queue.

Например, элемент добавляется в очередь только после того, как он был успешно добавлен в набор, и каждый элемент удалениз очереди удаляется из набора.В этом случае содержимое очереди, по логике, действительно является тем, что находится в наборе, и очередь просто используется для отслеживания порядка и обеспечения эффективных операций take() и poll(), найденных только на BlockingQueue.

4 голосов
/ 26 июня 2010

Я бы использовал синхронизированный LinkedHashSet, пока не было достаточно оснований для рассмотрения альтернатив. Основное преимущество, которое может предложить более параллельное решение, - это разделение блокировки.

Простейшим параллельным подходом будет ConcurrentHashMap (действующий как набор) и ConcurrentLinkedQueue. Порядок операций обеспечит желаемое ограничение. Метод offer () сначала выполняет CHM # putIfAbsent () и в случае успешной вставки в CLQ. Опрос () будет взят из CLQ и затем удален из CHM. Это означает, что мы рассматриваем запись в нашей очереди, если она находится на карте, а CLQ обеспечивает порядок. Производительность может быть скорректирована путем увеличения уровня параллелизма карты. Если вы терпимы к дополнительной скорости, то дешевый CHM # get () может служить разумным предварительным условием (но он может пострадать из-за слегка устаревшего представления).

4 голосов
/ 25 июня 2010

A java.util.concurrent.ConcurrentLinkedQueue поможет вам в этом.

Оберните ConcurrentLinkedQueue вашим собственным классом, который проверяет уникальность добавления. Ваш код должен быть потокобезопасным.

2 голосов
/ 05 июля 2010

Что вы подразумеваете под параллельной очередью с установленной семантикой? Если вы имеете в виду действительно параллельную структуру (в отличие от поточно-безопасной структуры), то я бы сказал, что вы просите пони.

Что произойдет, например, если вы позвоните put(element) и обнаружите, что там уже есть что-то, что немедленно удаляется? Например, что это значит в вашем случае, если offer(element) || queue.contains(element) возвращает false?

О таких вещах часто нужно думать немного по-другому в параллельном мире, так как часто ничто не так, как кажется, если вы не остановите мир (заблокируйте его). В противном случае вы обычно смотрите на что-то в прошлом. Итак, что вы на самом деле пытаетесь сделать?

0 голосов
/ 06 июля 2016

Простой ответ для очереди уникальных объектов может быть следующим:

import java.util.concurrent.ConcurrentLinkedQueue;

public class FinalQueue {

    class Bin {
        private int a;
        private int b;

        public Bin(int a, int b) {
            this.a = a;
            this.b = b;
        }

        @Override
        public int hashCode() {
            return a * b;
        }

        public String toString() {
            return a + ":" + b;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj)
                return true;
            if (obj == null)
                return false;
            if (getClass() != obj.getClass())
                return false;
            Bin other = (Bin) obj;
            if ((a != other.a) || (b != other.b))
                return false;
            return true;
        }
    }

    private ConcurrentLinkedQueue<Bin> queue;

    public FinalQueue() {
        queue = new ConcurrentLinkedQueue<Bin>();
    }

    public synchronized void enqueue(Bin ipAddress) {
        if (!queue.contains(ipAddress))
            queue.add(ipAddress);
    }

    public Bin dequeue() {
        return queue.poll();
    }

    public String toString() {
        return "" + queue;
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        FinalQueue queue = new FinalQueue();
        Bin a = queue.new Bin(2,6);

        queue.enqueue(a);
        queue.enqueue(queue.new Bin(13, 3));
        queue.enqueue(queue.new Bin(13, 3));
        queue.enqueue(queue.new Bin(14, 3));
        queue.enqueue(queue.new Bin(13, 9));
        queue.enqueue(queue.new Bin(18, 3));
        queue.enqueue(queue.new Bin(14, 7));
        Bin x= queue.dequeue();
        System.out.println(x.a);
        System.out.println(queue.toString());
        System.out.println("Dequeue..." + queue.dequeue());
        System.out.println("Dequeue..." + queue.dequeue());
        System.out.println(queue.toString());
    }
}
0 голосов
/ 25 июля 2014

Возможно расширение ArrayBlockingQueue . Чтобы получить доступ к блокировке (package-access), я должен был поместить свой подкласс в тот же пакет. Предостережение: я не проверял это.

package java.util.concurrent;

import java.util.Collection;
import java.util.concurrent.locks.ReentrantLock;

public class DeDupingBlockingQueue<E> extends ArrayBlockingQueue<E> {

    public DeDupingBlockingQueue(int capacity) {
        super(capacity);
    }

    public DeDupingBlockingQueue(int capacity, boolean fair) {
        super(capacity, fair);
    }

    public DeDupingBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
        super(capacity, fair, c);
    }

    @Override
    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (contains(e)) return false;
            return super.add(e);
        } finally {
            lock.unlock();
        }
    }

    @Override
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (contains(e)) return true;
            return super.offer(e);
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void put(E e) throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly(); //Should this be lock.lock() instead?
        try {
            if (contains(e)) return;
            super.put(e); //if it blocks, it does so without holding the lock.
        } finally {
            lock.unlock();
        }
    }

    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (contains(e)) return true;
            return super.offer(e, timeout, unit); //if it blocks, it does so without holding the lock.
        } finally {
            lock.unlock();
        }
    }
}
...