Почему мой пул потоков иногда генерирует `std :: bad_function_call` или` double free или поврежден (! Prev) ` - PullRequest
0 голосов
/ 30 апреля 2020

Примерно в 50% случаев тест для моего пула потоков не выдает никаких исключений и, кажется, работает как ожидалось. Однако остальные 50% времени он будет выбрасывать либо std::bad_function_call, либо double free or corruption (!prev). Что я делаю не так?

#include <thread>
#include <iostream>
#include <atomic>
#include <any>
#include <stack>
#include <mutex>
#include <algorithm>

class reusable_thread {
    std::thread thread;
    std::atomic_bool kill = false;
    std::stack<std::function<void(void)>> function_stack;
    std::stack<std::function<void(void)>> pending_function_stack;
    std::mutex stack_mutex;
    std::atomic_size_t num_jobs = 0;

    /** Seperate containers for functions and pending_function_stack, so that add_job does not have to be locking **/
    inline void transfer_functions() {
        std::lock_guard lock(stack_mutex);
        while(pending_function_stack.size() != 0) {
            function_stack.push(pending_function_stack.top());
            pending_function_stack.pop();
        }
    }

public:

    /** So the least busy in a container can be found with std::min_element **/
    bool operator < (const reusable_thread& other) const { return this->num_jobs < other.num_jobs; }

    /** Start the thread: in loop transfer from pending_function_stack to function_stack, run all jobs, spin if no jobs are waiting. **/
    reusable_thread() {
        thread = std::thread([this](){
            while(!kill) {
                transfer_functions();
                if(function_stack.size() != 0) {
                    function_stack.top()();
                    function_stack.pop();
                    num_jobs--;
                }
            }
        });
    }

    /** Transfer any last pending functions over, wait for them all to complete, then join the main thread **/
    ~reusable_thread() {
        transfer_functions();
        while(function_stack.size() != 0) {}
        kill = true;
        thread.join();
    }

    /** Add a job. Non locking**/
    void add_job(const std::function<void(void)>& f) {
        pending_function_stack.push(f);
        num_jobs++;
    }

};

template<size_t N>
class thread_pool {
    std::array<reusable_thread, N> threads;
public:
    void add_job(const std::function<void(void)>& f) {
        auto&& least_busy = std::min_element(threads.begin(), threads.end());
        least_busy->add_job(f);
    }
};

int main() {
    thread_pool<6> tp;
    for(auto i = 0; i < 1000; i++) {
        tp.add_job([](){std::cout << "Hello World" << std::endl; });
    }
    std::cout << "All jobs added" << std::endl;
}

Ответы [ 2 ]

5 голосов
/ 30 апреля 2020

Ваш деструктор reusable_thread вызывает transfer_functions, который помещает функции в ваш function_stack, удерживая мьютекс, но основной l oop в вашем потоке потребляет функции из вашего function_stack без получения мьютекса , Поэтому у вас есть гонка данных на function_stack.

4 голосов
/ 30 апреля 2020

У вас есть состояние гонки между add_job и transfer_functions, так как один добавляет к pending_function_stack без блокировки, а другой проверяет его

...