Реализация 64-битного атомного счетчика с 32-битной атомикой - PullRequest
0 голосов
/ 10 февраля 2019

Я хотел бы собрать атомный счетчик uint64 из атомного uint32s.Счетчик имеет одного писателя и несколько читателей.Устройство записи является обработчиком сигналов, поэтому оно не должно блокировать.

Моя идея состоит в том, чтобы использовать счетчик поколений с младшим битом в качестве блокировки чтения.Считыватель повторяет попытки до тех пор, пока счетчик поколений не станет стабильным во время чтения, а младший бит не будет установлен.

Правильно ли следующий код в разработке и использовании упорядочения памяти?Есть ли лучший способ?

using namespace std;
class counter {
    atomic<uint32_t> lo_{};
    atomic<uint32_t> hi_{};
    atomic<uint32_t> gen_{};

    uint64_t read() const {
        auto acquire = memory_order_acquire;
        uint32_t lo, hi, gen1, gen2;
        do {
            gen1 = gen_.load(acquire);
            lo = lo_.load(acquire);
            hi = hi_.load(acquire);
            gen2 = gen_.load(acquire);
        } while (gen1 != gen2 || (gen1 & 1));
        return (uint64_t(hi) << 32) | lo;
    }

    void increment() {
        auto release = memory_order_release;
        gen_.fetch_add(1, release);
        uint32_t newlo = 1 + lo_.fetch_add(1, release);
        if (newlo == 0) {
            hi_.fetch_add(1, release);
        }
        gen_.fetch_add(1, release);
    }
};

edit : упс, исправлено auto acquire = memory_order_release;

1 Ответ

0 голосов
/ 10 февраля 2019

Это известный шаблон, называемый SeqLock.https://en.wikipedia.org/wiki/Seqlock. (С учетом упрощения, что имеется только один модуль записи, поэтому не требуется дополнительная поддержка для исключения одновременных модулей записи.)

Вам не нужно или нужно увеличивать саму переменную счетчика.использовать атомарные операции RMW .Вы можете просто загрузить обе половинки с атомарной 32-битной загрузкой, увеличить ее и атомарно сохранить результат.(С дешевым порядком памяти relaxed или release и использованием хранилища release для обновления 2-го счетчика).

Аналогично, счетчик также не должен быть атомным RMW.

Писателю нужны только чистые загрузки и чистые хранилища только с упорядочением версий, которые (намного) дешевле, чем атомные RMW, или магазины с порядком seq_cst :

  • загружаютсчетчик и значение в любом порядке
  • сохранить новый счетчик (старый + 1)
  • сохранить новое значение (или просто обновить младшую половину, если вы хотите разветвляться без переноса)
  • сохраните окончательный счетчик.

Порядок расположения магазинов в этих трех пунктах маркировки - единственное, что имеет значение.Ограничение записи после первого хранилища может быть хорошим, потому что мы на самом деле не хотим, чтобы стоимость создания обоих хранилищ обеих половин значения release была бы на процессорах, где это дороже, чем просто.


К сожалению, чтобы удовлетворить правилам C ++, value должен быть atomic<T>, что делает неудобным компилятор генерировать наиболее эффективный код, возможный для загрузки обеих половин.например, ARM ldp / stp пара нагрузки может быть не атомарной, но это не имеет значения.(И компиляторы часто не оптимизируют две отдельные атомарные 32-разрядные нагрузки в одну более широкую загрузку.)

Значения, которые другие потоки читают, когда счетчик последовательности нечетный, не имеют значения, но мы хотели бы избежать неопределенного поведения,Может быть, мы могли бы использовать объединение volatile uint64_t и atomic<uint64_t>


Я написал этот шаблон C ++ SeqLock<class T> для другой вопрос Я не закончил писать ответfor (выяснение, какие версии ARM имеют 64-битную атомарную загрузку и хранение).

Эта попытка проверить, поддерживает ли цель уже атомарные операции без блокировки на atomic<T>, чтобы вы не могли использовать ее, когда онабессмысленно.(Отключите это для тестирования с помощью определения IGNORE_SIZECHECK.) TODO: прозрачно вернитесь к этому, возможно, со специализацией шаблона, вместо использования static_assert.

Я предоставил функцию inc() дляT, который поддерживает оператор ++.TODO будет apply(), который принимает лямбду, чтобы сделать что-то с T, и сохранять результат между обновлениями счетчика последовательности.

// **UNTESTED**

#include <atomic>

#ifdef UNIPROCESSOR
// all readers and writers run on the same core
// ordering instructions at compile time is all that's necessary
#define ATOMIC_FENCE std::atomic_signal_fence
#else
// A reader can be running on another core while writing
// memory barriers or ARMv8 acquire / release loads / store are needed
#define ATOMIC_FENCE std::atomic_thread_fence
#endif
// using fences instead of .store(std::memory_order_release) will stop the compiler
// from taking advantage of a release-store instruction, like on AArch64 or x86


// SINGLE WRITER only.
// uses volatile + barriers for the data itself, like pre-C++11
template <class T>
class SeqLocked
{
#ifndef IGNORE_SIZECHECK
    // sizeof(T) > sizeof(unsigned)
    static_assert(!std::atomic<T>::is_always_lock_free, "A Seq Lock with a type small enough to be atomic on its own is totally pointless, and we don't have a specialization that replaces it with a straight wrapper for atomic<T>");
#endif

