Асинхронная запись в битовый массив - PullRequest
6 голосов
/ 02 апреля 2019

TL;DR Как безопасно выполнить однобитное обновление A[n/8] |= (1<<n%8); для A, представляющего собой огромный массив char с (т. Е. Установка n '* bit из A true)при параллельных вычислениях с использованием библиотеки C ++ 11 <thread>?


Я выполняю вычисления, которые легко распараллелить.Я вычисляю элементы определенного подмножества натуральных чисел, и я хочу найти элементы, которые не в подмножестве.Для этого я создаю огромный массив (например, A = new char[20l*1024l*1024l*1024l], то есть 20GiB).n бит этого массива имеет значение true, если n лежит в моем наборе.

При параллельном выполнении битов и установке значения true с использованием A[n/8] |= (1<<n%8);, мне кажется,чтобы получить небольшую потерю информации, предположительно из-за одновременной работы над одним и тем же байтом из A (каждый поток должен сначала прочитать байт, обновить один бит и записать байт обратно).Как я могу обойти это?Есть ли способ сделать это обновление как элементарную операцию?

Код следует.Версия GCC: g++ (Ubuntu 5.4.0-6ubuntu1~16.04.11) 5.4.0 20160609.Машина представляет собой 8-ядерный процессор Intel (R) Xeon® E5620 @ 2,40 ГГц, 37 ГБ ОЗУ.Опции компилятора: g++ -std=c++11 -pthread -O3

#include <iostream>
#include <thread>

typedef long long myint; // long long to be sure

const myint max_A = 20ll*1024ll*1024ll; // 20 MiB for testing
//const myint max_A = 20ll*1024ll*1024ll*1024ll; // 20 GiB in the real code
const myint n_threads = 1; // Number of threads
const myint prime = 1543; // Tested prime

char *A; 
const myint max_n = 8*max_A;

inline char getA(myint n) { return A[n/8] & (1<<(n%8)); }
inline void setAtrue(myint n) { A[n/8] |= (1<<n%8); }

void run_thread(myint startpoint) {
    // Calculate all values of x^2 + 2y^2 + prime*z^2 up to max_n
    // We loop through x == startpoint (mod n_threads)
    for(myint x = startpoint; 1*x*x < max_n; x+=n_threads)
        for(myint y = 0; 1*x*x + 2*y*y < max_n; y++)
            for(myint z = 0; 1*x*x + 2*y*y + prime*z*z < max_n; z++)
                setAtrue(1*x*x + 2*y*y + prime*z*z);
}

int main() {
    myint n;

    // Only n_threads-1 threads, as we will use the master thread as well
    std::thread T[n_threads-1];

    // Initialize the array
    A = new char[max_A]();

    // Start the threads
    for(n = 0; n < n_threads-1; n++) T[n] = std::thread(run_thread, n); 
    // We use also the master thread
    run_thread(n_threads-1);
    // Synchronize
    for(n = 0; n < n_threads-1; n++) T[n].join();

    // Print and count all elements not in the set and n != 0 (mod prime)
    myint cnt = 0;
    for(n=0; n<max_n; n++) if(( !getA(n) )&&( n%1543 != 0 )) {
        std::cout << n << std::endl;
        cnt++;
    }   
    std::cout << "cnt = " << cnt << std::endl;

    return 0;
}

Когда n_threads = 1, я получаю правильное значение cnt = 29289.Когда n_threads = 7, я получил cnt = 29314 и cnt = 29321 при двух разных вызовах, предполагая, что некоторые побитовые операции с одним байтом совпадали.

1 Ответ

5 голосов
/ 02 апреля 2019

std::atomic предоставляет все необходимые вам средства:

std::array<std::atomic<char>, max_A> A;

static_assert(sizeof(A[0]) == 1, "Shall not have memory overhead");
static_assert(std::atomic<char>::is_always_lock_free,
              "No software-level locking needed on common platforms");

inline char getA(myint n) { return A[n / 8] & (1 << (n % 8)); }
inline void setAtrue(myint n) { A[n / 8].fetch_or(1 << n % 8); }

Нагрузка в getA является атомной ( эквивалентна load()) и std::atomicдаже имеет встроенную поддержку or для сохранения сохраненного значения с другим (fetch_or), конечно, атомарно.

При инициализации A, наивный способ for (auto& a : A) a = 0; потребует синхронизации после каждого хранилища, чего можно избежать, отказавшись от безопасности потока.std::memory_order_release требует только, чтобы то, что мы пишем, было видно другим потокам (но не то, что записи других потоков видимы для нас).И действительно, если вы выполните

// Initialize the array
for (auto& a : A)
  a.store(0, std::memory_order_release);

, вы получите необходимую безопасность без какой-либо синхронизации на уровне сборки на x86.Вы можете сделать обратное для загрузок после завершения потоков, но это не дает дополнительных преимуществ для x86 (это просто mov в любом случае).

Демонстрация полного кода: https://godbolt.org/z/nLPlv1

...