почему вектор многопоточной параллельной функции суммы ограничен в зависимости от количества потоков? - PullRequest
0 голосов
/ 28 января 2020

Я написал параллельную функцию с рекурсивной суммой, которая получает вектор чисел, пул потоков и размер вектора, и она должна возвращать сумму вектора, но когда я хочу использовать, как в примере ниже, 20 ячеек размерный вектор, я должен использовать как минимум 8 потоков, иначе программа застрянет и не завершится (и не выдаст ошибку).

это код пула потоков, который я использую:

#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>

class ThreadPool {
public:
    ThreadPool(size_t);
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args) 
        -> std::future<typename std::result_of<F(Args...)>::type>;
    ~ThreadPool();
private:
    // need to keep track of threads so we can join them
    std::vector< std::thread > workers;
    // the task queue
    std::queue< std::function<void()> > tasks;

    // synchronization
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};

// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads)
    :   stop(false)
{
    for(size_t i = 0;i<threads;++i)
        workers.emplace_back(
            [this]
            {
                for(;;)
                {
                    std::function<void()> task;

                    {
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        this->condition.wait(lock,
                            [this]{ return this->stop || !this->tasks.empty(); });
                        if(this->stop && this->tasks.empty())
                            return;
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }

                    task();
                }
            }
        );
}

// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) 
    -> std::future<typename std::result_of<F(Args...)>::type>
{
    using return_type = typename std::result_of<F(Args...)>::type;

    auto task = std::make_shared< std::packaged_task<return_type()> >(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );

    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queue_mutex);

        // don't allow enqueueing after stopping the pool
        if(stop)
            throw std::runtime_error("enqueue on stopped ThreadPool");

        tasks.emplace([task](){ (*task)(); });
    }
    condition.notify_one();
    return res;
}

// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;
    }
    condition.notify_all();
    for(std::thread &worker: workers)
        worker.join();
}

#endif

это моя параллельная функция суммы:

int Sum_Parallelled(int *begin, ThreadPool *threadPool,int size) {
    if (size == 1) {
        return *begin;
    } else {
        auto res = threadPool->enqueue(Sum_Parallelled, (begin), threadPool, size / 2);
        if (size % 2 == 0) {
            return Sum_Parallelled(begin + (size / 2), threadPool, size / 2) + res.get();
        } else {
            return Sum_Parallelled(begin + (size / 2), threadPool, size / 2 + 1) + res.get();
        }
    }
}

и это код основной функции:

int main() {
    std::vector<int> vec;
    for(int i = 0; i < 20; i++){ // creating a vector with x cells.
        vec.push_back(i);
    }
    ThreadPool threadPool(8); // creating a threadpool with y threads.
    int size = vec.size();
    int sum = threadPool.enqueue(Sum_Parallelled,vec.data(),&threadPool,size).get();
    std::cout << "The sum in the parallel sum: " << sum << std::endl;
    return 0;
}

Ответы [ 2 ]

0 голосов
/ 28 января 2020

Ваш Sum_Parallelled ставит в очередь новую подзадачу для первой половины его диапазона и блокирует до тех пор, пока эта задача не будет завершена .

Таким образом, если число нерешенных задач может когда-либо превышение количества потоков приведет к взаимоблокировке.

Мы можем легко записать количество задач, которые будут созданы для данного ввода:

sp(20) -> sp(10) + sp(10)
sp(10) -> (sp(5) + sp(5))
sp(5)  -> (sp(2) + sp(3))

et c.

Таким образом, вызов вашей функции с размером = 20 будет:

  1. создать две задачи с размером = 10 и заблокировать текущий поток, пока они оба не вернут .

    Состояние: один поток заблокирован в ожидании, с двумя задачами в очереди.

  2. каждая задача размером 10 создаст две задачи размера = 5 и будет блокирована до они оба возвращают .

    Состояние: три потока заблокированы в ожидании, с четырьмя задачами в очереди.

  3. каждая задача размера = 5 создаст один размер = 2 задача и один размер = 3 задача и блок до тех пор, пока оба не вернутся .

    Состояние: семь потоков заблокированы в ожидании, остроумие h восемь задач в очереди.

, поэтому ясно, что на следующем уровне будут все доступные потоки, ожидающие задачи, которые никогда не смогут быть выполнены.

Это также явно неэффективно : даже если у нас достаточно потоков, они в основном просто блокируются, ожидая других потоков вместо того, чтобы делать что-нибудь полезное.

К счастью, легко придумать лучшие способы организации этого. Например:

  1. Просто создайте одну задачу для каждого потока, суммируя 1 / n нитей вектора, а затем суммируйте результаты, когда все они выполнены.

    Недостатки : он сдвигает (некоторые из) лог c на уровень, и если входной вектор очень большой, однопоточное окончательное накопление может стать узким местом

    . вектор действительно очень большой, эта простая схема будет работать намного лучше.

  2. Создайте две подзадачи на каждом уровне, как вы делаете сейчас, но вместо ожидания их результатов просто вернитесь пара фьючерсов.

0 голосов
/ 28 января 2020

Чтобы суммировать свои двадцать чисел, вы сначала запускаете задание (в потоке), которое рекурсивно суммирует двадцать чисел. Это задание запускает две задачи, которые составляют половину из двадцати чисел каждая. Затем он сидит там, ожидая двух заданий до конца sh и, наконец, возвращает результат. Хотя он все время блокирует один из потоков в пуле. Если вы запишете, когда каждый поток выбирает задачу и завершает ее, вы увидите, что с двадцатью номерами вам нужно больше параллельных задач, чем потоков, следовательно, зависающая программа.

Ваша проблема была вызвана использование рекурсии, которая является неправильным инструментом для работы. Рассмотрим дерево, которое сопоставляет каждую задачу с родительской задачей. На листьях у вас есть задачи, которые получают диапазон только из одного числа, в root у вас есть задача с двадцатью номерами. Теперь каждое задание, которое имеет детей, блокируется, пока оба ребенка не будут завершены. При фиксированном размере пула потоков вы можете обрабатывать только определенный размер дерева, потому что размер дерева определяет расстояние до листа root и, следовательно, количество заблокированных потоков, ожидающих дочерние задачи. Если высота слишком велика, потоки блокируются в ожидании дочерних задач, которые никогда не выполняются, поскольку свободных потоков нет.

...