       // C++17 doesn't have a good way to express a load that doesn't care about tearing
       //  without explicitly writing it as multiple small parts and thus gimping the compiler if it can use larger loads
    volatile T data;          // volatile should be fine on any implementation where pre-C++11 lockless code was possible with volatile,
                              //  even though Data Race UB does apply to volatile variables in ISO C++11 and later.

    std::atomic<unsigned> seqcount{0};  // Even means valid, odd means modification in progress.
                                        //  unsigned wraps around at a power of 2 on overflow

public:
    T get() const {
        unsigned c0, c1;
        T tmp;

        do {
            c0 = seqcount.load(std::memory_order_relaxed);  // or this can be a std::memory_order_acquire for multicore so AArch64 can use LDAR
            ATOMIC_FENCE(std::memory_order_acquire);

            tmp = (T)data;       // load

            ATOMIC_FENCE(std::memory_order_acquire);  // LoadLoad barrier
            c1 = seqcount.load(std::memory_order_relaxed);
        } while(c0&1 || c0 != c1);     // retry if the counter changed or is odd

        return tmp;
    }

    // TODO: a version of this that takes a lambda for the operation on tmp
    T inc() {
        unsigned orig_count = seqcount.load(std::memory_order_relaxed);

        seqcount.store(orig_count+1, std::memory_order_relaxed);
        ATOMIC_FENCE(std::memory_order_release);
        // make sure the data stores appear after the first counter update.

        T tmp = data;  // load
        ++tmp;
        data = tmp;    // store

        ATOMIC_FENCE(std::memory_order_release);
        seqcount.store(orig_count+2, std::memory_order_relaxed);  // Or use mo_release here, better on AArch64

        return tmp;
    }

    void set(T newval) {
        unsigned orig_count = seqcount.load(std::memory_order_relaxed);

        seqcount.store(orig_count+1, std::memory_order_relaxed);
        ATOMIC_FENCE(std::memory_order_release);
        // make sure the data stores appear after the first counter update.

        data = newval;    // store

        ATOMIC_FENCE(std::memory_order_release);
        seqcount.store(orig_count+2, std::memory_order_relaxed);  // Or use mo_release here, better on AArch64
    }

};


/***** test callers *******/
#include <stdint.h>

struct sixteenbyte {
    //unsigned arr[4];
    unsigned long  a,b,c,d;
    sixteenbyte() = default;
    sixteenbyte(const volatile sixteenbyte &old)
         : a(old.a), b(old.b), c(old.c), d(old.d) {}
    //arr(old.arr) {}
};

void test_inc(SeqLocked<uint64_t> &obj) {  obj.inc(); }
sixteenbyte test_get(SeqLocked<sixteenbyte> &obj) { return obj.get(); }
//void test_set(SeqLocked<sixteenbyte> &obj, sixteenbyte val) { obj.set(val); }

uint64_t test_get(SeqLocked<uint64_t> &obj) {
    return obj.get();
}

// void atomic_inc_u64_seq_cst(std::atomic<uint64_t> &a) { ++a; }
uint64_t u64_inc_relaxed(std::atomic<uint64_t> &a) {
    // same but without dmb barriers
    return 1 + a.fetch_add(1, std::memory_order_relaxed);
}

uint64_t u64_load_relaxed(std::atomic<uint64_t> &a) {
    // gcc uses LDREXD, not just LDRD?
    return a.load(std::memory_order_relaxed);
}

void u64_store_relaxed(std::atomic<uint64_t> &a, uint64_t val) {
    // gcc uses a LL/SC retry loop even for a pure store?
    a.store(val, std::memory_order_relaxed);
}

Он компилируется в нужный нам ассемблер в проводнике компилятора Godbolt для ARM и других ISA.По крайней мере, для int64_t;большие типы структур могут быть скопированы менее эффективно из-за громоздких правил volatile.

Он использует неатомарную volatile T data для общих данных.Это технически неопределенное поведение гонки данных, но все компиляторы, которые мы используем на практике, были в порядке с многопоточным доступом до C ++ 11 к volatile объектам.А до C ++ 11 люди даже зависели от атомарности для некоторых размеров.Мы делаем , а не , мы проверяем счетчик и используем значение, которое мы читаем, только если не было одновременных записей.(В этом весь смысл SeqLock.)

Одна проблема с volatile T data заключается в том, что в ISO C ++ T foo = data не будет компилироваться для объектов структуры, если вы не предоставите конструктор копирования из volatileобъект, как

sixteenbyte(const volatile sixteenbyte &old)
         : a(old.a), b(old.b), c(old.c), d(old.d) {}

Это действительно раздражает нас, потому что нам не важны детали чтения памяти, просто то, что несколько операций чтения не оптимизированы в одно.

volatile здесь действительно неправильный инструмент , и обычный T data с достаточным ограждением, чтобы гарантировать, что чтение действительно происходит между считываниями атомного счетчика, было бы лучше.Например, мы могли бы сделать это в GNU C с asm("":::"memory"); барьером компилятора от переупорядочения до / после доступа.Это позволило бы компилятору копировать большие объекты с SIMD-векторами или чем-то еще, чего он не будет делать с отдельными volatile доступами.

Я думаю, std::atomic_thread_fence(mo_acquire) также будет достаточным барьером, но яне уверен на 100%.


В ISO C вы можете скопировать volatile агрегат (структуру), и компилятор будет выдавать все, что обычно для копирования такого количества байтов.Но в C ++ у нас не может быть хороших вещей, по-видимому.

...