Как синхронизировать поток для рекурсивной функции с подпотоками - PullRequest
0 голосов
/ 01 февраля 2019

Я довольно новичок в C ++ и многопоточности, и я застрял в этой проблеме на несколько дней ... Он должен сформировать базовый код для FFT (быстрое преобразование Фурье) - просто базовый код, поэтому некоторые вещи по-прежнему отсутствуюттакие как тривиальные члены и входные данные являются двойными числами (еще не комплексными числами).

Я хочу выполнить некоторое параллельное программирование функции f_thread с C ++ ... вот это работает 'компилируемый 'код

#include<iostream>
#include<thread>
#include <vector>
#include <mutex>

void get_odd_elements(std::vector<double> inpt, std::vector<double> &out) {
    for (int i = 0; i < inpt.size()-1; i = i + 2) {out[i/2] = inpt[i];}
}

void get_even_elements(std::vector<double> inpt, std::vector<double> &out) {
    for (int i = 1; i < inpt.size(); i = i + 2) {out[i/2] = inpt[i];}
}

void attach(std::vector<double> a, std::vector<double> b, std::vector<double> &out) {
    for (int i = 0; i < a.size(); i++) {out[i] = a[i];}
    for (int i = a.size(); i < a.size()+b.size(); i++) {out[i] = b[i];}
}

void add_vectors(std::vector<double> &x, std::vector<double> &y, std::vector<double> &z) {for (int i = 0; i < x.size(); i++) {z[i] = x[i] + y[i];}}

void sub_vectors(std::vector<double> &x, std::vector<double> &y, std::vector<double> &z) {for (int i = 0; i < x.size(); i++) {z[i] = y[i] - x[i];}}

//the f_thread function

void f_thread(std::vector<double> in, std::vector<double> &out) {

    if (in.size() == 1) {out = in;}
    else {

        std::vector<double> f0(in.size()/2);
        std::vector<double> f1(in.size()/2);

        get_odd_elements(in,std::ref(f0)); //get_odd_elements is a function that gets all odd-indexed elements of f
        get_even_elements(in,std::ref(f1)); //get_even_elements is a function that gets all even-indexed elements of in

        std::vector<double> a(f0.size());
        std::vector<double> b(f1.size());

        std::mutex mtx1; std::mutex mtx2;

        std::thread t0(f_thread,std::ref(f0),std::ref(a)); //create thread for f_thread on a
        std::thread t1(f_thread,std::ref(f1),std::ref(b)); //create thread for f_thread on b

        t0.join(); t1.join(); // join 2 threads

        std::vector<double> a_out(f0.size());
        std::vector<double> b_out(f1.size());

        add_vectors(std::ref(a),std::ref(b),std::ref(a_out)); //call add_vectors function : a + b
        sub_vectors(std::ref(a),std::ref(b),std::ref(b_out)); //call sub_vectors function : b - a

        std::vector<double> f_out(in.size());
        attach(a_out,b_out,std::ref(f_out)); //attach is a function that appends b to the end of a
        out = f_out; 
    }
}

int main() {
    int n_elements = 16;
    std::vector<double> sample_input(n_elements);
    for (int i = 0; i < n_elements; i++) {sample_input[i] = i;}
    std::vector<double> output(n_elements);
    std::thread start(f_thread,std::ref(sample_input),std::ref(output));
    start.join();
    for (int i = 0; i < n_elements; i++) {std::cout << "output element "; std::cout << i; std::cout << ": "; std::cout << output[i]; std::cout<< "\n";}
    }

Итак, f_thread инициализируется как поток, а затем создает 2 подпотока, которые рекурсивно вызывают f_thread .Я испробовал несколько трюков с использованием мьютексов, но ни один из них, похоже, не работает, поскольку синхронизация между двумя подпотоками идет не очень хорошо (это горячая точка для условий гонки).Вот один код, который я пробовал и который не работал.Я также пытался использовать глобальные рекурсивные мьютексы, но все еще без улучшений.

