Обновление Atomi c в лямбда с резьбой - PullRequest
0 голосов
/ 22 марта 2020

Я думаю, что обновление значения Atomi c таким образом внутри потока не является хорошим (иногда сумма не выглядит хорошей) *

    std::atomic<double> e(0);
    std::mutex m;

    auto worker = [&] (size_t begin, size_t end, std::atomic<double> & acc) {
      double ee = 0;
      for(auto k = begin; k != end; ++k) {
        ee += something[k];
      }
      {
          const std::lock_guard<std::mutex> lock(m);
          acc.store( acc.load() + ee );
      }
    };

    std::vector<std::thread> threads(nbThreads);
    const size_t grainsize = miniBatchSize / nbThreads;

    size_t work_iter = 0;
    for(auto it = std::begin(threads); it != std::end(threads) - 1; ++it) {
      *it = std::thread(worker, work_iter, work_iter + grainsize, std::ref(e));
      work_iter += grainsize;
    }
    threads.back() = std::thread(worker, work_iter, miniBatchSize, std::ref(e));

    for(auto&& i : threads) {
      i.join();
    }

Я прав, что мне здесь не хватает? проблема в std :: ref (e)?

Ответы [ 2 ]

1 голос
/ 22 марта 2020

Проблема в строке:

acc.store( acc.load() + ee );

есть 2 операции загрузки и сохранения, в промежутке между ними другой поток может изменить значение.

К сожалению, atomi c не поддерживает fetch_add.

Вы можете попробовать это:

    auto atomic_fetch_add = [](std::atomic<double>* obj, double arg)
    {
        auto expected = obj->load();
        while (!atomic_compare_exchange_weak(obj, &expected, expected + arg))
            ;
        return expected;
    };

    std::atomic<double> e(0);

    auto worker = [&] (size_t begin, size_t end, std::atomic<double> & acc) {
      double ee = 0;
      for(auto k = begin; k != end; ++k) {
        ee += something[k];
      }
      // acc.store( acc.load() + ee );
      atomic_fetch_add(&acc, ee);
    };

    std::vector<std::thread> threads(nbThreads);
    const size_t grainsize = miniBatchSize / nbThreads;

    size_t work_iter = 0;
    for(auto it = std::begin(threads); it != std::end(threads) - 1; ++it) {
      *it = std::thread(worker, work_iter, work_iter + grainsize, std::ref(e));
      work_iter += grainsize;
    }
    threads.back() = std::thread(worker, work_iter, miniBatchSize, std::ref(e));

    for(auto&& i : threads) {
      i.join();
    }

Хотя нет никакой гарантии, что atomi c не использует мьютексы, поэтому вы ' Я должен проверить вашу реализацию.

1 голос
/ 22 марта 2020

Вы хотите, чтобы загрузка и сохранение происходили как действие атома c. В настоящее время ваш код выполняет:

acc.store(acc.load() + ee);

Теперь представьте, что поток прерывается сразу после выполнения load() (давайте назовем загруженное значение acc_old). Другой поток делает свое дело (и таким образом модифицирует acc), а затем первый поток запускается снова. Он не будет перезагружаться acc, поскольку уже загружен его значение. Таким образом, этот поток теперь будет обновлять acc, чтобы он содержал acc_old + ee. И бац, неправильный результат.

Вместо этого используйте либо fetch_add, либо operator+=. Оба гарантируют поведение атома c для всей операции сложения. Т.е.:

acc += ee; // or
acc.fetch_add(ee);

Редактировать: Обратите внимание, что эти функции поддерживаются только для атомарных операций с плавающей запятой, начиная с C ++ 20. Для целочисленных типов они поддерживаются начиная с C ++ 11. Так что если вам нужна плавающая точка, вам, вероятно, придется придерживаться мьютекса. В этом случае я бы предложил обернуть двойное значение и мьютекс в один класс, чтобы вы не могли случайно использовать его неправильно.

...