Написание (вращающегося) барьера для потоков с использованием атомарности c ++ 11 - PullRequest
8 голосов
/ 14 ноября 2011

Я пытаюсь ознакомиться с атомарностью c ++ 11, поэтому я попытался написать барьерный класс для потоков (до того, как кто-то пожалуется на то, что он не использует существующие классы: это больше для обучения / самосовершенствования, чем из-за какой-либо реальной необходимости) ). мой класс выглядит в основном следующим образом:

class barrier
{
private:
    std::atomic<int> counter[2];
    std::atomic<int> lock[2];
    std::atomic<int> cur_idx;
    int thread_count;
public:
    //constructors...
    bool wait();
};

Все члены инициализируются в ноль, кроме thread_count, который содержит соответствующий счетчик. Я реализовал функцию ожидания как

int idx  = cur_idx.load();
if(lock[idx].load() == 0)
{
    lock[idx].store(1);
}
int val = counter[idx].fetch_add(1);
if(val >= thread_count - 1)
{
    counter[idx].store(0);
    cur_idx.fetch_xor(1);
    lock[idx].store(0);
    return true;
}
while(lock[idx].load() == 1);
return false;

Однако при попытке использовать его с двумя потоками (thread_count равно 2) первый поток попадает в цикл ожидания просто отлично, но второй поток не снимает барьер (кажется, он даже не доходит до int val = counter[idx].fetch_add(1);, но я не слишком уверен в этом. Однако, когда я использую gcc atomic-intrinsics, используя volatile int вместо std::atomic<int> и пишу wait следующим образом:

int idx = cur_idx;
if(lock[idx] == 0)
{
    __sync_val_compare_and_swap(&lock[idx], 0, 1);
}
int val = __sync_fetch_and_add(&counter[idx], 1);
if(val >= thread_count - 1)
{
    __sync_synchronize();
    counter[idx] = 0;
    cur_idx ^= 1;
    __sync_synchronize();
    lock[idx] = 0;
    __sync_synchronize();
    return true;
}
while(lock[idx] == 1);
return false;

работает просто отлично. Насколько я понимаю, между этими двумя версиями не должно быть принципиальных различий (более конкретно, если что-нибудь будет работать с меньшей вероятностью). Итак, какой из следующих сценариев применим?

  1. Мне повезло со второй реализацией, и мой алгоритм - дерьмо
  2. Я не до конца понял std::atomic и есть проблема с первым вариантом (но не со вторым)
  3. Это должно работать, но экспериментальная реализация для библиотек c ++ 11 не настолько развита, как я надеялся

Для записи я использую 32bit MINGW с GCC 4.6.1

Код вызова выглядит следующим образом:

spin_barrier b(2);
std::thread t([&b]()->void
{
    std::this_thread::sleep_for(std::chrono::duration<double>(0.1));
    b.wait();
});
b.wait();
t.join();

Поскольку mingw не отбивает <thread> jet заголовков, я использую самостоятельно написанную версию для того, что в основном оборачивает соответствующие функции pthread (прежде чем кто-то спросит: да, это работает без барьера, так что это не должно быть проблемой с упаковка) Любые идеи будут оценены.

edit: Объяснение алгоритма, чтобы сделать его более понятным:

  • thread_count - количество нитей, которые должны ожидать барьер (поэтому, если thread_count нитей находятся в барьере, все могут покинуть барьер).
  • lock устанавливается в единицу, когда первая (или любая) нить входит в барьер.
  • counter подсчитывает, сколько нитей находится внутри барьера и атомно увеличивается на единицу для каждой нити
  • if counter>=thread_count все нити находятся внутри барьера, поэтому счетчик и блокировка обнуляются
  • в противном случае поток ожидает, пока lock станет нулевым
  • при следующем использовании барьера используются различные переменные (counter, lock), чтобы гарантировать отсутствие проблем, если потоки все еще ожидают первого использования барьера (например, они были прерваны, когда барьер снят)

edit2: Сейчас я проверил его, используя gcc 4.5.1 под linux, где обе версии работают нормально, что, похоже, указывает на проблему с std::atomic в mingw, но я до сих пор не совсем убежден, так как изучаю <atomic> заголовок гласил, что большинство функций просто вызывают соответствующий gcc-атомарный смысл, между двумя версиями действительно не должно быть различий

Ответы [ 7 ]

23 голосов
/ 14 ноября 2011

Понятия не имею, поможет ли это, но в следующем фрагменте из реализации параллельной очереди Херба Саттера используется спин-блокировка, основанная на атомарности:

std::atomic<bool> consumerLock;

{   // the critical section
    while (consumerLock.exchange(true)) { }  // this is the spinlock

    // do something useful

    consumerLock = false;  // unlock
}

Фактически, Стандарт предоставляетспециально созданный тип для этой конструкции, который должен иметь операции без блокировки, std::atomic_flag.При этом критическая секция будет выглядеть следующим образом:

std::atomic_flag consumerLock;

{
    // critical section

    while (consumerLock.test_and_set()) { /* spin */ }

    // do stuff

    consumerLock.clear();
}

(Вы можете использовать там порядок и порядок выделения памяти, если хотите.)

5 голосов
/ 16 июля 2014

Вот элегантное решение из книги Параллельность C ++ в действии: практическая многопоточность .

struct bar_t {
    unsigned const count;
    std::atomic<unsigned> spaces;
    std::atomic<unsigned> generation;
    bar_t(unsigned count_) :
        count(count_), spaces(count_), generation(0)
    {}
    void wait() {
        unsigned const my_generation = generation;
        if (!--spaces) {
            spaces = count;
            ++generation;
        } else {
            while(generation == my_generation);
        }
    }
};
5 голосов
/ 14 ноября 2011

Это выглядит слишком сложно.Попробуйте эту более простую версию (ну, я не проверял ее, я просто медитировал на нее :))):

