Как мы можем запустить n экземпляров алгоритма параллельно и эффективно вычислить среднее значение функции результатов? - PullRequest
0 голосов
/ 10 сентября 2018

Я хочу запустить n экземпляров алгоритма параллельно и вычислить среднее значение функции f результатов. Если я не ошибаюсь, следующий код достигает этой цели:

struct X {};
int f(X) { return /* ... */; }

int main()
{
    std::size_t const n = /* ... */;
    std::vector<std::future<X>> results;
    results.reserve(n);

    for (std::size_t i = 0; i < n; ++i)
        results.push_back(std::async([]() -> X { /* ... */ }));

    int mean = 0;
    for (std::size_t i = 0; i < n; ++i)
        mean += f(results[i].get());
    mean /= n;
}

Однако, есть ли лучший способ сделать это? Очевидная проблема с кодом выше заключается в следующем: порядок суммирования в строке mean += f(results[i].get()); не имеет значения. Таким образом, было бы лучше добавить результаты к mean, как только они будут доступны. Если в приведенном выше коде результат задачи i еще не доступен, программа ожидает этого результата, хотя возможно, что все результаты задачи с i + 1 по n - 1 уже доступны.

Итак, как мы можем сделать это лучше?

Ответы [ 3 ]

0 голосов
/ 10 сентября 2018

Вы блокируете будущее, что на одну операцию слишком рано.

Почему бы не обновить накопленную сумму в асинхронном потоке, а затем заблокировать все завершенные потоки?

#include <condition_variable>
#include <thread>
#include <mutex>

struct X {};
int f(X);
X make_x(int);

struct algo_state
{
    std::mutex m;
    std::condition_variable cv;
    int remaining_tasks;
    int accumulator;
};

void task(X x, algo_state& state)
{
    auto part = f(x);
    auto lock = std::unique_lock(state.m);
    state.accumulator += part;
    if (--state.remaining_tasks == 0)
    {
        lock.unlock();
        state.cv.notify_one();
    }
}

int main()
{
    int get_n();
    auto n = get_n();
    algo_state state = {
        {},
        {},
        n,
        0
    };

    for(int i = 0 ; i < n ; ++i)
        std::thread([&] { task(make_x(i), state); }).detach();

    auto lock = std::unique_lock(state.m);
    state.cv.wait(lock, [&] { return state.remaining_tasks == 0; });
    auto mean = state.accumulator / n;
    return mean;
}
0 голосов
/ 10 сентября 2018

Не удалось вписать это в комментарий:

Вместо передачи N функций в M потоков для N точек данных (X), вы можете иметь:

  • K очередей из N / K элементов элементов данных для каждого из них
  • M потоков в пуле (производители, готовые с той же функцией)
  • 1 потребительский (сумматор) поток (основной?)

и передают только N точек данных между потоками.Передача функций и их выполнение могут иметь больше служебных данных, чем просто данные.

Кроме того, эти функции могут добавляться в общую переменную без необходимости дополнительного суммирования за пределами, тогда только M производителей могут работать с подходящей синхронизацией, такой как атомные или защитные блокировки..

Каков размер этой структуры?

0 голосов
/ 10 сентября 2018

Самый простой способ

Как насчет возврата лямбды f(x) вместо x:

for (std::size_t i = 0; i < n; ++i)
    results.push_back(std::async([]() -> int { /* ... */ }));

В этом случае f() может быть выполнено как можно скорее и без ожидания. Среднее вычисление все еще должно было бы ждать в последовательном порядке. Но это ложная проблема, поскольку нет ничего быстрее суммирования целых чисел, и в любом случае вы не сможете закончить вычисление среднего значения, прежде чем суммировать каждую часть.

Легкая альтернатива

Еще одним подходом может быть использование atomic<int> mean;, захват его в лямбда-выражении и обновление суммы. Так что, в конце концов, вам нужно только быть уверенным, что все будущее доставлено, прежде чем делать деление. Но, как уже говорилось, учитывая стоимость целочисленного сложения, это может быть излишним.

std::vector<std::future<void>> results;
...
atomic<int> mean{0};
for (std::size_t i = 0; i < n; ++i)
    results.push_back(std::async([&mean]() -> void 
                           { X x = ...; int i=f(x); mean+=i; return; }));    
for (std::size_t i = 0; i < n; ++i)
    results[i].get();
mean = mean/n;   // attention not an atomic operation, but all concurent things are done
...