Семафор C ++ (semi * lockfree *), где его взять? - PullRequest
0 голосов
/ 08 ноября 2018

edit: это не дубликат любого вопроса, который позволяет блокировку мьютекса в post (). Пожалуйста, прочитайте внимательно, мне нужен пост без блокировки ()! Не отмечайте этот дубликат, если у вас нет реального ответа.

Семафор (как в linux) - это полезный строительный блок, которого нет ни в стандарте c ++, ни в boost (в настоящее время). Я в основном говорю о семафорах между потоками одного процесса поверх упреждающего планировщика.

Я особенно заинтересован в том, чтобы они были неблокирующими (т.е. без блокировки), за исключением случаев, когда они действительно должны блокироваться. То есть post () и try_wait () должны быть всегда без блокировки. И вызовы wait () должны быть без блокировок, если их вызовы сильно происходят после того, как достаточное количество post () вернулось Планировщик должен блокировать wait (), а не блокировать вращение. Что делать, если я также хочу wait_for с тайм-аутом - насколько это еще больше усложнит реализацию, при этом избегая голодания?

Существуют ли причины, по которым семафоры не входят в стандарт?

Edit3: Итак, я не знал, что есть предложение по стандарту P0514R4, которое точно решает эти проблемы и имеет решения для всех проблем, поднятых здесь, за исключением специального добавления std :: семафор. http://www.open -std.org / ОТК1 / SC22 / wg21 / документы / документы / 2018 / p0514r4.pdf

Также в Boost их нет. В частности, те, что в межпроцессном режиме, блокируются вращением.

Какие библиотеки поддерживают что-то подобное?

Возможно ли реализовать его поверх windows api и других распространенных систем?

edit: Невозможно реализовать этот lockfree с использованием atomics + mutex + condition_variable - вы должны либо заблокировать в посте, либо вращаться в ожидании. Если вы хотите post-free без блокировки (), вы не можете заблокировать мьютекс в post (). Я хочу работать на возможно вытесняющем планировщике, и я не хочу, чтобы post () блокировалась другими потоками, которые взяли мьютекс и получили прерывание. Итак, не является дубликатом вопросов, таких как C ++ 0x не имеет семафоров? Как синхронизировать потоки?

edit2: Следуя примеру реализации, просто продемонстрируем лучшее, что можно сделать с помощью atomics + mutex + condvar, AFAIK. post () и wait () выполняют один lockfree compare_exchange, и только если они должны, они блокируют мьютекс.

Однако post () не блокируется. И что еще хуже, он может быть заблокирован функцией wait (), которая заблокировала мьютекс и получила прерывание.

Для простоты я реализовал только post_one () и wait_one_for (Duration) вместо post (int) и wait_for (int, Duration). Кроме того, я предполагаю, что пробуждение не будет ложным, что не предусмотрено стандартом.

class semaphore //provides acquire release memory ordering for the user
{
private:
    using mutex_t = std::mutex;
    using unique_lock_t = std::unique_lock<mutex_t>;
    using condvar_t = std::condition_variable;
    using counter_t = int;

    std::atomic<counter_t> atomic_count_; 
    mutex_t mutex_;
    condvar_t condvar_;
    counter_t posts_notified_pending_;
    counter_t posts_unnotified_pending_;
    counter_t waiters_running_;
    counter_t waiters_aborted_pending_;

public:
    void post_one()
    {
        counter_t start_count = atomic_count_.fetch_add(+1, mo_acq_rel);
        if (start_count < 0) {
            unique_lock_t lock(mutex_);
            if (0 < waiters_running_) {
                ++posts_notified_pending_;
                condvar_.notify_one();
            }
            else {
                if (0 == waiters_aborted_pending_) {
                    ++posts_unnotified_pending_;
                }
                else {
                    --waiters_aborted_pending_;
                }
            }
        }
    }

    template< typename Duration >
    bool wait_one_for(Duration timeout)
    {
        counter_t start_count = atomic_count_.fetch_add(-1, mo_acq_rel);
        if (start_count <= 0) {
            unique_lock_t a_lock(mutex_);

            ++waiters_running_;
            BOOST_SCOPE_EXIT(&waiters_running_) {
                --waiters_running_;
            } BOOST_SCOPE_EXIT_END

            if( ( 0 == posts_notified_pending_ ) && ( 0 < posts_unnotified_pending_ ) ) {
                --posts_unnotified_pending_;
                return true;
            }
            else {

                auto wait_result = condvar_.wait_for( a_lock, timeout);
                switch (wait_result) {
                case std::cv_status::no_timeout: {
                    --posts_notified_pending_;
                    return true;
                } break;
                case std::cv_status::timeout: {

                    counter_t abort_count = atomic_count_.fetch_add(+1, mo_acq_rel);
                    if (abort_count >= 0) {
                        /*too many post() already increased a negative atomic_count_ and will try to notify, let them know we aborted. */
                        ++waiters_aborted_pending_;
                    }

                    return false;
                } break;
                default: assert(false); return false;
                }
            }
        }
        return true;
    }


