Когда memory_order_acquire или memory_order_release можно безопасно удалить из compare_exchange? - PullRequest
1 голос
/ 29 апреля 2020

Я ссылаюсь на код в учебнике сопрограмм Льюиса Бейкера.

https://lewissbaker.github.io/2017/11/17/understanding-operator-co-await

bool async_manual_reset_event::awaiter::await_suspend(
  std::experimental::coroutine_handle<> awaitingCoroutine) noexcept
{
  // Special m_state value that indicates the event is in the 'set' state.
  const void* const setState = &m_event;

  // Remember the handle of the awaiting coroutine.
  m_awaitingCoroutine = awaitingCoroutine;

  // Try to atomically push this awaiter onto the front of the list.
  void* oldValue = m_event.m_state.load(std::memory_order_acquire);
  do
  {
    // Resume immediately if already in 'set' state.
    if (oldValue == setState) return false; 

    // Update linked list to point at current head.
    m_next = static_cast<awaiter*>(oldValue);

    // Finally, try to swap the old list head, inserting this awaiter
    // as the new list head.
  } while (!m_event.m_state.compare_exchange_weak(
             oldValue,
             this,
             std::memory_order_release,
             std::memory_order_acquire));

  // Successfully enqueued. Remain suspended.
  return true;
}

, где m_state - это просто std::atomic<void *>.

bool async_manual_reset_event::is_set() const noexcept
{
  return m_state.load(std::memory_order_acquire) == this;
}
void async_manual_reset_event::reset() noexcept
{
  void* oldValue = this;
  m_state.compare_exchange_strong(oldValue, nullptr, std::memory_order_acquire);
}
void async_manual_reset_event::set() noexcept
{
  // Needs to be 'release' so that subsequent 'co_await' has
  // visibility of our prior writes.
  // Needs to be 'acquire' so that we have visibility of prior
  // writes by awaiting coroutines.
  void* oldValue = m_state.exchange(this, std::memory_order_acq_rel);
  if (oldValue != this)
  {
    // Wasn't already in 'set' state.
    // Treat old value as head of a linked-list of waiters
    // which we have now acquired and need to resume.
    auto* waiters = static_cast<awaiter*>(oldValue);
    while (waiters != nullptr)
    {
      // Read m_next before resuming the coroutine as resuming
      // the coroutine will likely destroy the awaiter object.
      auto* next = waiters->m_next;
      waiters->m_awaitingCoroutine.resume();
      waiters = next;
    }
  }
}

Обратите внимание, что в m_state.exchange метода set() приведенный выше комментарий ясно показывает, почему для вызова обмена требуются как приобретение, так и освобождение.

Интересно, почему в m_state.compare_exchange_weak await_suspend() метод, третий параметр - это std :: memory_order_release, но не memory_order_acq_rel (приобретение удалено).

Автор (Льюис) действительно объяснил, что нам нужно освободить в Compare_exchange_weak, потому что нам нужно разрешить позже set () см. записи в Compare_exchange_weak. Но почему бы нам не потребовать, чтобы в других потоках было показано сравнение Compare_exchange_weak в других потоках?

Это из-за последовательности выпуска? То есть в цепочке выпусков (сначала пишите выпуск, и все промежуточные операции - это операции «чтение, получение, затем запись», а последняя операция - получение чтения), тогда вам не нужно указывать, что они получают в середине. ?

В следующем коде я попытался реализовать общую блокировку,

    struct lock {
        uint64_t exclusive : 1;
        uint64_t id : 48;
        uint64_t shared_count : 15;
    };
    std::atomic<lock> lock_ { {0, 0, 0} };
    bool try_lock_shared() noexcept {
        lock currentlock = lock_.load(std::memory_order_acquire);
        if (currentlock.exclusive == 1) {
            return false;
        }
        lock newlock;
        do {
            newlock = currentlock;
            newlock.shared_count++;
        }
        while(!lock_.compare_exchange_weak(currentlock, newlock, std::memory_order_acq_rel) && currentlock.exclusive == 0);

        return currentlock.exclusive == 0;
    }
    bool try_lock() noexcept {
        uint64_t id = utils::get_thread_id();
        lock currentlock = lock_.load(std::memory_order_acquire);
        if (currentlock.exclusive == 1) {
            assert(currentlock.id != id);
            return false;
        }

        bool result = false;
        lock newlock { 1, id, 0 };
        do {
            newlock.shared_count = currentlock.shared_count;
        }
        while(!(result = lock_.compare_exchange_weak(currentlock, newlock, std::memory_order_acq_rel)) && currentlock.exclusive == 0);

        return result;
    }

Я везде использовал lock_.compare_exchange_weak(currentlock, newlock, std::memory_order_acq_rel), могу ли я смело заменить их на compare_exchange_weak(currentlock, newlock, std::memory_order_release, std::memory_order_acquire)?

