Реализация ManualResetEvent (из C #) на C ++: как избежать состояния гонки - PullRequest
0 голосов
/ 23 сентября 2018

Я хотел бы узнать больше о многопоточном программировании, и я подумал, что было бы неплохо попробовать реализовать некоторые примитивы синхронизации C # в C ++.
Я начал с ManualResetEvent , и это то, что у меня пока есть:

class manual_reset_event
{
public:
    void wait_one()
    {
        if (cv_flag_.load() == false)
        {
            thread_local std::mutex mutex;
            std::unique_lock<std::mutex> lock(mutex);
            cond_var_.wait(lock, [this]() { return cv_flag_.load(); });
        }
    }

    void set()
    {
        cv_flag_.store(true);
        cond_var_.notify_all();
    }

    void reset()
    {
        cv_flag_.store(false);
    }

private:
    std::condition_variable cond_var_;
    std::atomic<bool> cv_flag_;
};

Однако здесь есть условие состязания: вы можете вызвать wait_one () в одном потоке, пройти проверку if (cv_flag), а затемнабор вызовов из другого потока.Это заставит wait_one () ждать, хотя cv_flag_ теперь имеет значение true.
Я мог бы решить эту проблему, используя блокировку для wait_one, установить и сбросить.
Думаю, я мог бы также решить эту проблему, вызвав cond_var_.notify_all () сразу после cond_var_.wait () для wait_one (), но я не думаю, что это отличная идея (хотя, возможно, я ошибаюсь).
Мне было интересно, есть ли что-то еще (может быть, даже совсем другоеподход без использования условных переменных), что я могу сделать здесь, чтобы избежать этого условия гонки.

Ответы [ 2 ]

0 голосов
/ 23 сентября 2018

одна возможная реализация - удерживать список ожидающих потоков событий.для состояния защиты manual_reset_event можно использовать std::mutex.когда поток начинает ждать - он проверяет состояние события, и если он не сигнализируется - вставьте в список сам "блок ожидания".это делается внутри «критической секции», защищенной общим мьютексом объекта.затем, если нам нужно ждать события (когда и только когда мы вставляем себя в список ожидания) - начинайте ожидание в блоке ожидания.но очень важный выход из «критической секции» до этого и даже не временный захват его после окончания ожидания.с другой стороны, поток, который устанавливает событие - получить список ожидающих потоков, а затем уведомить все это (после выхода из критического раздела) или, возможно, уведомить только один, который начинает ожидание первым.таким образом, мы можем реализовать ручную логику событий повторного отправки (когда все ожидающие потоки активируются одновременно) или логику автоматического сброса событий - когда будет активирован только один поток, и событие будет просто сброшено снова (на самом деле вообще не установлено состояние сигнала).только когда больше нет ожидающих потоков - событие сигнализирует.

class manual_reset_event : std::mutex
{
    struct WaitBlock : public std::condition_variable, std::mutex  
    {
        WaitBlock(WaitBlock* next) : next(next), signaled(false) {}

        WaitBlock* next;
        volatile bool signaled;

        void Wait()
        {
            // synchronization point with Wake()
            std::unique_lock<std::mutex> lock(*this);

            while (!signaled)
            {
                // notify_one() yet not called
                wait(lock);
            }
        }

        void Wake()
        {
            {
                // synchronization point with Wait()
                std::lock_guard<std::mutex> lock(*this);
                signaled = true;
            }
            notify_one();
        }
    };

    WaitBlock* _head;
    volatile bool _signaled;

public:

    manual_reset_event(bool signaled = false) : _signaled(signaled), _head(0) { }

    void wait()
    {
        lock();//++ protect object state

        WaitBlock wb(_head);

        bool inserted = false;

        if (!_signaled)
        {
            _head = &wb;
            inserted = true;
        }

        unlock();//-- protect object state

        if (inserted)
        {
            wb.Wait();
        }
    }

    // manual reset logic
    void set_all() 
    {
        WaitBlock* last, *head = 0;

        lock();//++ protect object state
        head = _head, _signaled = true;
        unlock();//-- protect object state

        while (last = head)
        {
            head = head->next;
            last->Wake();
        }
    }

    // auto reset logic - only one thread will be signaled, event auto reset
    void set_single()  
    {
        WaitBlock* last = 0;

        lock();//++ protect object state

        if (!_signaled)
        {
            if (last = _head)
            {
                // wake first waiting thread

                WaitBlock* prev = 0, *pwb;

                while (pwb = last->next)
                {
                    prev = last, last = pwb;
                }

                (prev ? prev->next : _head) = 0;

            }
            else
            {
                // nobody wait
                _signaled = true;
            }
        }
        unlock();//-- protect object state

        if (last)
        {
            last->Wake();
        }
    }

    void reset()
    {
        _signaled = false;
    }
};
0 голосов
/ 23 сентября 2018

В большинстве случаев проще всего использовать порядок, чтобы внутри вашего объекта были мьютексы и игнорировать атомистику.Просто убедитесь, что все обращения к вашим данным защищены блокировкой.

Если вы сохраняете только один бит, возможно, что быстрое выполнение set и reset приведет к потере пробужденийожидание потоков будет запланировано только после завершения reset.Для решения проблемы я буду использовать счетчик.Самый низкий бит счетчика - это его «открытое» состояние.Каждое изменение этого состояния осуществляется с приращением.Я использую 64-битный счетчик на всякий случай.Крайне маловероятно, что 32-разрядных битов будет недостаточно, даже если это может привести к циклическому изменению во время длительной работы программы.

class manual_reset_event
{
public:
    void wait_one()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        uint64_t initial_value = value_;
        if(initial_value & 1)
        {
            return;
        }
        while (value_ == initial_value)
        {
            signalled_.wait(lock);
        }
    }

    void set()
    {
        std::unique_lock<std::mutex> lock(mutex);
        if((value_ & 1) == 0)
        {
            value_++;
            lock.release(); // optimization
            signalled_.notify_all();
        }
    }

    void reset()
    {
        std::unique_lock<std::mutex> lock(mutex);
        if(value_ & 1)
        {
            value_++;
        }
    }

private:
    std::mutex mutex_;
    std::condition_variable signalled_;
    uint64_t value_;
};

Если вы настаиваете на том, чтобы избежать ненужного использования блокировки, вы можете использовать атомарность, норешение несколько сложнее, так как есть еще много вариантов для рассмотрения.

class manual_reset_event
{
public:
    void wait_one()
    {
        uint64_t initial_value = value_;
        if(initial_value & 1)
        {
            return;
        }
        std::unique_lock<std::mutex> lock(mutex_);
        while (value_ == initial_value)
        { // !
            signalled_.wait(lock);
        }
    }

    void set()
    {
        uint64_t initial_value = value_;
        if(initial_value & 1)
        {
            return;
        }
        std::unique_lock<std::mutex> lock(mutex_);
        // Still need lock to prevent lost wakeup if atomic change happens when
        // other thread is on "// !" line.
        if(value.compare_exchange_strong(initial_value, initial_value + 1)) {
        // One strong attempt is enough. If it fails than someone else must have
        // succeeded. It's as if these two set() operations happened at the same time.
            lock.release();
            signalled_.notify_all();
        }
    }

    void reset()
    {
        uint64_t initial_value = value_;
        if((initial_value & 1) == 0)
        {
            return;
        }
        std::unique_lock<std::mutex> lock(mutex_);
        value.compare_exchange_strong(initial_value, initial_value + 1);
    }

private:
    std::mutex mutex_;
    std::condition_variable signalled_;
    std::atomic<uint64_t> value_;
};
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...