Примерно в 50% случаев тест для моего пула потоков не выдает никаких исключений и, кажется, работает как ожидалось. Однако остальные 50% времени он будет выбрасывать либо std::bad_function_call
, либо double free or corruption (!prev)
. Что я делаю не так?
#include <thread>
#include <iostream>
#include <atomic>
#include <any>
#include <stack>
#include <mutex>
#include <algorithm>
class reusable_thread {
std::thread thread;
std::atomic_bool kill = false;
std::stack<std::function<void(void)>> function_stack;
std::stack<std::function<void(void)>> pending_function_stack;
std::mutex stack_mutex;
std::atomic_size_t num_jobs = 0;
/** Seperate containers for functions and pending_function_stack, so that add_job does not have to be locking **/
inline void transfer_functions() {
std::lock_guard lock(stack_mutex);
while(pending_function_stack.size() != 0) {
function_stack.push(pending_function_stack.top());
pending_function_stack.pop();
}
}
public:
/** So the least busy in a container can be found with std::min_element **/
bool operator < (const reusable_thread& other) const { return this->num_jobs < other.num_jobs; }
/** Start the thread: in loop transfer from pending_function_stack to function_stack, run all jobs, spin if no jobs are waiting. **/
reusable_thread() {
thread = std::thread([this](){
while(!kill) {
transfer_functions();
if(function_stack.size() != 0) {
function_stack.top()();
function_stack.pop();
num_jobs--;
}
}
});
}
/** Transfer any last pending functions over, wait for them all to complete, then join the main thread **/
~reusable_thread() {
transfer_functions();
while(function_stack.size() != 0) {}
kill = true;
thread.join();
}
/** Add a job. Non locking**/
void add_job(const std::function<void(void)>& f) {
pending_function_stack.push(f);
num_jobs++;
}
};
template<size_t N>
class thread_pool {
std::array<reusable_thread, N> threads;
public:
void add_job(const std::function<void(void)>& f) {
auto&& least_busy = std::min_element(threads.begin(), threads.end());
least_busy->add_job(f);
}
};
int main() {
thread_pool<6> tp;
for(auto i = 0; i < 1000; i++) {
tp.add_job([](){std::cout << "Hello World" << std::endl; });
}
std::cout << "All jobs added" << std::endl;
}