Я также мог видеть примеры, когда memory_order_release удаляется из compare_exchange_strong (см. Функцию compare_exchange_strong in reset() кода Льюиса), где вам нужно только std :: memory_order_acquire для compare_exchange_strong (но не release). Я действительно не видел, что memory_order_release удален из слабых, ни memory_order_acquire удален из сильных.

Это заставило меня задуматься, есть ли более глубокое правило, которое я не понял или нет.

Спасибо.

1 Ответ

1 голос
/ 29 апреля 2020

memory_order_acquire имеет смысл только для операций, которые читают значение, а memory_order_release имеет смысл только для операций, которые записывают значение. Поскольку операции чтения-изменения-записи позволяют выполнять чтение и запись, можно объединять эти порядки памяти, но это не всегда необходимо.

m_event.m_state.compare_exchange_weak использует memory_order_release для записи нового значения, поскольку оно пытается заменить значение, которое ранее было прочитано, с помощью memory_order_acquire:

  // load initial value using memory_order_acquire
  void* oldValue = m_event.m_state.load(std::memory_order_acquire);
  do {
    ...
  } while (!m_event.m_state.compare_exchange_weak(oldValue, this,
             std::memory_order_release,
             // in case of failure, load new value using memory_order_acquire
             std::memory_order_acquire));

IMHO в этом случае даже не нужно вообще использовать memory_order_acquire, поскольку oldValue никогда не разыменовывается, а только сохраняется как следующий указатель, т. е. было бы идеально найти заменить эти два memory_order_acquire на memory_order_relaxed.

В async_manual_reset_event::set() ситуация отличается:

  void* oldValue = m_state.exchange(this, std::memory_order_acq_rel);
  if (oldValue != this)
  {
    auto* waiters = static_cast<awaiter*>(oldValue);
    while (waiters != nullptr)
    {
      // we are de-referencing the pointer read from m_state!
      auto* next = waiters->m_next;
      waiters->m_awaitingCoroutine.resume();
      waiters = next;
    }

Так как мы снимаем ссылки с указателя, мы при чтении m_state мы должны убедиться, что эти чтения происходят после записи в эти объекты-официанты. Это обеспечивается с помощью отношения синхронизации с m_state. Writer добавляется с помощью ранее обсужденного Compare_exchange с использованием memory_order_release. Часть receive обмена синхронизируется с release-compare_exchange (и фактически всеми предыдущими release-compare_exchange, которые являются частью последовательности релизов), обеспечивая тем самым необходимое отношение «до и после».

Если честно, Я не уверен, зачем этому обмену нужна часть релиза. Я думаю, что автор, возможно, хотел быть на «безопасной стороне», поскольку некоторые другие операции также сильнее, чем необходимо (я уже упоминал, что await_suspend не требует memory_order_acquire, и то же самое относится к is_set и reset ).

Для вашей реализации блокировки это очень просто - когда вы хотите получить блокировку (try_lock_shared / try_lock), используйте memory_order_acquire только для операции сравнения-обмена. Для снятия блокировки необходимо использовать memory_order_release.

Аргумент также довольно прост: вы должны убедиться, что после того, как вы получили блокировку, любые изменения, ранее сделанные в данных, защищенных блокировкой, были видны текущий владелец, то есть вы должны убедиться, что эти изменения произошли до операций, которые вы собираетесь выполнить после получения блокировки . Это достигается путем установления отношения синхронизации с try_lock (acqu-CAS) и предыдущим unlock (release-store).

При попытке спорить о правильности реализации на основе Семантика модели памяти C ++. Обычно я делаю это следующим образом:

  1. идентифицирует необходимые отношения до (например, для вашей блокировки)
  2. , чтобы убедиться, что эти отношения происходят до. установлены правильно на всех путях кода

И я всегда комментирую операции atomi c, чтобы документировать, как устанавливаются эти отношения (т. е. какие другие операции задействованы). Например:

  // (1) - this acquire-load synchronizes-with the release-CAS (11)
  auto n = head.load(std::memory_order_acquire);

  // (8) - this acquire-load synchronizes-with the release-CAS (11)
  h.acquire(head, std::memory_order_acquire);

  // (11) - this release-CAS synchronizes-with the acquire-load (1, 8)
  if (head.compare_exchange_weak(expected, next, std::memory_order_release, std::memory_order_relaxed))

(полный код см. https://github.com/mpoeter/xenium/blob/master/xenium/michael_scott_queue.hpp)

Для получения более подробной информации о модели памяти C ++ я могу порекомендовать этот документ, который у меня есть в соавторстве: Модели памяти для программистов на C / C ++

...