Решить: очередь без блокировки, несколько производителей, несколько потребителей - повреждение памяти - PullRequest
0 голосов
/ 15 декабря 2018

LockFreeQueueMPMC должен решить проблему MPMC без блокировок, но во время выполнения происходит повреждение памяти.LockFreeDispatchStackMPMC действительно решает проблему MPMC без блокировок и используется в качестве основы для распределителя LockFreeCacheMPMC.Обе эти реализации проходят стресс-тестирование.

LockFreeQueueMPMC Enqueue выполняет ту же операцию, что и Lock FreeDispatchStackMPMC Send.Это добавляет новый узел в список.Операция Dequeue является более сложной.Только один указатель может быть одновременно cmpexg, поэтому нет решения с указателем Tail.Чтобы удалить узел из списка, необходимо пройти по списку и удалить последний узел.Это изменяет время ожидания с O (1) на O (N), но без блокировки.

LockFreeDispatchStackMPMC - это нестандартное решение без блокировок MPMC.Сообщения, поступившие раньше, обрабатываются первыми.При этом вместо очереди используется стек, для некоторых проблем это неприемлемо, так как сообщения должны быть упорядочены.Это показывает повышение производительности на 40% по сравнению с очередью, если ваши сообщения могут быть неупорядоченными.

template<class T>
struct Node
{
    std::atomic<int> Next;
    T *Data;
};
template<class T>
class LockFreeDispatchStackMPMC
{
public:
    LockFreeDispatchStackMPMC()
    {
        Head = NULL;
    }
    ~LockFreeDispatchStackMPMC(){
    }
    void Send(T *item)
    {
        Node<T> * new_node = Cache.Malloc();
        new_node->Data=item;
        bool done = false;
        while(!done)
        {
            auto head = Head.load();
            new_node->Next.store( head);
            if( Head.compare_exchange_weak(head,new_node))
            {
                done = true;
            }
        }
    }

    T *Recieve()
    {
        T *returnValue = NULL;
        bool done = false;
        while(!done)
        {
            auto head = Head.load();
            if(head == NULL)
            {
                done=true;
            }
            else
            {
                Node<T> * curr = head;
                Node<T> *next = curr->Next.load();
                if(Head.compare_exchange_weak(head,next))
                {
                    done = true;
                    returnValue = curr->Data;
                    curr->Next =NULL;
                    Cache.Free(curr);
                }

            }
        }
        return returnValue;
    }
public:
    std::atomic<Node<T> *> Head;

private:
    LockFreeMemCache<Node<T> > Cache;
};

Это контейнер Cache of Objects, основанный на использовании двух списков, которые можно использовать как пул между потоками для хранения объектов.Это также позволяет читать с кладбища, поскольку объекты не были уничтожены, записи не должны быть разрешены.Это проблема в алгоритме очереди.Это также безопасно для MPMC, даже если каждый узел должен быть выделен по одному за раз.

#define GROW_BY_SIZE 4

template<class T>
class LockFreeCacheMPMC
{
public:
    LockFreeCacheMPMC()
    {
        Head=NULL;
        FreeStack=NULL;
        AddSomeCache();
    }
    ~LockFreeCacheMPMC()
    {
        Node<T> *node ,*prev;

        bool done = false;
        node = Head;
        prev = NULL;
        while(!done)
        {
            prev = node;
            if(node == NULL)
            {
                done = true;
            }
            else
            {
                node = node->Next.load();
                delete prev->Data;
                delete prev;
            }
        }
        done = false;
        node = FreeStack;
        prev = NULL;
        while(!done)
        {
            prev = node;
            if(node == NULL)
            {
                done = true;
            }
            else
            {
                node = node->Next.load();
                delete prev;
            }
        }
    }
    T *Malloc()
    {
        T *returnValue = NULL;
        returnValue=Pop();
        while(returnValue==NULL)
        {
            AddSomeCache();
            returnValue=Pop();
        }
        return returnValue;
    }
    void Free(T *ptr)
    {
        Push(ptr);
    }

private:
    void AddSomeCache()
    {

        for(int i=0; i < GROW_BY_SIZE; i++)
        {
            T *tmp = new T();
            Push(tmp);
        }
    }

private:
    void Push(T *item)
    {
        Node<T> * new_node = PopNode(true);
        new_node->Data=item;

        bool done = false;
        while(!done)
        {
            Node<T>* head = Head.load();
            new_node->Next.store(head);
            if(Head.compare_exchange_weak(head,new_node))
            {
                done = true;
            }
        }
    }
    T *Pop()
    {
        T *returnValue = NULL;
        bool done = false;
        while(!done)
        {
            Node<T> * curr= Head.load();
            if(curr == NULL)
            {
                done=true;
            }
            else
            {
                Node<T> *next = curr->Next.load();
                if(Head.compare_exchange_weak(curr,next))
                {
                    done = true;
                    returnValue = curr->Data;
                    PushNode(curr);
                }
            }
        }
        return returnValue;
    }