#include<iostream>
#include<thread>
#include <vector>
#include <mutex>

void get_odd_elements(std::vector<double> inpt, std::vector<double> &out) {
    for (int i = 0; i < inpt.size()-1; i = i + 2) {out[i/2] = inpt[i];}
}

void get_even_elements(std::vector<double> inpt, std::vector<double> &out) {
    for (int i = 1; i < inpt.size(); i = i + 2) {out[i/2] = inpt[i];}
}

void attach(std::vector<double> a, std::vector<double> b, std::vector<double> &out) {
    for (int i = 0; i < a.size(); i++) {out[i] = a[i];}
    for (int i = a.size(); i < a.size()+b.size(); i++) {out[i] = b[i];}
}

void add_vectors(std::vector<double> &x, std::vector<double> &y, std::vector<double> &z) {for (int i = 0; i < x.size(); i++) {z[i] = x[i] + y[i];}}

void sub_vectors(std::vector<double> &x, std::vector<double> &y, std::vector<double> &z) {for (int i = 0; i < x.size(); i++) {z[i] = y[i] - x[i];}}

//the f_thread function

void f_thread(std::vector<double> in, std::vector<double> &out) {

    if (in.size() == 1) {out = in;}
    else {

        std::vector<double> f0(in.size()/2);
        std::vector<double> f1(in.size()/2);

        get_odd_elements(in,std::ref(f0)); //get_odd_elements is a function that gets all odd-indexed elements of f
        get_even_elements(in,std::ref(f1)); //get_even_elements is a function that gets all even-indexed elements of in

        std::vector<double> a(f0.size());
        std::vector<double> b(f1.size());

        std::mutex mtx1; std::mutex mtx2;

        mtx1.lock(); std::thread t0(f_thread,std::ref(f0),std::ref(a)); mtx1.unlock(); //create thread for f_thread on a
        mtx2.lock(); std::thread t1(f_thread,std::ref(f1),std::ref(b)); mtx2.unlock(); //create thread for f_thread on b

        t0.join(); t1.join(); // join 2 threads

        std::vector<double> a_out(f0.size());
        std::vector<double> b_out(f1.size());

        add_vectors(std::ref(a),std::ref(b),std::ref(a_out)); //call add_vectors function : a + b
        sub_vectors(std::ref(a),std::ref(b),std::ref(b_out)); //call sub_vectors function : b - a

        std::vector<double> f_out(in.size());
        attach(a_out,b_out,std::ref(f_out)); //attach is a function that appends b to the end of a
        out = f_out; 
    }
}

int main() {
    int n_elements = 16;
    std::vector<double> sample_input(n_elements);
    for (int i = 0; i < n_elements; i++) {sample_input[i] = i;}
    std::vector<double> output(n_elements);
    std::thread start(f_thread,std::ref(sample_input),std::ref(output));
    start.join();
    for (int i = 0; i < n_elements; i++) {std::cout << "output element "; std::cout << i; std::cout << ": "; std::cout << output[i]; std::cout<< "\n";}
    }

Мне удалось проверить, что этот код компилируется с использованием g ++ f_thread.cpp -pthread со стандартными библиотеками C ++ в linux (ubuntu 18.04) OS

Код теперь выполняется (больше нет «прерванных ошибок из-за сброса ядра»), но вывод для многопоточной версии меняется при каждом запуске (что указывает на то, что синхронизация работает плохо).

Для справки здесьявляется последовательной версией кода, которая не использует подпотоки и которая работает хорошо (т.е. нет изменений в выходных данных при каждом его запуске)

// WORKING sequential version

#include<iostream>
#include<thread>
#include <vector>
#include <mutex>

void get_odd_elements(std::vector<double> inpt, std::vector<double> &out) {
    for (int i = 0; i < inpt.size()-1; i = i + 2) {out[i/2] = inpt[i];}
}

