Реализация очереди SPS C без блокировок на ARM - PullRequest
0 голосов
/ 29 мая 2020

Я пытаюсь написать очередь с одним производителем и одним потребителем для ARM, и я думаю, что близок к тому, чтобы окунуться в DMB, но мне нужна некоторая проверка (я больше знаком с std :: atomi c. )

Вот где я:

bool push(const_reference value)
{
    // Check for room
    const size_type currentTail = tail;
    const size_type nextTail = increment(currentTail);
    if (nextTail == head)
        return false;

    // Write the value
    valueArr[currentTail] = value;

    // Prevent the consumer from seeing the incremented tail before the
    // value is written.
    __DMB();

    // Increment tail
    tail = nextTail;

    return true;
}

bool pop(reference valueLocation)
{
    // Check for data
    const size_type currentHead = head;
    if (currentHead == tail)
        return false;

    // Write the value.
    valueLocation = valueArr[currentHead];

    // Prevent the producer from seeing the incremented head before the
    // value is written.
    __DMB();

    // Increment the head
    head = increment(head);

    return true;
}

Мой вопрос: правильно ли размещено и обосновано мое размещение DMB? Или все еще есть понимание, что я пропал? Я особенно не уверен, нужны ли условным операторам некоторая защита при работе с переменной, которая обновляется другим потоком (или прерыванием).

1 Ответ

1 голос
/ 30 мая 2020
  • Барьер необходим, но недостаточен, вам также нужна семантика "приобретения" для загрузки переменной, измененной другим потоком. (Или, по крайней мере, consume, но чтобы получить это без барьера, потребуется asm для создания зависимости данных. Компилятор не стал бы этого делать, если бы уже имел зависимость от элемента управления.)
  • Одноядерная система может используйте только барьер компилятора, например GNU C asm("":::"memory") или std::atomic_signal_fence(std::memory_order_release), а не dmb. Сделайте макрос, чтобы вы могли выбирать между SMP-безопасными барьерами или UP (однопроцессорными) барьерами.
  • head = increment(head); - бессмысленная перезагрузка head, используйте локальную копию.
  • используйте std::atomic для переносимого получения необходимого генератора кода.

Обычно вам не нужно катать собственные атомики; современные компиляторы для ARM реализуют std::atomic<T>. Но AFAIK, никакие реализации std::atomic<> не знают об одноядерных системах, чтобы избежать реальных препятствий и просто быть в безопасности. прерывания, которые могут вызвать переключение контекста.

В одноядерной системе вам не нужен dsb, только барьер компилятора. ЦП сохранит иллюзию последовательного выполнения инструкций asm в программном порядке. Вам просто нужно убедиться, что компилятор генерирует asm, который выполняет все в правильном порядке. Вы можете сделать это, используя std::atomic с std::memory_order_relaxed и ручные atomic_signal_fence(memory_order_acquire) или release барьеры. (Не atomic_thread_fence; это будет выдавать инструкции asm, обычно dsb).


Каждый поток считывает переменную, которую изменяет другой поток. Вы правильно делаете хранилища релизов модификаций, убедившись, что они видны только после доступа к массиву.

Но эти чтения также должны быть acqu-load to syn c с этими хранилищами релизов . Например, чтобы убедиться, что push не записывает valueArr[currentTail] = value; до того, как pop закончит чтение того же элемента. Или чтение записи до того, как она будет полностью записана.

Без каких-либо барьеров режим отказа будет таков, что if (currentHead == tail) return false; фактически не проверяет значение tail из памяти до тех пор, пока не произойдет valueLocation = valueArr[currentHead];. Изменение порядка загрузки во время выполнения может легко сделать это на слабо упорядоченном ARM. Если адрес загрузки имел зависимость данных от tail, это могло бы избежать необходимости в барьере там в системе SMP (ARM гарантирует упорядочение зависимостей в asm; функция, которую должен был предоставить mo_consume). Но если компилятор просто испускает ветку, это только зависимость элемента управления, а не данные. Если бы вы писали вручную в asm, предполагаемая загрузка, такая как ldrne r0, [r1, r2], для флагов, установленных сравнением, я думаю, создала бы зависимость data .

Изменение порядка во время компиляции менее правдоподобно, но барьер только для компилятора является бесплатным, если он только останавливает компилятор от выполнения того, что он в любом случае не собирался делать.


непроверенная реализация, компилируется в asm, который выглядит нормально, но никакого другого тестирования

Сделайте что-нибудь подобное для push. Я включил функции обертки для загрузки / сохранения и fullbarrier (). (Эквивалент макроса Linux ядра smp_mb(), определяемого как время компиляции или компиляция + барьер времени выполнения.)

#include <atomic>

#define UNIPROCESSOR


#ifdef UNIPROCESSOR
#define fullbarrier()  asm("":::"memory")   // GNU C compiler barrier
                          // atomic_signal_fence(std::memory_order_seq_cst)
#else
#define fullbarrier() __DMB()    // or atomic_thread_fence(std::memory_order_seq_cst)
#endif

template <class T>
T load_acquire(std::atomic<T> &x) {
#ifdef UNIPROCESSOR
    T tmp = x.load(std::memory_order_relaxed);
    std::atomic_signal_fence(std::memory_order_acquire);
    // or fullbarrier();  if you want to use that macro
    return tmp;
#else
    return x.load(std::memory_order_acquire);
    // fullbarrier() / __DMB();
#endif
}

template <class T>
void store_release(std::atomic<T> &x, T val) {
#ifdef UNIPROCESSOR
    std::atomic_signal_fence(std::memory_order_release);
    // or fullbarrier();
    x.store(val, std::memory_order_relaxed);
#else
    // fullbarrier() / __DMB(); before plain store
    return x.store(val, std::memory_order_release);
#endif
}

template <class T>
struct SPSC_queue {
  using size_type = unsigned;
  using value_type = T;
  static const size_type size = 1024;

  std::atomic<size_type> head;
  value_type valueArr[size];
  std::atomic<size_type> tail;  // in a separate cache-line from head to reduce contention

  bool push(const value_type &value)
  {
    // Check for room
    const size_type currentTail = tail.load(std::memory_order_relaxed);  // no other writers to tail, no ordering needed
    const size_type nextTail = currentTail + 1;    // modulo separately so empty and full are distinguishable.
    if (nextTail == load_acquire(head))
        return false;

    valueArr[currentTail % size] = value;
    store_release(tail, nextTail);
    return true;
  }
};

// instantiate the template for  int  so we can look at the asm
template bool SPSC_queue<int>::push(const value_type &value);

Компилируется чисто в обозревателе компилятора Godbolt с нулевыми барьерами, если вы используете -DUNIPROCESSOR с g++9.2 -O3 -mcpu=cortex-a15 (просто чтобы выбрать случайное ядро ​​modern-i sh ARM, чтобы G CC мог встроить std::atomic функцию загрузки / сохранения и барьеры для случая, не являющегося однопроцессорным.

...