Я искал пример кода для очереди без блокировки по адресу:
http://drdobbs.com/high-performance-computing/210604448?pgno=2
(Также ссылка на многие вопросы SO, такие как Есть ли в C ++ готовая к производству очередь без блокировки или реализация без блокировки * * * *
Похоже, это должно работать для одного производителя / потребителя, хотя в коде есть несколько опечаток. Я обновил код, чтобы прочитать, как показано ниже, но он падает на меня. У кого-нибудь есть предложения почему?
В частности, делитель и последний должны быть объявлены как что-то вроде:
atomic<Node *> divider, last; // shared
У меня нет компилятора, поддерживающего C ++ 0x на этой машине, так что, возможно, это все, что мне нужно ...
// Implementation from http://drdobbs.com/high-performance-computing/210604448
// Note that the code in that article (10/26/11) is broken.
// The attempted fixed version is below.
template <typename T>
class LockFreeQueue {
private:
struct Node {
Node( T val ) : value(val), next(0) { }
T value;
Node* next;
};
Node *first, // for producer only
*divider, *last; // shared
public:
LockFreeQueue()
{
first = divider = last = new Node(T()); // add dummy separator
}
~LockFreeQueue()
{
while( first != 0 ) // release the list
{
Node* tmp = first;
first = tmp->next;
delete tmp;
}
}
void Produce( const T& t )
{
last->next = new Node(t); // add the new item
last = last->next; // publish it
while (first != divider) // trim unused nodes
{
Node* tmp = first;
first = first->next;
delete tmp;
}
}
bool Consume( T& result )
{
if (divider != last) // if queue is nonempty
{
result = divider->next->value; // C: copy it back
divider = divider->next; // D: publish that we took it
return true; // and report success
}
return false; // else report empty
}
};
Я написал следующий код, чтобы проверить это. Main (не показан) просто вызывает TestQ ().
#include "LockFreeQueue.h"
const int numThreads = 1;
std::vector<LockFreeQueue<int> > q(numThreads);
void *Solver(void *whichID)
{
int id = (long)whichID;
printf("Thread %d initialized\n", id);
int result = 0;
do {
if (q[id].Consume(result))
{
int y = 0;
for (int x = 0; x < result; x++)
{ y++; }
y = 0;
}
} while (result != -1);
return 0;
}
void TestQ()
{
std::vector<pthread_t> threads;
for (int x = 0; x < numThreads; x++)
{
pthread_t thread;
pthread_create(&thread, NULL, Solver, (void *)x);
threads.push_back(thread);
}
for (int y = 0; y < 1000000; y++)
{
for (unsigned int x = 0; x < threads.size(); x++)
{
q[x].Produce(y);
}
}
for (unsigned int x = 0; x < threads.size(); x++)
{
q[x].Produce(-1);
}
for (unsigned int x = 0; x < threads.size(); x++)
pthread_join(threads[x], 0);
}
Обновление: в конечном итоге причиной сбоя является объявление очереди:
std::vector<LockFreeQueue<int> > q(numThreads);
Когда я изменяю это на простой массив, он работает нормально. (Я реализовал версию с блокировками, и она тоже зависала.) Я вижу, что деструктор вызывается сразу после конструктора, что приводит к освобождению памяти вдвое. Но кто-нибудь знает, ПОЧЕМУ деструктор будет вызван немедленно с помощью std :: vector?