void get_even_elements(std::vector<double> inpt, std::vector<double> &out) {
    for (int i = 1; i < inpt.size(); i = i + 2) {out[i/2] = inpt[i];}
}

void attach(std::vector<double> a, std::vector<double> b, std::vector<double> &out) {
    for (int i = 0; i < a.size(); i++) {out[i] = a[i];}
    for (int i = a.size(); i < a.size()+b.size(); i++) {out[i] = b[i];}
}

void add_vectors(std::vector<double> &x, std::vector<double> &y, std::vector<double> &z) {for (int i = 0; i < x.size(); i++) {z[i] = x[i] + y[i];}}

void sub_vectors(std::vector<double> &x, std::vector<double> &y, std::vector<double> &z) {for (int i = 0; i < x.size(); i++) {z[i] = y[i] - x[i];}}

//the f_thread function

void f_thread(std::vector<double> in, std::vector<double> &out) {

    if (in.size() == 1) {out = in;}
    else {

        std::vector<double> f0(in.size()/2);
        std::vector<double> f1(in.size()/2);

        get_odd_elements(in,std::ref(f0)); //get_odd_elements is a function that gets all odd-indexed elements of f
        get_even_elements(in,std::ref(f1)); //get_even_elements is a function that gets all even-indexed elements of in

        std::vector<double> a(f0.size());
        std::vector<double> b(f1.size());

        f_thread(std::ref(f0),std::ref(a)); // no thread, just call recursion 

        f_thread(std::ref(f1),std::ref(b)); // no thread, just call recursion 

        std::vector<double> a_out(f0.size());
        std::vector<double> b_out(f1.size());

        add_vectors(std::ref(a),std::ref(b),std::ref(a_out)); //call add_vectors function : a + b
        sub_vectors(std::ref(a),std::ref(b),std::ref(b_out)); //call sub_vectors function : b - a

        std::vector<double> f_out(in.size());
        attach(a_out,b_out,std::ref(f_out)); //attach is a function that appends b to the end of a
        out = f_out; 
    }
}

int main() {
    int n_elements = 16;
    std::vector<double> sample_input(n_elements);
    for (int i = 0; i < n_elements; i++) {sample_input[i] = i;}
    std::vector<double> output(n_elements);
    std::thread start(f_thread,std::ref(sample_input),std::ref(output));
    start.join();
    for (int i = 0; i < n_elements; i++) {std::cout << "output element "; std::cout << i; std::cout << ": "; std::cout << output[i]; std::cout<< "\n";}
    }

Предполагается, что результаты будут зафиксированы в этом выводекаждый раз, когда код запускается.

output element 0: 120
output element 1: 0
output element 2: 0
output element 3: 7.31217e-322
output element 4: 0
output element 5: 6.46188e-319
output element 6: 56
output element 7: 0
output element 8: 0
output element 9: 4.19956e-322
output element 10: 120
output element 11: 0
output element 12: 0
output element 13: 7.31217e-322
output element 14: 0
output element 15: 6.46188e-319

Ответы [ 2 ]

0 голосов
/ 05 февраля 2019

Вы должны справиться с этим, спросив, сколько процессоров существует, затем разделить вашу работу и использовать очередь, чтобы объединить ее вместе.

Я не знаю алгоритм FFT, но, глядя на вашКурсивно кодируйте, похоже, что вы в основном разбиваете свои данные, используя более тонкую зубчатую гребень.За исключением того, что вы начинаете с самого лучшего уровня и продвигаетесь вверх, что не очень хороший способ разделить вещи.

Вы не хотите, чтобы другой ЦП обрабатывал все остальные значения, потому что даже начип многоядерного процессора, есть несколько кешей L1.Каждый кэш L1 используется не более чем одним другим ядром.Таким образом, вы хотите, чтобы все значения, с которыми работает один ЦП, были близки друг к другу, чтобы максимизировать вероятность того, что искомое значение находится в кеше.

