Я хочу знать, можно ли использовать массив для реализации алгоритма двухблокировки, например LinkedBlockingQueue
.
Я получаю ответ типа
LinkedBlockingQueue
take()
и put()
метод обрабатывает отдельные узлы (заголовок и последний), поэтому он может использовать две блокировки, но ArrayBlockingQueue
использует общий массив (items[]
), take()
и put()
метод должен обрабатывать один и тот же массив (items[]
), поэтому у него есть параллельная проблема , необходимо использовать одну блокировку.
Я сомневаюсь в этом ответе, потому что я думаю, что в очереди массива, методы take()
и put()
могут обрабатывать разные индексы,он также может реализовывать использование алгоритма с двумя замками.
Поэтому я реализую свой метод DoubleLockArrayBlockingQueue
, но только take()
и put()
, но я не знаю, может ли он работать должным образом.
public class DoubleLockArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private static final long serialVersionUID = 3414217016077849776L;
final Object[] items;
int takeIndex;
int putIndex;
AtomicInteger count;
final ReentrantLock putLock;
final Condition notFull;
final ReentrantLock takeLock;
final Condition notEmpty;
public DoubleLockArrayBlockingQueue(int capacity) {
if (capacity < 0) {
throw new IllegalArgumentException();
}
this.items = new Object[capacity];
count = new AtomicInteger(0);
putLock = new ReentrantLock();
notFull = putLock.newCondition();
takeLock = new ReentrantLock();
notEmpty = takeLock.newCondition();
}
@Override
public void put(E e) throws InterruptedException {
if (e == null) {
throw new NullPointerException();
}
int c = -1;
putLock.lockInterruptibly();
try {
while (count.get() == items.length) {
notFull.await();
}
// 入队
items[putIndex] = e;
if (++putIndex == items.length) {
putIndex = 0;
}
c = count.getAndIncrement();
// 入队结束
// 如果还有位置可以生产,那么通知被阻塞的生产者,putLock上锁期间,takeLock可以拿到,进行消费,可能还有位置
if (c + 1 < items.length) {
notFull.signal();
}
} finally {
putLock.unlock();
}
// 如果原来队列中为0,说明有可能有消费者被阻塞了,那么通知一下消费者
if (c == 0) {
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
}
@Override
public E take() throws InterruptedException {
int c = -1;
E r = null;
takeLock.lock();
try {
while (count.get() == 0) {
notEmpty.await();
}
// 出队
r = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length) {
takeIndex = 0;
}
c = count.getAndDecrement();
if (c > 1) {
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == items.length) {
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
return r;
}
}
Я могу запустить этот код, но трудно доказать, что он работает должным образом в нескольких потоках.Может кто-нибудь дать совет о том, правильно это или нет?или какой-то способ доказать это?