    void PushNode(Node<T> *item)
    {
        item->Next = NULL;
        item->Data = NULL;
        bool done = false;
        while(!done)
        {
            Node<T>* fs = FreeStack.load();
            item->Next.store(fs);
            if(FreeStack.compare_exchange_weak(fs,item))
            {
                done = true;
            }
        }
    }
    Node<T> *PopNode(bool Alloc)
    {
        Node<T> *returnValue = NULL;

        bool done = false;
        while(!done)
        {
            Node<T> *fs = FreeStack.load();
            if(fs == NULL)
            {
                done=true;
            }
            else
            {
                Node<T> *next = fs->Next.load();
                if(FreeStack.compare_exchange_weak(fs,next))
                {
                    done = true;
                    returnValue = fs;
                }

            }
        }
        if ((returnValue == NULL) &&Alloc )
        {
            returnValue =new Node<T>();
            returnValue->Data = NULL;
            returnValue->Next = NULL;
        }
        return returnValue;
    }
    std::atomic<Node<T> *> Head;
    std::atomic<Node<T> *>FreeStack;
};

Это класс проблемы.Это будет работать некоторое время, но коррупция случается.Проблема в методе Dequeue.Узлы удаляются из списка по одному.Это возможно для узла, чтобы быть урезанным из-под вас на каждом шагу.Это приводит к тому, что узлы были обрезаны и должны быть «удалены», но все еще есть активные потоки, читающие с узла.Алгоритм должен предотвращать любые записи в мертвый узел, так как атомарно следующий указатель указывает либо на узел, либо на ноль, но использование пула кэша для хранения узлов обеспечивает безопасное чтение с кладбища.

template<class T>
class LockFreeQueueMPMC
{
public:
    LockFreeQueueMPMC()
    {
        Head=NULL;
    }
    ~LockFreeQueueMPMC(){
    }
    void Enqueue(T *item)
    {
        Node<T> * new_node = Cache.Malloc();
        new_node->Data=item;

        bool done = false;
        while(!done)
        {
            auto head = Head.load();
            new_node->Next.store(head);
            if(Head.compare_exchange_weak(head,new_node))
            {
                done = true;
            }
        }
    }
    T *Dequeue()
    {
        T *returnValue=NULL;
        bool done = false;
        while(!done)
        {
            Node<T> *head = Head.load();
            if(head == NULL)
            {
                done = true;
            }
            else
            {
                Node<T> * prev, *curr;
                prev = NULL;
                curr = head;
                bool found = false;
                while(!found)
                {
                    if(curr == NULL)
                    {
                        break;
                    }
                    Node<T> * next = curr->Next.load();
                    if(next == NULL)
                    {
                        found=true;
                        break;
                    }
                    prev = curr;
                    curr = next;
                }
                if(found)
                {
                    if(prev == NULL)
                    {
                        if(Head.compare_exchange_weak(head,NULL))
                        {
                            done = true;
                        }
                    }
                    else
                    {
                        if(prev->Next.compare_exchange_weak(curr,NULL))
                        {
                            done = true;
                        }
                    }
                    if(done)
                    {
                        returnValue = curr->Data;
                        Cache.Free(curr);
                    }
                }
            }
        }
        return returnValue;
    }
private:
    std::atomic<Node<T> *> Head;
    LockFreeMemCache<Node<T> > Cache;
};

Проблема находится в методе Dequeue, есть шаг, который портит, но очень редко.

1 Ответ

0 голосов
/ 21 декабря 2018

Я бы посоветовал вам использовать инструменты, чтобы помочь вам, поскольку это не так просто понять код.Я не знаю, какой компилятор вы используете, так как я разработчик Linux, я могу предложить в качестве примера gcc - есть средство для очистки потоков, которое можно использовать.Это довольно быстро, и есть вероятность, что вы сможете воспроизвести и поймать состояние гонки:

Как использовать нит-дезинфицирующее средство gcc v4.8.1?

...