Таким образом, вы должны начать разделение с наибольшим непрерывнымломти.Поскольку алгоритм FFT работает на основе степеней двух, вы должны подсчитать количество ядер, которые у вас есть.Используйте thread::hardware_concurrency() для подсчета.Затем округлите до следующей наивысшей степени двух и разбейте вашу проблему на это число суб-БПФ.Затем объединил их результаты в главном потоке.

У меня есть программа, которую я написал, которая делает то, что вы хотите.Он разбивает список на несколько частей, чтобы выполнить сортировку на .Тогда у него есть очередь слияний, которые нужно сделать.Каждый чанк обрабатывается отдельным потоком, и каждое слияние также порождается в свой собственный поток.

Я делю количество ядер на два из-за особенностей современных процессоров, которые я не люблю называтьгиперпоточность.Хотя я мог бы просто проигнорировать это, и он бы работал нормально, хотя, поскольку основной спор был бы из-за целого числа ALU, он мог бы быть немного медленнее.(Гиперпоточность разделяет ресурсы в пределах одного ядра.)

Из другого ответа кажется, что в вашем коде FFT есть несколько ошибок.Я бы порекомендовал заставить его работать только с одним потоком, а затем выяснить, как его разделить.

0 голосов
/ 04 февраля 2019

Это не ошибка многопоточности, а доступ за пределы элемента массива в функции attach:

void attach(std::vector<double> a, std::vector<double> b, std::vector<double> &out) {
    for (int i = 0; i < a.size(); i++) {out[i] = a[i];}
    for (int i = a.size(); i < a.size()+b.size(); i++) {out[i] = b[i];}
}

Во втором цикле индекс начинается с a.size(), а не с 0 -но вы используете его для доступа к элементам b, как если бы он начинался с 0.

Вместо записи циклов вы можете использовать std::copy из <algorithm>:

void attach(std::vector<double> a, std::vector<double> b, std::vector<double> &out) {
    std::copy(a.begin(), a.end(), out.begin());
    std::copy(b.begin(), b.end(), out.begin()+a.size());
}

Послечто для рекурсивных потоков вам понадобится только следующее:

std::thread t0(f_thread,std::ref(f0),std::ref(a)); //create thread for f_thread on a
std::thread t1(f_thread,std::ref(f1),std::ref(b)); //create thread for f_thread on b
t0.join(); t1.join(); // join 2 threads

Нет гонок, поскольку каждый поток работает с отдельными входными и выходными массивами (которые вы создали в стеке «родительского» потока).Результат является детерминированным и одинаковым для последовательных и многопоточных версий:

output element 0: 120
output element 1: 64
output element 2: 32
output element 3: 0
output element 4: 16
output element 5: 0
output element 6: 0
output element 7: 0
output element 8: 8
output element 9: 0
output element 10: 0
output element 11: 0
output element 12: 0
output element 13: 0
output element 14: 0
output element 15: 0

Кстати, вы могли догадаться, что даже ваша серийная версия неверна, поскольку все входные данные представляют собой целые числа, и вы только копируете, добавляете ивычесть те;поэтому нет причин для появления чисел с плавающей точкой, таких как 7.31217e-322, в выводе.

Также обратите внимание на комментарии Дэвиса Херринга: вы копируете данные между векторами.По крайней мере, я бы передавал векторы в функции по константным ссылкам, а не по значениям (за исключением случаев, когда известно, что эти копии удалены).

Наконец, вы должны прекратить создавать новые потоки гораздо раньше, чем когдаваши входные массивы имеют размер 1. Для реальных проблемных размеров вы не сможете создавать тысячи потоков;и даже если вы преуспеете в этом, накладные расходы на создание и запуск такого количества потоков приведут к тому, что ваш код будет работать очень очень медленно.В идеале, вы не должны создавать больше потоков, чем HW-ядер на компьютере, на котором выполняется код.

...