Я хочу реализовать thread_list , например thread_pool , который имеет интерфейс типа add()
, join_all()
. Я просто пу sh потоков в него, нет необходимости управлять уничтожением потока.
Я пытался:
thread_pool
, но он создает группу потоков при запуске, для производительности. Но производительность не является моей проблемой, я хочу, чтобы мой thread_list не создавал никаких потоков, когда он простаивает, создает поток только по требованию .
boost::thread_group
, но он не освобождает потоки, пока сам не освобождается, что вызывает все больше и больше потоков в процессе.
boost::asio::io_context
, я могу post
задание к нему, но оно однопоточное, задания выполняются по очереди, мне придется создать много потоков для вызова io_context.run()
, чтобы сделать его асинхронным, в этом случае это то же самое проблема как thread_pool
.
Вот мой код: (Первая версия полностью неверна, только что удалена)
Обновленная версия : ( рекомендуется комментарии, используйте condition_variable и отслеживать все темы):
// thread_list.h
#pragma once
#include <map>
#include <list>
#include <memory>
#include <future>
#include <thread>
#include <mutex>
#include <functional>
#include <condition_variable>
struct thread_list {
thread_list();
~thread_list();
typedef unsigned long long id_t;
id_t id = 0;
std::map<id_t, std::thread> map_thread;
std::list<id_t> list_done;
std::mutex mtx;
std::condition_variable cv;
bool stopped = false;
std::thread thrd_cleaner;
void add(std::function<void()>);
};
thread_list::thread_list()
: thrd_cleaner([this] {
for(;;) {
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock);
if(stopped) {
lock.unlock();
for(auto& pr : map_thread) {
pr.second.join();
}
return;
}
// get first finished thread, remove it
if(list_done.size() > 0) {
// remove from list_done
auto tid = list_done.front();
list_done.pop_front();
// remove from map_thread
auto iter = map_thread.find(tid);
if(iter != map_thread.end()) {
iter->second.join();
map_thread.erase(iter);
}
}
}
})
{}
thread_list::~thread_list() {
using namespace std;
{
lock_guard<mutex> lock(mtx);
stopped = true;
}
cv.notify_one();
thrd_cleaner.join();
}
void thread_list::add(function<void()> fn) {
using namespace std;
lock_guard<mutex> lock(mtx);
if(stopped) return;
auto tid = id++;
map_thread[tid] = thread([fn, tid, this] {
// 1. call origin function
fn();
// 2. mark the thread finished
{
lock_guard<mutex> lock(this->mtx);
this->list_done.push_back(tid);
}
// 3. notify cleaner
this->cv.notify_one();
});
}
Тестовый код:
#include <iostream>
using namespace std;
#include "./thread_list.h"
void test_normal_usage() {
thread_list l;
for(int i=0; i<100; i++) { // create 100 threads
l.add([i]{ cout << i << endl; });
}
}
int main() {
test_normal_usage();
}
Теперь кажется, работает. Нужна помощь с обзором, какой-нибудь более простой дизайн или потенциальная ошибка?
И condition_variable
должен быть запущен в другом потоке, что вызывает как минимум еще 1 поток. В производственном процессе я создам 2000+ thread_list для 2000+ пользовательских сеансов, так что 2000+ потоков теряются. Я знаю, что могу поделиться 1 thread_list
всем сеансом пользователя, но это сложно. Может быть, есть лучший дизайн без увеличения количества потоков?