Почему пользовательская очередь блокировки не является поточно-ориентированной в Java - PullRequest
0 голосов
/ 16 октября 2018

Я просто хочу создать очередь блокировки с помощью ReentrantLock, я определяю два условия full и empty, исходный код выглядит следующим образом:

@Slf4j
@NotThreadSafe
public class CustomBlockQueue<T> {

    private ReentrantLock lock = new ReentrantLock();

    private Condition full = lock.newCondition();
    private Condition empty = lock.newCondition();

    private Integer maxLength = 1 << 4;

    private Integer putIndex = 0, takeIndex = 0;
    private Integer count = 0;

    private Object[] value;

    public BlockQueue(){
        value = new Object[maxLength];
    }

    public BlockQueue(Integer maxLength){
        this.maxLength = maxLength;
        value = new Object[maxLength];
    }

    public void put(T val) throws InterruptedException {
        lock.lock();
        try {
            if (count.equals(maxLength)){
                log.info("The queue is full!");
                full.await();
            }
            putIndex = putIndex % maxLength;
            value[putIndex++] = val;
            count++;
            empty.signal();
        }finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    public T take() throws InterruptedException {
        lock.lock();
        Object val;
        try {
            if (count == 0){
                empty.await();
            }
            takeIndex = takeIndex % maxLength;
            val = value[takeIndex++];
            count--;
            full.signal();
        }finally {
           lock.unlock();
        }
        return (T) val;
    }
}

При тестировании в двух пользовательских потоках и одном потоке поставщика, count меньше нуля в какое-то случайное время.
Почему очередь блокировки не является поточно-ориентированной, кто может мне помочь, дав мне несколько советов?Большое спасибо!

Обновление (2018/10/17)

Если я просто использую один Condition, может ли он работать правильно?Исходный код выглядит следующим образом:

@Slf4j
@NotThreadSafe
public class CustomBlockQueue<T> {

    private ReentrantLock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    ...

    public void put(T val) throws InterruptedException {
        lock.lock();
        try {
            while (count.equals(maxLength)){
                log.info("The queue is full!");
                condition.await();
            }
            putIndex = putIndex % maxLength;
            value[putIndex++] = val;
            count++;
            condition.signal();
        }finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    public T take() throws InterruptedException {
        lock.lock();
        Object val;
        try {
            while (count == 0){
                condition.await();
            }
            takeIndex = takeIndex % maxLength;
            val = value[takeIndex++];
            count--;
            condition.signal();
        }finally {
           lock.unlock();
        }
        return (T) val;
    }
}

Ответы [ 2 ]

0 голосов
/ 16 октября 2018

Одна очевидная вещь состоит в том, что Условие может «проснуться» без соответствующего вызова «сигнала».Таким образом, вместо использования «если», вам нужно использовать «пока».Например:

while (count == 0) {
    empty.await();
}

См. Также javadoc здесь: https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/Condition.html

Блокировка, связанная с этим условием, снимается атомарно, и текущий поток становится отключенным для целей планирования потока и лежитнеактивен, пока не произойдет одно из четырех:

  • Какой-то другой поток вызывает метод signal () для этого условия, и текущий поток выбирается в качестве потока, который должен быть пробужден;или
  • Какой-то другой поток вызывает метод signalAll () для этого условия;или
  • Некоторый другой поток прерывает текущий поток, и поддерживается прерывание приостановки потока;или
  • Произошло "ложное пробуждение".
0 голосов
/ 16 октября 2018

Аналогичный вопрос: Почему wait () всегда вызывается внутри цикла

Объяснение

Рассмотрим следующую ситуацию:

  1. потребитель 1 заблокирован на lock.lock();
  2. потребитель 2 заблокирован на empty.await();.
  3. производитель держитблокировка и добавляет один элемент в очередь, что делает count = 1 и вызывает empty.signal();.
  4. потребитель 2 получает этот сигнал и выходит из empty.await();, ему нужно повторнополучить замок, в то время как cosumer 1 впереди него.
  5. cosumer 1 получает блокировку и находит, что счетчик равен 1, поэтому он уменьшает счетчик до 0.
  6. cosumer 2 получает блокировку, поскольку она выполнила

    if (count == 0){    <--- consumer 2 will not re-check this condition
        empty.await();  
    }
    

    cosumer 2 считает, что очередь не пуста, затем онавыполняет:

    takeIndex = takeIndex % maxLength;
    val = value[takeIndex++];
    count--;
    

    , что уменьшает счет до 0.

Решение

Использование while вместо if гарантирует, что потребитель 2 будетперепроверить, пуста ли очередьу, который гарантирует count >= 0.

while (count == 0){
    empty.await();
}

также, лучше сделать то же самое с методом продукта:

while (count.equals(maxLength)){
    log.info("The queue is full!");
    full.await();
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...