Где проблема с этим классом BoundedBuffer? - PullRequest
1 голос
/ 11 сентября 2011

Свойства класса ограниченного буфера, который я пытаюсь построить ...

  • Несколько производителей, несколько потребителей.
  • Блокировка производителя и блокировка потребителя.
  • В качестве указателей чтения / записи используется AtomicInteger.
  • Использует AtomicReferenceArray (принимает универсальный тип) для хранения буфера.
  • Буфер имеет размер Short.MAX_VALUE и использует CAS для устранения переполнений.

Теперь о проблеме ...

Проблема: Я не могу комментировать синхронизированные (это) блоки в приведенном ниже коде. Я думал, что смысл использования AtomicInteger в качестве указателей состоит в том, чтобы избежать этого.

Комментирование выходов синхронизированного (этого) блока приводит к тому, что потребители пропускают некоторые элементы, вставленные производителями. Если я включу блок синхронизированного (этого), все будет отлично, и каждая полученная вещь будет потреблена.

Чего мне не хватает?

public class BoundedBuffer<T> {
    private static final int BUFFER_SIZE = Short.MAX_VALUE+1;
    private AtomicReferenceArray<T> m_buffer = null;
    private Semaphore m_full = new Semaphore(BUFFER_SIZE);
    private Semaphore m_empty = new Semaphore(0);
    private AtomicInteger m_writePointer = new AtomicInteger();
    private AtomicInteger m_readPointer = new AtomicInteger();

    public BoundedBuffer() {
        m_buffer = new AtomicReferenceArray<T>(BUFFER_SIZE);
    }

    public static int safeGetAndIncrement(AtomicInteger i) {
        int oldValue = 0, newValue = 0;
        do {
            oldValue = i.get();
            newValue = (oldValue == Short.MAX_VALUE) ? 0 : (oldValue + 1);
        } while (!i.compareAndSet(oldValue, newValue));
        return oldValue;
    }

    public void add(T data) throws InterruptedException {
        m_full.acquire();
        synchronized (this) { // << Commenting this doesn't work
            // CAS-based overflow handling
            m_buffer.set(safeGetAndIncrement(m_writePointer),data);
        }
        m_empty.release();
    }

    public T get() throws InterruptedException {
        T data = null;
        m_empty.acquire();
        synchronized (this) { // << Commenting this doesn't work
            // CAS-based overflow handling
            data = m_buffer.get(safeGetAndIncrement(m_readPointer));
        }
        m_full.release();
        return data;
    }
}

1 Ответ

1 голос
/ 11 сентября 2011

Возможна проблема, когда get () из массива не является атомарным с приращением при удалении синхронизированного блока.Сценарий прерывания, о котором я размышляю, требует, чтобы производитель перегружал потребителей, тогда вы могли бы сделать так, чтобы производитель перезаписал запись массива, которая еще не была прочитана, ЕСЛИ выпуск семафора был вызван чтением не по порядку.

Рассмотрим ситуацию, когда буфер заполнен (индекс записывающего устройства равен N, индекс считывающего устройства равен N + 1), и два потока пытаются выполнить чтение из буфера.(Предположим, что N не близко к точке обтекания для простоты.)

Поток 1 получает индекс N + 1 для чтения своего элемента.

Поток 2 получает индекс N + 2из которого нужно прочитать свой элемент.

Из-за случайного планирования поток 2 сначала получает из буферного массива и освобождает семафор m_full до того, как поток 1 получает свой элемент из массива.

Поток 3 (производитель) просыпается и записывает элемент в следующий доступный слот N + 1 в буфере, также до того, как поток 1 прочитает из буфера.

Затем поток 1 получает элемент по индексу N +1, но пропустил нужный товар.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...