Почему мой пул потоков C ++ не может ускорить мою программу? - PullRequest
0 голосов
/ 26 апреля 2020

Я пытался реализовать пул потоков c ++ в соответствии с некоторыми замечаниями, сделанными другими, код выглядит так:

#include <vector>
#include <queue>
#include <functional>
#include <future>
#include <atomic>
#include <condition_variable>
#include <thread>
#include <mutex>
#include <memory>
#include <glog/logging.h>
#include <iostream>
#include <chrono>

using std::cout;
using std::endl;

class ThreadPool {
public:
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool(ThreadPool&&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;
    ThreadPool& operator=(ThreadPool&&) = delete;
    ThreadPool(uint32_t capacity=std::thread::hardware_concurrency(), 
            uint32_t n_threads=std::thread::hardware_concurrency()
            ): capacity(capacity), n_threads(n_threads) {
        init(capacity, n_threads);
    }

    ~ThreadPool() noexcept {
        shutdown();
    }

    void init(uint32_t capacity, uint32_t n_threads) {
        CHECK_GT(capacity, 0) << "task queue capacity should be greater than 0";
        CHECK_GT(n_threads, 0) << "thread pool capacity should be greater than 0";
        for (int i{0}; i < n_threads; ++i) {
            pool.emplace_back(std::thread([this] {
                std::function<void(void)> task;
                while (!this->stop) {
                    {
                        std::unique_lock<std::mutex> lock(this->q_mutex);
                        task_q_empty.wait(lock, [&] {return this->stop | !task_q.empty();});
                        if (this->stop) break;
                        task = this->task_q.front();
                        this->task_q.pop();
                        task_q_full.notify_one();
                    }
                    // auto id = std::this_thread::get_id();
                    // std::cout << "thread id is: " << id << std::endl;
                    task();
                }
            }));
        }
    }

    void shutdown() {
        stop = true;
        task_q_empty.notify_all();
        task_q_full.notify_all();
        for (auto& thread : pool) {
            if (thread.joinable()) {
                thread.join();
            }
        }
    }

    template<typename F, typename...Args>
    auto submit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
        using res_type = decltype(f(args...));
        std::function<res_type(void)> func = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
        auto task_ptr = std::make_shared<std::packaged_task<res_type()>>(func);
        {
            std::unique_lock<std::mutex> lock(q_mutex);
            task_q_full.wait(lock, [&] {return this->stop | task_q.size() <= capacity;});
            CHECK (this->stop == false) << "should not add task to stopped queue\n";
            task_q.emplace([task_ptr]{(*task_ptr)();});
        }
        task_q_empty.notify_one();
        return task_ptr->get_future();
    }

private:
    std::vector<std::thread> pool;
    std::queue<std::function<void(void)>> task_q;
    std::condition_variable task_q_full;
    std::condition_variable task_q_empty;
    std::atomic<bool> stop{false};
    std::mutex q_mutex;
    uint32_t capacity;
    uint32_t n_threads;
};


int add(int a, int b) {return a + b;}

int main() {
    auto t1 = std::chrono::steady_clock::now();
    int n_threads = 1;
    ThreadPool tp;
    tp.init(n_threads, 1024);
    std::vector<std::future<int>> res;
    for (int i{0}; i < 1000000; ++i) {
        res.push_back(tp.submit(add, i, i+1));
    }
    auto t2 = std::chrono::steady_clock::now();
    for (auto &el : res) {
        el.get();
        // cout << el.get() << endl;
    }

    tp.shutdown();
    cout << "processing: "
        << std::chrono::duration<double, std::milli>(t2 - t1).count() 
        << endl;

    return 0;
}

Проблема в том, что, когда я устанавливаю n_threads=1, программа принимает столько же времени, сколько я установил n_threads=4. Поскольку в моем gpu 72 ядра (из команды htop), я считаю, что поток 4 будет быстрее, чем настройки 1 потока. В чем проблема с этой реализацией пула потоков, пожалуйста?

1 Ответ

1 голос
/ 26 апреля 2020

Я обнаружил несколько проблем:

1) Используйте ORing вместо побитовой операции в условных переменных обоих :

Replace this -  `task_q_empty.wait(lock, [&] {return this->stop | !task_q.empty();});`
By - `task_q_empty.wait(lock, [&] {return this->stop || !task_q.empty();});`

2) Используйте notify_all () вместо notify_one () в init () и submit (). 3) Две условные переменные здесь не нужны, используйте только task_q_empty.
4) Ваш вариант использования не идеален. Переключение потоков может перевесить добавление двух целых чисел, может показаться, что чем больше потоков, тем больше время выполнения. Тест в оптимизированном режиме. Попробуйте сценарий, подобный этому, чтобы смоделировать более длинный процесс:

int add(int a, int b) { this_thread::sleep_for(chrono::milliseconds(200)); return a + b; }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...