Проблема с потокобезопасной очередью? - PullRequest
2 голосов
/ 12 февраля 2009

Я пытаюсь написать потокобезопасную очередь, используя pthreads в c ++. Моя программа работает в 93% случаев. Остальные 7% времени он выплевывает мусор, ИЛИ, кажется, засыпает. Мне интересно, есть ли какой-нибудь недостаток в моей очереди, когда переключение контекста сломало бы это?

// thread-safe queue
// inspired by http://msmvps.com/blogs/vandooren/archive/2007/01/05/creating-a-thread-safe-producer-consumer-queue-in-c-without-using-locks.aspx
// only works with one producer and one consumer
#include <pthread.h>
#include <exception>

template<class T>
class tsqueue
{
    private:
        volatile int m_ReadIndex, m_WriteIndex;
        volatile T *m_Data;
        volatile bool m_Done;
        const int m_Size;
        pthread_mutex_t m_ReadMutex, m_WriteMutex;
        pthread_cond_t m_ReadCond, m_WriteCond;
    public:
        tsqueue(const int &size);
        ~tsqueue();
        void push(const T &elem);
        T pop();
        void terminate();
        bool isDone() const;
};

template <class T>
tsqueue<T>::tsqueue(const int &size) : m_ReadIndex(0), m_WriteIndex(0), m_Size(size), m_Done(false) {
    m_Data = new T[size];
    pthread_mutex_init(&m_ReadMutex, NULL);
    pthread_mutex_init(&m_WriteMutex, NULL);
    pthread_cond_init(&m_WriteCond, NULL);
    pthread_cond_init(&m_WriteCond, NULL);
}

template <class T>
tsqueue<T>::~tsqueue() {
    delete[] m_Data;
    pthread_mutex_destroy(&m_ReadMutex);
    pthread_mutex_destroy(&m_WriteMutex);
    pthread_cond_destroy(&m_ReadCond);
    pthread_cond_destroy(&m_WriteCond);
}


template <class T>
void tsqueue<T>::push(const T &elem) {
    int next = (m_WriteIndex + 1) % m_Size;
    if(next == m_ReadIndex) {
        pthread_mutex_lock(&m_WriteMutex);
        pthread_cond_wait(&m_WriteCond, &m_WriteMutex);
        pthread_mutex_unlock(&m_WriteMutex);
    }
    m_Data[m_WriteIndex] = elem;
    m_WriteIndex = next;
    pthread_cond_signal(&m_ReadCond);
}

template <class T>
T tsqueue<T>::pop() {
    if(m_ReadIndex == m_WriteIndex) {
        pthread_mutex_lock(&m_ReadMutex);
        pthread_cond_wait(&m_ReadCond, &m_ReadMutex);
        pthread_mutex_unlock(&m_ReadMutex);
        if(m_Done && m_ReadIndex == m_WriteIndex) throw "queue empty and terminated";
    }
    int next = (m_ReadIndex +1) % m_Size;
    T elem = m_Data[m_ReadIndex];
    m_ReadIndex = next;
    pthread_cond_signal(&m_WriteCond);
    return elem;
}

template <class T>
void tsqueue<T>::terminate() {
    m_Done = true;
    pthread_cond_signal(&m_ReadCond);
}

template <class T>
bool tsqueue<T>::isDone() const {
    return (m_Done && m_ReadIndex == m_WriteIndex);
}

Это можно использовать так:

// thread 1
while(cin.get(c)) {
    queue1.push(c);
}
queue1.terminate();


// thread 2
while(!queue1.isDone()) {
    try{ c = queue1.pop(); }
    catch(char const* str){break;}
    cout.put(c);
}

Если кто-то видит проблему с этим, пожалуйста, скажите:)

Ответы [ 5 ]

8 голосов
/ 12 февраля 2009

Да, здесь определенно есть проблемы. Все ваши обращения к переменным-членам очереди происходят за пределами мьютексов. На самом деле, я не совсем уверен, что ваши мьютексы защищают, так как они просто ожидают условную переменную.

Кроме того, похоже, что ваш читатель и писатель всегда будут работать в режиме блокировки, никогда не позволяя очереди вырасти до размера, превышающего один элемент.

3 голосов
/ 12 февраля 2009

Если это ваш настоящий код, одна проблема сразу же заключается в том, что вы инициализируете m_WriteCond дважды, а не инициализируете m_ReadCond вообще.

2 голосов
/ 12 февраля 2009

Этот класс следует рассматривать как монитор . У вас должна быть «блокировка монитора» для каждой очереди (обычный мьютекс). Всякий раз, когда вы вводите метод, который читает или записывает какое-либо поле в очереди, вы должны заблокировать этот мьютекс, как только вы его введете. Это предотвращает взаимодействие нескольких потоков с очередью одновременно. Вы должны снять блокировку до того, как дождетесь выполнения условия и когда выйдете из метода, чтобы могли войти другие потоки. Не забудьте повторно получить блокировку, когда вы закончите, ожидая условия.

1 голос
/ 12 февраля 2009

Если вы хотите что-то с достойной производительностью, я настоятельно рекомендую снять блокировку R / W и просто использовать очень простую спин-блокировку. Или, если вы действительно думаете, что можете получить желаемую производительность с помощью R / W-блокировки, я бы выбрал вашу собственную, основываясь на этом дизайне (одиночное слово R / W Spinlock) от Джо Даффи.

0 голосов
/ 12 февраля 2009

Кажется, что проблема в том, что у вас есть условие состязания, которое поток 2 МОЖЕТ запустить до того, как поток 1 когда-либо сделает cin.get (c). Необходимо убедиться, что данные инициализированы, и когда вы получаете информацию, вы гарантируете, что делаете что-то, если данные не были введены.

Может быть, это я не вижу остальную часть кода, где это делается.

...