Не требующий копирования потокобезопасный кольцевой буфер для больших массивов - PullRequest
0 голосов
/ 06 сентября 2018

Для обработки сигналов на больших массивах (10 ^ 7 элементов) я использую разные потоки, связанные с кольцевыми буферами. К сожалению, слишком много времени просто необходимо для копирования данных в буфер и из него. Текущая реализация основана на boost::lockfree::spsc_queue.

Итак, я ищу решение для поменять местами владение векторов между потоками и буфером, используя unique_ptr для векторов (см. Прилагаемый чертеж: замена указателя между потоками и очередь ).

Перемещение умных указателей не соответствует моим потребностям, потому что поэтому мне нужно постоянно выделять память во время выполнения для новых векторных элементов. Эти издержки больше, чем копирование данных.

Мне не хватает недостатка в этом дизайне?

Существуют ли реализации поточно-ориентированного или даже без блокировки кольцевого буфера, разрешающие операции подкачки для push и pop?

Редактировать: Я изменил буфер блокировки кольца для замены unique_ptr. Повышение производительности огромно. Хотя это не похоже на элегантное решение. Любые рекомендации?

// https://github.com/embeddedartistry/embedded-resources/blob/master/examples/cpp/circular_buffer.cpp

#include <memory>
#include <mutex>

template <typename T, int SIZE>
class RingbufferPointer {
typedef std::unique_ptr<T> TPointer;
public:
    explicit RingbufferPointer() {
        // create objects
        for (int i=0; i<SIZE; i++) {
            buf_[i] = std::make_unique<T>();
        }
    }

    bool push(TPointer &item) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (full())
            return false;

        std::swap(buf_[head_], item);

        if (full_)
            tail_ = (tail_ + 1) % max_size_;

        head_ = (head_ + 1) % max_size_;
        full_ = head_ == tail_;

        return true;
    }

    bool pop(TPointer &item) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (empty())
            return false;

        std::swap(buf_[tail_], item);

        full_ = false;
        tail_ = (tail_ + 1) % max_size_;

        return true;
    }

    void reset() {
        std::lock_guard<std::mutex> lock(mutex_);
        head_ = tail_;
        full_ = false;
    }

    bool empty() const {
        return (!full_ && (head_ == tail_));
    }

    bool full() const {
        return full_;
    }

    int capacity() const {
        return max_size_;
    }

    int size() const {
        int size = max_size_;

        if(!full_) {
            if(head_ >= tail_)
                size = head_ - tail_;
            else
                size = max_size_ + head_ - tail_;
        }

        return size;
    }

private:
    TPointer buf_[SIZE];

    std::mutex mutex_;
    int head_ = 0;
    int tail_ = 0;
    const int max_size_ = SIZE;
    bool full_ = 0;
};

Ответы [ 4 ]

0 голосов
/ 07 сентября 2018

если я правильно понимаю вашу задачу - вам нужно 2 контейнера:

  • Потокобезопасный и свободный от блокировок пул для свободных элементов - для не выделяемых / освободи его каждый раз. Нажатие и выталкивание без ожидания.
  • Потокобезопасная и без блокировок одиночная запись / одиночное считывание FIFO очередь, Нажатие и выталкивание без ожидания.

с этим вы можете сделать следующее:

  • в начале вы выделяете N элементов и помещаете его в пул.
  • Производитель освобождает элемент из пула (вместо этого выделяет память)
  • подготовить данные позиции
  • нажмите на FIFO очередь
  • если в пуле нет свободных предметов - сигнал ожидания от потребителя

  • Потребительский всплывающий элемент из FIFO очередь
  • данные элемента процесса
  • вернуть элемент обратно в пул (вместо этого освободить память)
  • если очередь пуста - ждать сигнала от производителя

FIFO очередь может быть реализована следующим образом:

class CyclicBufer
{
    struct alignas(8) Position 
    {
        ULONG _begin, _data_size;
    };

    std::atomic<Position> _pos;
    void** _items;

    ULONG _buf_size;

public:

    // Requires: only one thread is allowed to push data to the CyclicBufer
    bool push(void* item, bool* bWasEmpty = 0);

