Очередь без блокировки с boost :: atomic - Я правильно делаю? - PullRequest
4 голосов
/ 26 февраля 2012

Короткая версия:

Я пытаюсь заменить std :: atomic из C ++ 11, используемый в реализации без блокировки, для одного производителя, для очереди с одним потребителем из здесь .Как заменить это на boost::atomic?

Длинная версия:

Я пытаюсь повысить производительность нашего приложенияс рабочими нитями.Каждый поток имеет свою очередь задач.Мы должны синхронизироваться с помощью блокировки перед удалением / постановкой в ​​очередь каждой задачи.

Тогда я нашел статью Херба Саттера о очереди без блокировки.Это похоже на идеальную замену.Но в коде используется std::atomic из C ++ 11, который я не мог представить в данный момент в проекте.

Более гуглирование привело к некоторым примерам, таким как этот для Linux (echelon's) и для Windows (TINESWARE's) .Оба используют специфические для платформы конструкции, такие как WinAPI InterlockedExchangePointer и GCC __sync_lock_test_and_set.

. Мне нужно только поддерживать Windows и Linux, так что, возможно, я смогу обойтись с некоторыми #ifdef s.Но я подумал, что было бы лучше использовать то, что обеспечивает boost::atomic.Boost Atomic еще не является частью официальной библиотеки Boost.Поэтому я скачал исходный код из http://www.chaoticmind.net/~hcb/projects/boost.atomic/ и использую включаемые файлы с моим проектом.

Это то, что я получаю до сих пор:

#pragma once

#include <boost/atomic.hpp>

template <typename T>
class LockFreeQueue
{
private:
    struct Node
    {
        Node(T val) : value(val), next(NULL) { }
        T value;
        Node* next;
    };
    Node* first; // for producer only
    boost::atomic<Node*> divider;  // shared
    boost::atomic<Node*> last; // shared

public:
    LockFreeQueue()
    {
        first = new Node(T());
        divider = first;
        last= first;
    }

    ~LockFreeQueue()
    {
        while(first != NULL) // release the list
        {
            Node* tmp = first;
            first = tmp->next;
            delete tmp;
        }
    }

    void Produce(const T& t)
    {
        last.load()->next = new Node(t); // add the new item
        last = last.load()->next;
        while(first != divider) // trim unused nodes
        {
            Node* tmp = first;
            first = first->next;
            delete tmp;
        }
    }

    bool Consume(T& result)
    {
        if(divider != last) // if queue is nonempty
        {
            result = divider.load()->next->value; // C: copy it back
            divider = divider.load()->next;
            return true;  // and report success
        }
        return false;  // else report empty
    }
};

Некоторые изменения, на которые следует обратить внимание:

boost::atomic<Node*> divider;  // shared
boost::atomic<Node*> last; // shared

и

    last.load()->next = new Node(t); // add the new item
    last = last.load()->next;

и

        result = divider.load()->next->value; // C: copy it back
        divider = divider.load()->next;

Применяю ли я load () (и неявное хранилище ()) от boost :: atomic правильноВот?Можем ли мы сказать, что это эквивалентно исходной безблокировочной очереди C ++ 11 Саттера?

PS.Я изучил многие потоки в SO, но ни один из них не дает пример для очереди boost :: atomic & free-free.

Ответы [ 2 ]

1 голос
/ 24 мая 2012

Вы пробовали Intel Thread Building Blocks atomic<T>?Кроссплатформенный и бесплатный.

Также ...

Один производитель / один потребитель значительно облегчает вашу проблему, поскольку ваша точка линеаризации может быть одним оператором.Еще проще, если вы готовы принять ограниченную очередь.

A ограниченная очередь предлагает преимущества для производительности кэша, поскольку вы можете зарезервировать блок памяти с выравниванием кэша, чтобы максимизировать свои попадания, например:

#include <vector>
#include "tbb/atomic.h"
#include "tbb/cache_aligned_allocator.h"    

template< typename T >
class SingleProdcuerSingleConsumerBoundedQueue { 
    typedef vector<T, cache_aligned_allocator<T> > queue_type;

public:
    BoundedQueue(int capacity):
        queue(queue_type()) {
        head = 0;
        tail = 0;
        queue.reserve(capacity);
    }

    size_t capacity() {
        return queue.capacity();
    }

    bool try_pop(T& result) {
        if(tail - head == 0)
            return false;
        else {
            result = queue[head % queue.capacity()];
            head.fetch_and_increment(); //linearization point
            return(true);
        }
    }

    bool try_push(const T& source) {
        if(tail - head == queue.capacity()) 
            return(false);
        else {
            queue[tail %  queue.capacity()] = source;
            tail.fetch_and_increment(); //linearization point
            return(true);
        }
    }

    ~BoundedQueue() {}

private:
    queue_type queue;
    atomic<int> head;
    atomic<int> tail;
};
0 голосов
/ 02 октября 2012

Проверьте этот пример импульсного кольцевого буфера boost.atomic из документации:

#include <boost/atomic.hpp>

template <typename T, size_t Size>
class ringbuffer
{
public:
    ringbuffer() : head_(0), tail_(0) {}

    bool push(const T & value)
    {
        size_t head = head_.load(boost::memory_order_relaxed);
        size_t next_head = next(head);
        if (next_head == tail_.load(boost::memory_order_acquire))
            return false;
        ring_[head] = value;
        head_.store(next_head, boost::memory_order_release);
        return true;
    }

    bool pop(T & value)
    {
        size_t tail = tail_.load(boost::memory_order_relaxed);
        if (tail == head_.load(boost::memory_order_acquire))
            return false;
        value = ring_[tail];
        tail_.store(next(tail), boost::memory_order_release);
        return true;
    }

private:
    size_t next(size_t current)
    {
        return (current + 1) % Size;
    }

    T ring_[Size];
    boost::atomic<size_t> head_, tail_;
};

// How to use    
int main()
{
    ringbuffer<int, 32> r;

    // try to insert an element
    if (r.push(42)) { /* succeeded */ }
    else { /* buffer full */ }

    // try to retrieve an element
    int value;
    if (r.pop(value)) { /* succeeded */ }
    else { /* buffer empty */ }
}

Единственным ограничением кода является то, что длина буфера должна быть известна во время компиляции (или во время построения, если вы замените массив на std::vector<T>). Насколько я понимаю, позволить буферу увеличиваться и уменьшаться не тривиально.

...