как реализовать список потоков - PullRequest
1 голос
/ 05 апреля 2020

Я хочу реализовать thread_list , например thread_pool , который имеет интерфейс типа add(), join_all(). Я просто пу sh потоков в него, нет необходимости управлять уничтожением потока.

Я пытался:

  1. thread_pool, но он создает группу потоков при запуске, для производительности. Но производительность не является моей проблемой, я хочу, чтобы мой thread_list не создавал никаких потоков, когда он простаивает, создает поток только по требованию .

  2. boost::thread_group, но он не освобождает потоки, пока сам не освобождается, что вызывает все больше и больше потоков в процессе.

  3. boost::asio::io_context, я могу post задание к нему, но оно однопоточное, задания выполняются по очереди, мне придется создать много потоков для вызова io_context.run(), чтобы сделать его асинхронным, в этом случае это то же самое проблема как thread_pool.

Вот мой код: (Первая версия полностью неверна, только что удалена)

Обновленная версия : ( рекомендуется комментарии, используйте condition_variable и отслеживать все темы):

// thread_list.h

#pragma once

#include <map>
#include <list>
#include <memory>
#include <future>
#include <thread>
#include <mutex>
#include <functional>
#include <condition_variable>

struct thread_list {

    thread_list();
    ~thread_list();


    typedef unsigned long long id_t;
    id_t id = 0;
    std::map<id_t, std::thread> map_thread;
    std::list<id_t> list_done;

    std::mutex mtx;
    std::condition_variable cv;

    bool stopped = false;

    std::thread thrd_cleaner;

    void add(std::function<void()>);
};

thread_list::thread_list() 
    : thrd_cleaner([this] {
        for(;;) {
            std::unique_lock<std::mutex> lock(mtx);

            cv.wait(lock);

            if(stopped) {
                lock.unlock();
                for(auto& pr : map_thread) {
                    pr.second.join();
                }
                return;
            }

            // get first finished thread, remove it
            if(list_done.size() > 0) {
                // remove from list_done
                auto tid = list_done.front();
                list_done.pop_front();

                // remove from map_thread
                auto iter = map_thread.find(tid);
                if(iter != map_thread.end()) {
                    iter->second.join();
                    map_thread.erase(iter);
                }
            }
        }
    })
{}

thread_list::~thread_list() {
    using namespace std;

    {
        lock_guard<mutex> lock(mtx);
        stopped = true;
    }

    cv.notify_one();
    thrd_cleaner.join();
}

void thread_list::add(function<void()> fn) {
    using namespace std;

    lock_guard<mutex> lock(mtx);
    if(stopped) return;

    auto tid = id++;
    map_thread[tid] = thread([fn, tid, this] {
        // 1. call origin function
        fn();

        // 2. mark the thread finished  
        {
            lock_guard<mutex> lock(this->mtx);
            this->list_done.push_back(tid);
        }
        // 3. notify cleaner
        this->cv.notify_one();
    });
}

Тестовый код:

#include <iostream>
using namespace std;

#include "./thread_list.h"

void test_normal_usage() {
    thread_list l;

    for(int i=0; i<100; i++) { // create 100 threads
        l.add([i]{ cout << i << endl; });
    }
}
int main() {
    test_normal_usage();
}

Теперь кажется, работает. Нужна помощь с обзором, какой-нибудь более простой дизайн или потенциальная ошибка?

И condition_variable должен быть запущен в другом потоке, что вызывает как минимум еще 1 поток. В производственном процессе я создам 2000+ thread_list для 2000+ пользовательских сеансов, так что 2000+ потоков теряются. Я знаю, что могу поделиться 1 thread_list всем сеансом пользователя, но это сложно. Может быть, есть лучший дизайн без увеличения количества потоков?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...