    // Requires: only one thread is allowed to pop data to the CyclicBufer
    bool pop(void** pitem, bool* bNotEmpty = 0);

    ~CyclicBufer()
    {
        if (_items)
        {
            delete [] _items;
        }
    }

    CyclicBufer() : _items(0), _buf_size(0)
    {
        _pos._My_val._begin = 0, _pos._My_val._data_size = 0;
    }

    bool create(ULONG buf_size)
    {
        if (_items = new(std::nothrow) void*[buf_size])
        {
            _buf_size = buf_size;
            return true;
        }

        return false;
    }

    bool is_empty()
    {
        Position current_pos = _pos.load(std::memory_order_relaxed);

        return !current_pos._data_size;
    }
};

bool CyclicBufer::push(void* item, bool* bWasEmpty /*= 0*/)
{
    Position current_pos = _pos.load(std::memory_order_relaxed);

    if (current_pos._data_size >= _buf_size) return false;

    // (_pos._begin + _pos._data_size) % _buf_size never changed in pop
    _items[(current_pos._begin + current_pos._data_size) % _buf_size] = item;

    for (;;)
    {
        Position new_pos = {
            current_pos._begin, current_pos._data_size + 1
        };

        if (_pos.compare_exchange_weak(current_pos, new_pos, std::memory_order_release))
        {
            if (bWasEmpty) *bWasEmpty = current_pos._data_size == 0;
            return true;
        }
    }
}

bool CyclicBufer::pop(void** pitem, bool* bNotEmpty /*= 0*/)
{
    Position current_pos = _pos.load(std::memory_order_acquire);

    if (!current_pos._data_size) return false;

    // current_pos._begin never changed in push
    void* item = _items[current_pos._begin];

    for (;;)
    {
        Position new_pos = {
            (current_pos._begin + 1) % _buf_size, current_pos._data_size - 1
        };

        if (_pos.compare_exchange_weak(current_pos, new_pos, std::memory_order_relaxed))
        {
            if (bNotEmpty) *bNotEmpty = new_pos._data_size != 0;
            *pitem = item;
            return true;
        }
    }
}

для потокового пула и пула без блокировки реализация на окнах может использоваться InterlockedPushEntrySList и InterlockedPopEntrySList, но, конечно, возможно реализовать это API и вы:

struct list_entry {
    list_entry *Next;
};

#if defined(_M_X64) || defined(_M_ARM64)
#define MACHINE_64
#endif

struct alignas(sizeof(PVOID)*2) list_head 
{  
    union {
        struct {
            INT_PTR DepthAndSequence;
            union {
                list_entry* NextEntry;
                INT_PTR iNextEntry;
            };
        };
        __int64 value; // for 32-bit only
    };

    void init()
    {
        iNextEntry = 0, DepthAndSequence = 0;
    }

    bool push(list_entry* entry)
    {
        list_head current = { { DepthAndSequence, NextEntry } }, new_head;

        for (;;)
        {
            entry->Next = current.NextEntry;
            new_head.NextEntry = entry;
            new_head.DepthAndSequence = current.DepthAndSequence + 0x10001;

#ifdef MACHINE_64
            if (_INTRIN_RELEASE(_InterlockedCompareExchange128)(
                &DepthAndSequence, 
                new_head.iNextEntry, new_head.DepthAndSequence, 
                &current.DepthAndSequence))
            {
                // return is list was empty before push
                return !current.NextEntry;
            }
#else
            new_head.value = _INTRIN_RELEASE(_InterlockedCompareExchange64)(
                &value, new_head.value, current.value);

            if (new_head.value == current.value)
            {
                // return is list was empty before push
                return !current.NextEntry;
            }

            current.value = new_head.value;
#endif
        }
    }

