Почему многопоточность вычислений с плавающей запятой на процессоре заставляет их работать значительно дольше? - PullRequest
1 голос
/ 10 марта 2019

Я сейчас работаю над научным моделированием (Gravitational nbody). Сначала я написал его наивным однопоточным алгоритмом, и это было приемлемо для небольшого числа частиц. Затем я многопоточный этот алгоритм (это смущающе параллельно), и программа заняла примерно 3 раза. Ниже приведен минимальный, полный, проверяемый пример тривиального алгоритма со схожими свойствами и выводом в файл в / tmp (он предназначен для работы в Linux, но C ++ также является стандартным). Имейте в виду, что если вы решите запустить этот код, он создаст файл размером 152,62 МБ. Данные выводятся, чтобы компилятор не мог оптимизировать вычисления вне программы.

#include <iostream>
#include <functional>
#include <thread>
#include <vector>
#include <atomic>
#include <random>
#include <fstream>
#include <chrono>

constexpr unsigned ITERATION_COUNT = 2000;
constexpr unsigned NUMBER_COUNT = 10000;

void runThreaded(unsigned count, unsigned batchSize, std::function<void(unsigned)> callback){
    unsigned threadCount = std::thread::hardware_concurrency();
    std::vector<std::thread> threads;
    threads.reserve(threadCount);

    std::atomic<unsigned> currentIndex(0);

    for(unsigned i=0;i<threadCount;++i){
        threads.emplace_back([&currentIndex, batchSize, count, callback]{
            unsigned startAt = currentIndex.fetch_add(batchSize);

            if(startAt >= count){
                return;
            }else{
                for(unsigned i=0;i<count;++i){
                    unsigned index = startAt+i;
                    if(index >= count){
                        return;
                    }
                    callback(index);
                }
            }
        });
    }

    for(std::thread &thread : threads){
        thread.join();
    }
}

void threadedTest(){
    std::mt19937_64 rnd(0);
    std::vector<double> numbers;

    numbers.reserve(NUMBER_COUNT);
    for(unsigned i=0;i<NUMBER_COUNT;++i){
        numbers.push_back(rnd());
    }

    std::vector<double> newNumbers = numbers;

    std::ofstream fout("/tmp/test-data.bin");

    for(unsigned i=0;i<ITERATION_COUNT;++i) {
        std::cout << "Iteration: " << i << "/" << ITERATION_COUNT << std::endl;
        runThreaded(NUMBER_COUNT, 100, [&numbers, &newNumbers](unsigned x){
            double total = 0;
            for(unsigned y=0;y<NUMBER_COUNT;++y){
                total += numbers[y]*(y-x)*(y-x);
            }
            newNumbers[x] = total;
        });
        fout.write(reinterpret_cast<char*>(newNumbers.data()), newNumbers.size()*sizeof(double));
        std::swap(numbers, newNumbers);
    }
}

void unThreadedTest(){
    std::mt19937_64 rnd(0);
    std::vector<double> numbers;

    numbers.reserve(NUMBER_COUNT);
    for(unsigned i=0;i<NUMBER_COUNT;++i){
        numbers.push_back(rnd());
    }

    std::vector<double> newNumbers = numbers;

    std::ofstream fout("/tmp/test-data.bin");

    for(unsigned i=0;i<ITERATION_COUNT;++i){
        std::cout << "Iteration: " << i << "/" << ITERATION_COUNT << std::endl;
        for(unsigned x=0;x<NUMBER_COUNT;++x){
            double total = 0;
            for(unsigned y=0;y<NUMBER_COUNT;++y){
                total += numbers[y]*(y-x)*(y-x);
            }
            newNumbers[x] = total;
        }
        fout.write(reinterpret_cast<char*>(newNumbers.data()), newNumbers.size()*sizeof(double));
        std::swap(numbers, newNumbers);
    }
}

int main(int argc, char *argv[]) {
    if(argv[1][0] == 't'){
        threadedTest();
    }else{
        unThreadedTest();
    }
    return 0;
}