#include <atomic>

class spinning_barrier
{
public:
    spinning_barrier (unsigned int n) : n_ (n), nwait_ (0), step_(0) {}

    bool wait ()
    {
        unsigned int step = step_.load ();

        if (nwait_.fetch_add (1) == n_ - 1)
        {
            /* OK, last thread to come.  */
            nwait_.store (0); // XXX: maybe can use relaxed ordering here ??
            step_.fetch_add (1);
            return true;
        }
        else
        {
            /* Run in circles and scream like a little girl.  */
            while (step_.load () == step)
                ;
            return false;
        }
    }

protected:
    /* Number of synchronized threads. */
    const unsigned int n_;

    /* Number of threads currently spinning.  */
    std::atomic<unsigned int> nwait_;

    /* Number of barrier syncronizations completed so far, 
     * it's OK to wrap.  */
    std::atomic<unsigned int> step_;
};

РЕДАКТИРОВАТЬ: @ Гриззи, я не могу найти ни однойошибки в вашей первой (C ++ 11) версии, и я также запустил примерно сто миллионов синхронизаций с двумя потоками, и она завершается.Я запустил его на машине с двумя сокетами / четырехъядерным процессором GNU / Linux, поэтому я скорее склонен подозревать ваш вариант 3. - библиотека (или, скорее, ее порт для win32) недостаточно зрелая.

4 голосов
/ 16 ноября 2011

Вот моя простая версия:

// spinning_mutex.hpp
#include <atomic>


class spinning_mutex
{
private:
    std::atomic<bool> lockVal;
public:
    spinning_mutex() : lockVal(false) { };

    void lock()
    {
        while(lockVal.exchange(true) );
    } 

    void unlock()
    {
        lockVal.store(false);
    }

    bool is_locked()
    {
        return lockVal.load();
    }
};

Использование: (из std :: lock_guard пример)

#include <thread>
#include <mutex>
#include "spinning_mutex.hpp"

int g_i = 0;
spinning_mutex g_i_mutex;  // protects g_i

void safe_increment()
{
    std::lock_guard<spinning_mutex> lock(g_i_mutex);
    ++g_i;

    // g_i_mutex is automatically released when lock
    // goes out of scope
}

int main()
{
    std::thread t1(safe_increment);
    std::thread t2(safe_increment);

    t1.join();
    t2.join();
}
2 голосов
/ 13 августа 2014

Почему бы не использовать std :: atomic_flag (из C ++ 11)?

http://en.cppreference.com/w/cpp/atomic/atomic_flag

std :: atomic_flag - это атомный логический тип. В отличие от всех специализаций std :: atomic, он гарантированно не блокируется.

Вот как бы я написал свой класс барьера с вращающейся нитью:

#ifndef SPINLOCK_H
#define SPINLOCK_H

#include <atomic>
#include <thread>

class SpinLock
{
public:

    inline SpinLock() :
        m_lock(ATOMIC_FLAG_INIT)
    {
    }

    inline SpinLock(const SpinLock &) :
        m_lock(ATOMIC_FLAG_INIT)
    {
    }

    inline SpinLock &operator=(const SpinLock &)
    {
        return *this;
    }

    inline void lock()
    {
        while (true)
        {
            for (int32_t i = 0; i < 10000; ++i)
            {
                if (!m_lock.test_and_set(std::memory_order_acquire))
                {
                    return;
                }
            }

            std::this_thread::yield();  // A great idea that you don't see in many spinlock examples
        }
    }

    inline bool try_lock()
    {
        return !m_lock.test_and_set(std::memory_order_acquire);
    }

    inline void unlock()
    {
        m_lock.clear(std::memory_order_release);
    }

private:

    std::atomic_flag m_lock;
};

#endif
2 голосов
/ 16 января 2013

Я знаю, что поток немного староват, но так как он все еще является первым результатом Google при поиске барьера потока с использованием только c ++ 11, я хочу представить решение, которое избавляет от занятого ожидания, используя std::condition_variable. В основном это решение chill, но вместо цикла while используются std::conditional_variable.wait() и std::conditional_variable.notify_all(). В моих тестах это работает нормально.

#include <atomic>
#include <condition_variable>
#include <mutex>


class SpinningBarrier
{
    public:
        SpinningBarrier (unsigned int threadCount) :
            threadCnt(threadCount),
            step(0),
            waitCnt(0)
        {}

        bool wait()
        {
            if(waitCnt.fetch_add(1) >= threadCnt - 1)
            {
                std::lock_guard<std::mutex> lock(mutex);
                step += 1;

                condVar.notify_all();
                waitCnt.store(0);
                return true;
            }
            else
            {
                std::unique_lock<std::mutex> lock(mutex);
                unsigned char s = step;

                condVar.wait(lock, [&]{return step == s;});
                return false;
            }
        }
    private:
        const unsigned int threadCnt;
        unsigned char step;

        std::atomic<unsigned int> waitCnt;
        std::condition_variable condVar;
        std::mutex mutex;
};
1 голос
/ 10 июля 2015

Украдено прямо из документов

spinlock.h

#include <atomic>

using namespace std;

/* Fast userspace spinlock */
class spinlock {
public:
    spinlock(std::atomic_flag& flag) : flag(flag) {
        while (flag.test_and_set(std::memory_order_acquire)) ;
    };
    ~spinlock() {
        flag.clear(std::memory_order_release);
    };
private:
    std::atomic_flag& flag; 
};

creation.cpp

#include "spinlock.h"

atomic_flag kartuliga = ATOMIC_FLAG_INIT;

void mutually_exclusive_function()
{
    spinlock lock(kartuliga);
    /* your shared-resource-using code here */
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...