    list_entry* pop()
    {
        list_head current = { { DepthAndSequence, NextEntry } }, new_head;

        for (;;)
        {
            list_entry* entry = current.NextEntry;

            if (!entry)
            {
                return 0;
            }

            // entry must be valid memory
            new_head.NextEntry = entry->Next;
            new_head.DepthAndSequence = current.DepthAndSequence - 1;

#ifdef MACHINE_64
            if (_INTRIN_ACQUIRE(_InterlockedCompareExchange128)(&DepthAndSequence, 
                new_head.iNextEntry, new_head.DepthAndSequence, 
                &current.DepthAndSequence))
            {
                return entry;
            }
#else
            new_head.value = _INTRIN_ACQUIRE(_InterlockedCompareExchange64)(
                &value, new_head.value, current.value);

            if (new_head.value == current.value)
            {
                return entry;
            }

            current.value = new_head.value;
#endif
        }
    }
};

#pragma warning(disable : 4324)

template <class _Ty>
class FreeItems : list_head
{
    void* _items;

    union Chunk {
        list_entry entry;
        char buf[sizeof(_Ty)];
    };

public:

    ~FreeItems()
    {
        if (_items)
        {
            delete [] _items;
        }
    }

    FreeItems() : _items(0)
    {
        init();
    }

    bool create(ULONG count)
    {
        if (Chunk* items = new(std::nothrow) Chunk[count])
        {
            _items = items;

            union {
                list_entry* entry;
                Chunk* item;
            };

            item = items;

            do 
            {
                list_head::push(entry);

            } while (item++, --count);

            return true;
        }

        return false;
    }

    _Ty* pop()
    {
        return (_Ty*)list_head::pop();
    }

    bool push(_Ty* item)
    {
        return list_head::push((list_entry*)item);
    }
};

с этими 2 контейнерами демонстрационный / тестовый код может выглядеть (код для окон, но главное - как мы используем пул и очередь )

struct BigData 
{
    ULONG _id;
};

struct CPData : CyclicBufer, FreeItems<BigData>
{
    HANDLE _hDataEvent, _hFreeEvent, _hConsumerStop, _hProducerStop;

    ULONG _waitReadId, _writeId, _setFreeCount, _setDataCount;

    std::_Atomic_integral_t _dwRefCount;
    bool _bStop;

    static ULONG WINAPI sProducer(void* This)
    {
        reinterpret_cast<CPData*>(This)->Producer();
        reinterpret_cast<CPData*>(This)->Release();
        return __LINE__;
    }

    void Producer()
    {
        HANDLE Handles[] = { _hProducerStop, _hFreeEvent  };

        for (;;)
        {
            BigData* item;

            while (!_bStop && (item = FreeItems::pop()))
            {
                // init data item
                item->_id = _writeId++;

                bool bWasEmpty;

                if (!CyclicBufer::push(item, &bWasEmpty)) __debugbreak();

                if (bWasEmpty)
                {
                    _setDataCount++;
                    SetEvent(_hDataEvent);
                }
            }

            switch (WaitForMultipleObjects(2, Handles, FALSE, INFINITE))
            {
            case WAIT_OBJECT_0:
                SetEvent(_hConsumerStop);
                return;
            case WAIT_OBJECT_0 + 1:
                break;
            default:
                __debugbreak();
            }
        }
    }

    static ULONG WINAPI sConsumer(void* This)
    {
        reinterpret_cast<CPData*>(This)->Consumer();
        reinterpret_cast<CPData*>(This)->Release();
        return __LINE__;
    }

    void Consumer()
    {
        HANDLE Handles[] = { _hDataEvent, _hConsumerStop };

        for (;;)
        {
            switch (WaitForMultipleObjects(2, Handles, FALSE, INFINITE))
            {
            case WAIT_OBJECT_0:
                break;
            case WAIT_OBJECT_0 + 1:
                return;
            default:
                __debugbreak();
            }

            bool bNotEmpty;

            do 
            {
                BigData* item;

                if (!CyclicBufer::pop((void**)&item, &bNotEmpty)) __debugbreak();

                // check FIFO order
                if (item->_id != _waitReadId) __debugbreak();

                _waitReadId++;

                // process item

                // free item to the pool
                if (FreeItems::push(item))
                {
                    // stack was empty
                    _setFreeCount++;
                    SetEvent(_hFreeEvent);
                }

            } while (bNotEmpty);
        }
    }