Когда я запускаю это (скомпилировано с clang 7.0.1 в Linux), я получаю следующие команды Linux time. Разница между ними похожа на то, что я вижу в моей настоящей программе. Запись, помеченная как «реальная», является релевантной для этого вопроса, поскольку это время часов, необходимое программе для запуска.

однопоточный:

real    6m27.261s
user    6m27.081s
sys     0m0.051s

Многопоточная:

real    14m32.856s
user    216m58.063s
sys     0m4.492s

В связи с этим я спрашиваю, что вызывает такое значительное замедление, когда я ожидаю, что оно значительно ускорится (примерно в 8 раз, поскольку у меня 8-ядерный 16-поточный ЦП). Я не реализую это на графическом процессоре, так как следующим шагом является внесение некоторых изменений в алгоритм, чтобы перевести его с O (n²) на O (nlogn), но они также не являются дружественными для GPU. Измененный алгоритм будет иметь меньшую разницу с моим в настоящее время реализованным алгоритмом O (n²), чем включенный пример. И наконец, я хочу заметить, что субъективное время выполнения каждой итерации (судя по времени между появлением линий итерации) существенно изменяется как в многопоточных, так и в непотоковых запусках.

Ответы [ 3 ]

5 голосов
/ 10 марта 2019

Трудно следовать этому коду, но я думаю, что вы дублируете работу в большом масштабе, потому что каждый поток выполняет почти всю работу, просто пропуская небольшую часть в начале.

Я предполагаю, что внутренний цикл runThreaded должен быть:

unsigned startAt = currentIndex.fetch_add(batchSize);

while (startAt < count) {
  if (startAt >= count) {
    return;
  } else {
    for(unsigned i=0;i<batchSize;++i){
      unsigned index = startAt+i;

      if(index >= count){
        return;
      }

      callback(index);
    }
  }

  startAt = currentIndex.fetch_add(batchSize);
}

Где i < batchSize здесь ключ. Вы должны выполнять только столько работы, сколько требует пакет, а не count раз, то есть весь список минус начальное смещение.

С этим обновлением код работает значительно быстрее. Я не уверен, выполняет ли он всю необходимую работу, потому что трудно сказать, происходит ли это на самом деле, результат очень минимален.

2 голосов
/ 10 марта 2019

Для простого распараллеливания на нескольких процессорах я рекомендую использовать tbb::parallel_for.Он использует правильное количество процессоров и разделяет диапазон для вас, полностью исключая риск неправильной реализации.В качестве альтернативы, есть параллель for_each в C ++ 17 .Другими словами, эта проблема имеет ряд хороших решений.

Векторизация кода - сложная проблема, и ни clang++-6, ни g++-8 не автоматически векторизуют базовый код.Следовательно, в SIMD-версии ниже я использовал превосходные Vc: переносимые типы C ++ с нулевыми накладными расходами для явного параллельного программирования данных * библиотеки 1012 *.

Ниже приведен рабочий тест, который сравнивает:

  • Базовая версия.
  • SIMD-версия.
  • SIMD + многопоточная версия.


#include <Vc/Vc>
#include <tbb/parallel_for.h>

#include <algorithm>
#include <chrono>
#include <iomanip>
#include <iostream>
#include <random>
#include <vector>

constexpr int ITERATION_COUNT = 20;
constexpr int NUMBER_COUNT = 20000;

double baseline() {
    double result = 0;

    std::vector<double> newNumbers(NUMBER_COUNT);
    std::vector<double> numbers(NUMBER_COUNT);
    std::mt19937 rnd(0);
    for(auto& n : numbers)
        n = rnd();

    for(int i = 0; i < ITERATION_COUNT; ++i) {
        for(int x = 0; x < NUMBER_COUNT; ++x) {
            double total = 0;
            for(int y = 0; y < NUMBER_COUNT; ++y) {
                auto d = (y - x);
                total += numbers[y] * (d * d);
            }
            newNumbers[x] = total;
        }
        result += std::accumulate(newNumbers.begin(), newNumbers.end(), 0.);
        swap(numbers, newNumbers);
    }

    return result;
}