    bool try_wait_one()
    {
        counter_t count = atomic_count_.load( mo_acquire );
        while (true) {
            if (count <= 0) {
                return false;
            }
            else if (atomic_count_.compare_exchange_weak(count, count-1, mo_acq_rel, mo_relaxed )) {
                return true;
            }
        }
    }
};

1 Ответ

0 голосов
/ 09 ноября 2018

Да, вы можете сделать это, если ваша операционная система предлагает подходящий механизм «парковки» и «отмены парковки», для которого не требуется блокировка для разблокировки. Под парком понимается разрешение перехода потока в спящий режим (блокировка ОС), а отмена - пробуждение этого потока.

Вы уже близки с вашим атомным счетчиком и подходом Кондвара. Проблема в том, что condvar a mutex требуется как часть семантики. Таким образом, вы должны отказаться от condvars и пойти немного ниже уровня. Во-первых, вы должны упаковать все состояния, такие как текущее значение семафора, есть ли какие-либо официанты (и, возможно, сколько их ожидают), в одно атомарное значение и манипулировать этим атомарно с помощью сравнения и обмена. Это предотвращает гонки, которые могли бы произойти, если бы вы использовали их как отдельные значения.

Затем вы можете нарисовать диаграмму состояний, показывающую все возможные состояния семафора с ребрами для всех возможных переходных состояний (например, состояние «нет официантов» перейдет в состояние «да официантов», когда прибудет официант). Вы реализуете все переходы с помощью сравнения и обмена, и всякий раз, когда он терпит неудачу, вы должны пересчитать переход, так как он мог измениться!

Тогда вам нужно только реализовать блокировку. В Windows вы должны использовать Events - автоматический или ручной сброс. У обоих есть свои преимущества и изюминки, и у этой кошки есть больше, чем один способ. Например, вы, вероятно, можете заставить его работать с одиночным общим событием и событиями автоматического сброса.

Однако, вот эскиз одного механизма, который использует объект официанта на поток в очереди без блокировки. Семафор состоит из управляющего слова с атомарной манипуляцией и списка без блокировки с типом элемента waiter_node или стеком или любым другим готовым параллельным списком, который вы хотите использовать.

Предположим, что каждому потоку принадлежит объект waiter_node, который содержит только один объект Event сброса вручную. Это может быть создано один раз и сохранено в TLS (вероятно, наиболее эффективным) или выделено по требованию каждый раз, когда необходимо выполнить ожидание, и отменено распределение, когда ожидание выполнено.

Вот основная схема:

Подождите

  • Если семафор доступен (положительный), CAS уменьшит его и продолжит.
  • Если семафор недоступен (ноль), поток вызывает ResetEvent для своего waiter_node, а затем помещает событие в список официантов, проверяет, что значение sem все еще равно нулю, а затем вызывает WaitForObject для него. waiter_node. Когда это вернется, запустите процедуру ожидания сверху.

Сообщение

  • Увеличить контрольное слово. Введите waiter_node, если есть, и наберите SetEvent.

Здесь есть всевозможные «расы», такие как waiter_node, выполняемая операцией post до того, как ожидающий поток даже спит на нем, но они должны быть доброкачественными.

Есть много вариантов, даже в этом дизайне на основе очереди официантов. Например, вы можете объединить список «заголовок» и контрольное слово, чтобы они были одним и тем же. Тогда wait не нужно дважды проверять счетчик семафоров, поскольку операция push одновременно проверяет состояние семафора. Вы также можете реализовать «прямую передачу обслуживания», когда поток post ing вообще не увеличивает управляющее слово, если есть официанты, а просто выводит его и пробуждает в нем информацию о том, что он успешно получил семафор.

В Linux вы заменяете Event на futex. Здесь проще реализовать решение «с одним фьютексом», поскольку futex допускает атомарную операцию проверки и блокировки внутри ядра, которая позволяет избежать многих гонок, присущих решению Event. Таким образом, базовый эскиз - это одно контрольное слово, и вы переходите атомарно с помощью CAS, а затем используете futex() с FUTEX_WAIT, чтобы выполнить вторую проверку контрольного слова и заблокировать атомарно (это атомная проверка-и-сон это сила futex).

...