    ~CPData()
    {
        if (_hConsumerStop) CloseHandle(_hConsumerStop);
        if (_hProducerStop) CloseHandle(_hProducerStop);
        if (_hFreeEvent) CloseHandle(_hFreeEvent);
        if (_hDataEvent) CloseHandle(_hDataEvent);

        if (_waitReadId != _writeId || !CyclicBufer::is_empty()) __debugbreak();

        DbgPrint("%s(%u %u %u)\n", __FUNCTION__, _writeId, _setFreeCount, _setDataCount);
    }

public:

    CPData()
    {
        _hFreeEvent = 0, _hDataEvent = 0, _hProducerStop = 0, _hConsumerStop = 0;
        _waitReadId = 0, _writeId = 0, _dwRefCount = 1;
        _setFreeCount = 0, _setDataCount = 0, _bStop = false;
    }

    void AddRef()
    {
        _MT_INCR(_dwRefCount);
    }

    void Release()
    {
        if (!_MT_DECR(_dwRefCount))
        {
            delete this;
        }
    }

    ULONG Create(ULONG n)
    {
        if (!CyclicBufer::create(n) || !FreeItems::create(n))
        {
            return ERROR_NO_SYSTEM_RESOURCES;
        }

        return (_hDataEvent = CreateEvent(0, FALSE, FALSE, 0)) &&
            (_hFreeEvent = CreateEvent(0, FALSE, FALSE, 0)) &&
            (_hProducerStop = CreateEvent(0, TRUE, FALSE, 0)) &&
            (_hConsumerStop = CreateEvent(0, TRUE, FALSE, 0)) ? 0 : GetLastError();
    }

    ULONG StartThread(bool bConsumer)
    {
        AddRef();

        if (HANDLE hThread = CreateThread(0, 0, bConsumer ? sConsumer : sProducer, this, 0, 0))
        {
            CloseHandle(hThread);
            return 0;
        }

        Release();

        return GetLastError();
    }

    ULONG Stop()
    {
        ULONG err = SetEvent(_hProducerStop) ? 0 : GetLastError();
        _bStop = true;
        return err;
    }
};

void BufTest()
{
    if (CPData* p = new CPData)
    {
        if (!p->Create(16))
        {
            if (!p->StartThread(false))
            {
                p->StartThread(true);
            }

            MessageBoxW(0, 0, L"Wait Stop", MB_ICONINFORMATION);
            p->Stop();
        }
        p->Release();
    }
    MessageBoxW(0,0,0,1);
}
0 голосов
/ 06 сентября 2018

Одна техника, которую я использовал, это ...

void next_step(std::vector<std::string> &a)
{
    std::vector<std::string> v;
    v.swap(a);
    // process vector ...
}

Нет обмена или копирования отдельных элементов. Быстро и эффективно.

Mike

0 голосов
/ 06 сентября 2018

Хотя boost::lockfree::spsc_queue не хватает поддержки перемещения, вы все равно можете это сделать.

Пример перемещения векторов в очередь и из нее:

struct Element {
    std::vector<int> data_;

    Element(std::vector<int>& data)
        : data_(std::move(data))
    {}

    Element(Element const&) = delete;
    Element operator=(Element const&) = delete;

    operator std::vector<int>&&() {
        return std::move(data_);
    }
};

int main() {
    boost::lockfree::spsc_queue<Element, boost::lockfree::capacity<2>> q;

    std::vector<int> a(1);
    assert(!a.empty());
    q.push(&a, &a + 1); // Move the vector into the queue.
    assert(a.empty());

    std::vector<int> b = q.front(); // Move the vector from queue.
    assert(!b.empty());
    q.pop();
}
0 голосов
/ 06 сентября 2018

Перемещение умных указателей не соответствует моим потребностям, потому что поэтому мне нужно постоянно выделять память во время выполнения для новых векторных элементов.

Не обязательно верно, если вы предварительно выделите достаточно памяти и реализуете собственное управление памятью в виде простого сегрегированного хранилища , a.k.a пул .

Если вы сделаете это, ничто не помешает вам поменяться местами, и вы сможете сохранить свою существующую архитектуру, используя любой кольцевой буфер , который поддерживает обмен элементов и сохраняет ту же безопасность потоков, что и раньше. , Вы можете выбрать опцию просто , используя boost::pool вместо реализации своей собственной.

...