double simd() {
    double result = 0;

    constexpr int SIMD_NUMBER_COUNT = NUMBER_COUNT / Vc::double_v::Size;
    using vector_double_v = std::vector<Vc::double_v, Vc::Allocator<Vc::double_v>>;
    vector_double_v newNumbers(SIMD_NUMBER_COUNT);
    vector_double_v numbers(SIMD_NUMBER_COUNT);
    std::mt19937 rnd(0);
    for(auto& n : numbers) {
        alignas(Vc::VectorAlignment) double t[Vc::double_v::Size];
        for(double& v : t)
            v = rnd();
        n.load(t, Vc::Aligned);
    }

    Vc::double_v const incv(Vc::double_v::Size);
    for(int i = 0; i < ITERATION_COUNT; ++i) {
        Vc::double_v x(Vc::IndexesFromZero);
        for(auto& new_n : newNumbers) {
            Vc::double_v totals;
            int y = 0;
            for(auto const& n : numbers) {
                for(unsigned j = 0; j < Vc::double_v::Size; ++j) {
                    auto d = y - x;
                    totals += n[j] * (d * d);
                    ++y;
                }
            }
            new_n = totals;
            x += incv;
        }
        result += std::accumulate(newNumbers.begin(), newNumbers.end(), Vc::double_v{}).sum();
        swap(numbers, newNumbers);
    }

    return result;
}

double simd_mt() {
    double result = 0;

    constexpr int SIMD_NUMBER_COUNT = NUMBER_COUNT / Vc::double_v::Size;
    using vector_double_v = std::vector<Vc::double_v, Vc::Allocator<Vc::double_v>>;
    vector_double_v newNumbers(SIMD_NUMBER_COUNT);
    vector_double_v numbers(SIMD_NUMBER_COUNT);
    std::mt19937 rnd(0);
    for(auto& n : numbers) {
        alignas(Vc::VectorAlignment) double t[Vc::double_v::Size];
        for(double& v : t)
            v = rnd();
        n.load(t, Vc::Aligned);
    }

    Vc::double_v const v0123(Vc::IndexesFromZero);
    for(int i = 0; i < ITERATION_COUNT; ++i) {
        constexpr int SIMD_STEP = 4;
        tbb::parallel_for(0, SIMD_NUMBER_COUNT, SIMD_STEP, [&](int ix) {
            Vc::double_v xs[SIMD_STEP];
            for(int is = 0; is < SIMD_STEP; ++is)
                xs[is] = v0123 + (ix + is) * Vc::double_v::Size;
            Vc::double_v totals[SIMD_STEP];
            int y = 0;
            for(auto const& n : numbers) {
                for(unsigned j = 0; j < Vc::double_v::Size; ++j) {
                    for(int is = 0; is < SIMD_STEP; ++is) {
                        auto d = y - xs[is];
                        totals[is] += n[j] * (d * d);
                    }
                    ++y;
                }
            }
            std::copy_n(totals, SIMD_STEP, &newNumbers[ix]);
        });
        result += std::accumulate(newNumbers.begin(), newNumbers.end(), Vc::double_v{}).sum();
        swap(numbers, newNumbers);
    }

    return result;
}

struct Stopwatch {
    using Clock = std::chrono::high_resolution_clock;
    using Seconds = std::chrono::duration<double>;
    Clock::time_point start_ = Clock::now();

    Seconds elapsed() const {
        return std::chrono::duration_cast<Seconds>(Clock::now() - start_);
    }
};


std::ostream& operator<<(std::ostream& s, Stopwatch::Seconds const& a) {
    auto precision = s.precision(9);
    s << std::fixed << a.count() << std::resetiosflags(std::ios_base::floatfield) << 's';
    s.precision(precision);
    return s;
}

