Реализовать ArrayBlockingQueue использовать алгоритм двухблокировки, такой как LinkedBlockingQueue? \ - PullRequest
0 голосов
/ 02 июля 2019

Я хочу знать, можно ли использовать массив для реализации алгоритма двухблокировки, например LinkedBlockingQueue.

Я получаю ответ типа

LinkedBlockingQueue take() и put() метод обрабатывает отдельные узлы (заголовок и последний), поэтому он может использовать две блокировки, но ArrayBlockingQueue использует общий массив (items[]), take() и put() метод должен обрабатывать один и тот же массив (items[]), поэтому у него есть параллельная проблема , необходимо использовать одну блокировку.

Я сомневаюсь в этом ответе, потому что я думаю, что в очереди массива, методы take() и put() могут обрабатывать разные индексы,он также может реализовывать использование алгоритма с двумя замками.

Поэтому я реализую свой метод DoubleLockArrayBlockingQueue, но только take() и put(), но я не знаю, может ли он работать должным образом.

public class DoubleLockArrayBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {

    private static final long serialVersionUID = 3414217016077849776L;

    final Object[] items;

    int takeIndex;

    int putIndex;

    AtomicInteger count;

    final ReentrantLock putLock;
    final Condition notFull;
    final ReentrantLock takeLock;
    final Condition notEmpty;

    public DoubleLockArrayBlockingQueue(int capacity) {
    if (capacity < 0) {
        throw new IllegalArgumentException();
    }
    this.items = new Object[capacity];
    count = new AtomicInteger(0);
    putLock = new ReentrantLock();
    notFull = putLock.newCondition();
    takeLock = new ReentrantLock();
    notEmpty = takeLock.newCondition();
    }


    @Override
    public void put(E e) throws InterruptedException {
    if (e == null) {
        throw new NullPointerException();
    }
    int c = -1;
    putLock.lockInterruptibly();
    try {
        while (count.get() == items.length) {
        notFull.await();
        }
        // 入队
        items[putIndex] = e;
        if (++putIndex == items.length) {
        putIndex = 0;
        }
        c = count.getAndIncrement();
        // 入队结束
        // 如果还有位置可以生产,那么通知被阻塞的生产者,putLock上锁期间,takeLock可以拿到,进行消费,可能还有位置
        if (c + 1 < items.length) {
        notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    // 如果原来队列中为0,说明有可能有消费者被阻塞了,那么通知一下消费者
    if (c == 0) {
        takeLock.lock();
        try {
        notEmpty.signal();
        } finally {
        takeLock.unlock();
        }
    }
    }

    @Override
    public E take() throws InterruptedException {
    int c = -1;
    E r = null;
    takeLock.lock();
    try {
        while (count.get() == 0) {
        notEmpty.await();
        }
        // 出队
        r = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length) {
        takeIndex = 0;
        }
        c = count.getAndDecrement();
        if (c > 1) {
        notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    if (c == items.length) {
        putLock.lock();
        try {
        notFull.signal();
        } finally {
        putLock.unlock();
        }
    }
    return r;
    }
}

Я могу запустить этот код, но трудно доказать, что он работает должным образом в нескольких потоках.Может кто-нибудь дать совет о том, правильно это или нет?или какой-то способ доказать это?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...