Cycli c разбиение выполнения на несколько потоков (1-N-1-N-1 ...) - PullRequest
1 голос
/ 29 мая 2020

Рассмотрим этот случай:

for (...)
{
    const size_t count = ...

    for (size_t i = 0; i < count; ++i)
    {
        calculate(i); // thread-safe function
    }
}

Какое наиболее элегантное решение для максимизации производительности с использованием C ++ 17 и / или Boost?

Cycli c "create + join" потоки не имеют смысла из-за огромных накладных расходов (которые в моем случае в точности равны возможному выигрышу).

Поэтому мне нужно создать N потоков только один раз и синхронизировать их с основным (используя: mutex, shared_mutex, condition_variable , atomi c, et c.). В такой обычной и понятной ситуации (чтобы все было по-настоящему безопасно и быстро) это оказалось довольно сложной задачей. Придерживаясь его в течение нескольких дней, я чувствую, что «изобретаю велосипед» ...

  • Обновление 1: calculate (x) и calculate (y) могут (и должны) выполняться параллельно
  • Обновление 2: std :: atomi c :: fetch_add (или что-то в этом роде) предпочтительнее очереди (или чего-то еще).
  • Обновление 3: экстремальные вычисления (т.е. миллионы «внешних» вызовов и сотни «внутренних»)
  • Обновление 4: calculate () изменяет данные внутреннего объекта без возврата значения

Промежуточное решение

По какой-то причине «asyn c + wait» намного быстрее, чем потоки «create + join». Таким образом, эти два примера увеличивают скорость на 100%:

Пример 1

for (...)
{
    const size_t count = ...

    future<void> execution[cpu_cores];

    for (size_t x = 0; x < cpu_cores; ++x)
    {
        execution[x] = async(launch::async, ref(*this), x, count);
    }

    for (size_t x = 0; x < cpu_cores; ++x)
    {
        execution[x].wait();
    }
}

void operator()(const size_t x, const size_t count)
{
    for (size_t i = x; i < count; i += cpu_cores)
    {
        calculate(i);
    }
}

Пример 2

for (...)
{
    index = 0;

    const size_t count = ...

    future<void> execution[cpu_cores];

    for (size_t x = 0; x < cpu_cores; ++x)
    {
        execution[x] = async(launch::async, ref(*this), count);
    }

    for (size_t x = 0; x < cpu_cores; ++x)
    {
        execution[x].wait();
    }
}

atomic<size_t> index;

void operator()(const size_t count)
{
    for (size_t i = index.fetch_add(1); i < count; i = index.fetch_add(1))
    {
        calculate(i);
    }
}

Можно ли сделать это еще быстрее, создав потоки только один раз, а затем синхронизируя их с небольшими накладными расходами?

Окончательное решение

Дополнительно + 20% увеличения скорости в сравнение с std :: asyn c!

for (size_t i = 0; i < _countof(index); ++i) { index[i] = i; }

for_each_n(par_unseq, index, count, [&](const size_t i) { calculate(i); });

Можно ли избежать «индекса» избыточного массива?

Да:

for_each_n(par_unseq, counting_iterator<size_t>(0), count,

    [&](const size_t i)
    {
        calculate(i);
    });

1 Ответ

1 голос
/ 30 мая 2020

Раньше вы использовали OpenMP, GNU Parallel , Intel TBB.

Если у вас c ++ 17², я бы предложил использовать политики выполнения со стандартными алгоритмами.

Это действительно лучше, чем вы можете ожидать, чтобы делать что-то самостоятельно, хотя

  • требует некоторой предусмотрительности, чтобы выбрать ваши типы, которые будут соответствовать стандартным алгоритмы
  • по-прежнему помогают, если вы знаете, что произойдет под капотом

Вот простой пример без лишних слов:

Live On Compiler Explorer

#include <thread>
#include <algorithm>
#include <random>
#include <execution>
#include <iostream>
using namespace std::chrono_literals;

static size_t s_random_seed = std::random_device{}();

static auto generate_param() {
    static std::mt19937 prng {s_random_seed};
    static std::uniform_int_distribution<> dist;
    return dist(prng);
}

struct Task {
    Task(int p = generate_param()) : param(p), output(0) {}

    int param;
    int output;

    struct ByParam  { bool operator()(Task const& a, Task const& b) const { return a.param < b.param; } };
    struct ByOutput { bool operator()(Task const& a, Task const& b) const { return a.output < b.output; } };
};

static void calculate(Task& task) {
    //std::this_thread::sleep_for(1us);
    task.output = task.param ^ 0xf0f0f0f0;
}

int main(int argc, char** argv) {
    if (argc>1) {
        s_random_seed = std::stoull(argv[1]);
    }

    std::vector<Task> jobs;

    auto now = std::chrono::high_resolution_clock::now;
    auto start = now();

    std::generate_n(
            std::execution::par_unseq,
            back_inserter(jobs),
            1ull << 28, // reduce for small RAM!
            generate_param);

    auto laptime = [&](auto caption) {
        std::cout << caption << " in " << (now() - start)/1.0s << "s" << std::endl;
        start = now();
    };
    laptime("generate randum input");

    std::sort(
        std::execution::par_unseq,
        begin(jobs), end(jobs),
        Task::ByParam{});

    laptime("sort by param");

    std::for_each(
        std::execution::par_unseq,
        begin(jobs), end(jobs),
        calculate);

    laptime("calculate");

    std::sort(
        std::execution::par_unseq,
        begin(jobs), end(jobs),
        Task::ByOutput{});

    laptime("sort by output");

    auto const checksum = std::transform_reduce(
        std::execution::par_unseq,
        begin(jobs), end(jobs),
        0, std::bit_xor<>{},
        std::mem_fn(&Task::output)
    );

    laptime("reduce");
    std::cout << "Checksum: " << checksum << "\n";
}

При запуске с семенем 42 выводит:

generate randum input in 10.8819s
sort by param in 8.29467s
calculate in 0.22513s
sort by output in 5.64708s
reduce in 0.108768s
Checksum: 683872090

Загрузка ЦП составляет 100% на всех ядрах, кроме первого (случайное -generation) step.


¹ (думаю, у меня есть ответы, демонстрирующие все это на этом сайте).

² См. Реализованы ли уже параллельные алгоритмы C ++ 17 ?

...