void benchmark() {
    Stopwatch::Seconds baseline_time;
    {
        Stopwatch s;
        double result = baseline();
        baseline_time = s.elapsed();
        std::cout << "baseline: " << result << ", " << baseline_time << '\n';
    }

    {
        Stopwatch s;
        double result = simd();
        auto time = s.elapsed();
        std::cout << "    simd: " << result << ", " << time << ", " << (baseline_time / time) << "x speedup\n";
    }

    {
        Stopwatch s;
        double result = simd_mt();
        auto time = s.elapsed();
        std::cout << " simd_mt: " << result << ", " << time << ", " << (baseline_time / time) << "x speedup\n";
    }
}

int main() {
    benchmark();
    benchmark();
    benchmark();
}

Время:

baseline: 2.76582e+257, 6.399848397s
    simd: 2.76582e+257, 1.600373449s, 3.99897x speedup
 simd_mt: 2.76582e+257, 0.168638435s, 37.9501x speedup

Примечания:

  • Мой аппарат поддерживает AVX, но не AVX-512, поэтому при использовании SIMD он примерно в 4 раза быстрее.
  • *Версия 1037 * использует 8 потоков на моей машине и большие шаги SIMD.Теоретическое ускорение составляет 128x, на практике - 38x.
  • clang++-6 не может автоматически векторизовать базовый код, как и g++-8.
  • g++-8 генерирует значительно более быстрый код для SIMD-версий, чем clang++-6.
1 голос
/ 25 апреля 2019

Ваше сердце определенно находится в нужном месте, за исключением одной или двух ошибок.

par_for - сложная проблема, зависящая от полезной нагрузки вашего цикла.Для этого не существует универсального решения.Полезная нагрузка может быть чем угодно, от пары добавлений к почти бесконечным блокам мьютекса - например, путем выделения памяти.

Атомная переменная как шаблон рабочего элемента всегда работала хорошо для меня, но помните, что атомарные переменные имеютвысокая стоимость на X86 (~ 400 циклов) и даже высокая стоимость, если они находятся в неисполненной ветви, как я обнаружил, к моему риску.

Некоторая перестановка из следующего обычно хороша.Выбор правильного chunks_per_thread (как в вашем batchSize) имеет решающее значение.Если вы не доверяете своим пользователям, вы можете протестировать выполнение нескольких итераций цикла, чтобы угадать лучший уровень разбиения на блоки.

#include <atomic>
#include <future>
#include <thread>
#include <vector>
#include <stdio.h>

template<typename Func>
void par_for(int start, int end, int step, int chunks_per_thread, Func func) {
  using namespace std;
  using namespace chrono;

  atomic<int> work_item{start};
  vector<future<void>> futures(std::thread::hardware_concurrency());

  for (auto &fut : futures) {
    fut = async(std::launch::async, [&work_item, end, step, chunks_per_thread, &func]() {
      for(;;) {
        int wi = work_item.fetch_add(step * chunks_per_thread);
        if (wi > end) break;
        int wi_max = std::min(end, wi+step * chunks_per_thread);
        while (wi < wi_max) {
          func(wi);
          wi += step;
        }
      }
    });
  }

  for (auto &fut : futures) {
    fut.wait();
  }
}

int main() {
  using namespace std;
  using namespace chrono;
  for (int k = 0; k != 2; ++k) {
    auto t0 = high_resolution_clock::now();
    constexpr int loops = 100000000;
    if (k == 0) {
      for (int i = 0; i != loops; ++i ) {
        if (i % 10000000 == 0) printf("%d\n", i);
      }
    } else {
      par_for(0, loops, 1, 100000, [](int i) {
        if (i % 10000000 == 0) printf("%d\n", i);
      });
    }
    auto t1 = high_resolution_clock::now();
    duration<double, milli> ns = t1 - t0;
    printf("k=%d %fms total\n", k, ns.count());
  }
}

результаты

...
k=0 174.925903ms total
...
k=1 27.924738ms total

О ускорении в 6 раз.

Я избегаю термина «смущающе параллельный», поскольку это почти никогда не имеет место.Вы платите экспоненциально большими затратами, чем больше ресурсов используете на пути от кэша 1-го уровня (без задержки) к охватывающему глобус кластеру (задержка в мс).Но я надеюсь, что этот фрагмент кода будет полезен в качестве ответа.

...