C ++ Lock свободная очередь производителя / потребителя - PullRequest
2 голосов
/ 27 октября 2011

Я искал пример кода для очереди без блокировки по адресу:

http://drdobbs.com/high-performance-computing/210604448?pgno=2

(Также ссылка на многие вопросы SO, такие как Есть ли в C ++ готовая к производству очередь без блокировки или реализация без блокировки * * * *

Похоже, это должно работать для одного производителя / потребителя, хотя в коде есть несколько опечаток. Я обновил код, чтобы прочитать, как показано ниже, но он падает на меня. У кого-нибудь есть предложения почему?

В частности, делитель и последний должны быть объявлены как что-то вроде:

atomic<Node *> divider, last;    // shared

У меня нет компилятора, поддерживающего C ++ 0x на этой машине, так что, возможно, это все, что мне нужно ...

// Implementation from http://drdobbs.com/high-performance-computing/210604448
// Note that the code in that article (10/26/11) is broken.
// The attempted fixed version is below.

template <typename T>
class LockFreeQueue {
private:
    struct Node {
        Node( T val ) : value(val), next(0) { }
        T value;
        Node* next;
    };
    Node *first,      // for producer only
    *divider, *last;    // shared
public:
    LockFreeQueue()
    {
        first = divider = last = new Node(T()); // add dummy separator
    }
    ~LockFreeQueue()
    {
        while( first != 0 )    // release the list
        {
            Node* tmp = first;
            first = tmp->next;
            delete tmp;
        }
    }
    void Produce( const T& t )
    {
        last->next = new Node(t);    // add the new item
        last = last->next;      // publish it

        while (first != divider) // trim unused nodes
        {
            Node* tmp = first;
            first = first->next;
            delete tmp;
        }
    }
    bool Consume( T& result )
    {
        if (divider != last)         // if queue is nonempty
        {
            result = divider->next->value; // C: copy it back
            divider = divider->next;      // D: publish that we took it
            return true;                  // and report success
        }
        return false;                   // else report empty
    }
};

Я написал следующий код, чтобы проверить это. Main (не показан) просто вызывает TestQ ().

#include "LockFreeQueue.h"

const int numThreads = 1;
std::vector<LockFreeQueue<int> > q(numThreads);

void *Solver(void *whichID)
{
    int id = (long)whichID;
    printf("Thread %d initialized\n", id);
    int result = 0;
    do {
        if (q[id].Consume(result))
        {
            int y = 0;
            for (int x = 0; x < result; x++)
            { y++; }
            y = 0;
        }
    } while (result != -1);
    return 0;
}


void TestQ()
{
    std::vector<pthread_t> threads;
    for (int x = 0; x < numThreads; x++)
    {
        pthread_t thread;
        pthread_create(&thread, NULL, Solver, (void *)x);
        threads.push_back(thread);
    }
    for (int y = 0; y < 1000000; y++)
    {
        for (unsigned int x = 0; x < threads.size(); x++)
        {
            q[x].Produce(y);
        }
    }
    for (unsigned int x = 0; x < threads.size(); x++)
    {
        q[x].Produce(-1);
    }
    for (unsigned int x = 0; x < threads.size(); x++)
        pthread_join(threads[x], 0);    
}

Обновление: в конечном итоге причиной сбоя является объявление очереди:

std::vector<LockFreeQueue<int> > q(numThreads);

Когда я изменяю это на простой массив, он работает нормально. (Я реализовал версию с блокировками, и она тоже зависала.) Я вижу, что деструктор вызывается сразу после конструктора, что приводит к освобождению памяти вдвое. Но кто-нибудь знает, ПОЧЕМУ деструктор будет вызван немедленно с помощью std :: vector?

Ответы [ 3 ]

1 голос
/ 27 октября 2011

Очень важно, чтобы эти записи (только один пример из вашего кода) происходили в следующем порядке:

last->next = new Node(t);    // add the new item
last = last->next;      // publish it

Это не гарантируется C ++ - оптимизатор может переставлять вещи так, как ему нравится, при условии, что текущий поток всегда действует так, как если бы программа работала точно так, как вы ее написали. И тогда кэш ЦП может прийти и изменить порядок вещей.

Вам нужны ограждения памяти. Заставить указатели использовать атомарный тип должен иметь такой эффект.

1 голос
/ 27 октября 2011

Вам нужно будет сделать несколько указателей std :: atomic, как вы заметили, и вам потребуется использовать compare_exchange_weak в цикле для их атомарного обновления.В противном случае несколько потребителей могут использовать один и тот же узел, а несколько производителей могут испортить список.

0 голосов
/ 27 октября 2011

Это может быть совершенно неправильно, но я не могу не задаться вопросом, есть ли у вас какие-то проблемы, связанные со статической инициализацией ... Для смеха попробуйте объявить q как указатель вектору очередей без блокировки и расположению его в куче в main().

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...