Как заставить boost :: thread_group выполнять фиксированное количество параллельных потоков - PullRequest
13 голосов
/ 27 июля 2010

Это код для создания группы потоков и параллельного выполнения всех потоков:

boost::thread_group group;
for (int i = 0; i < 15; ++i)
    group.create_thread(aFunctionToExecute);
group.join_all();

Этот код будет выполнять все потоки одновременно.То, что я хочу сделать, это выполнить их все, кроме 4 максимум параллельно.Когда on завершается, выполняется другой, пока не останется больше выполнить.

Ответы [ 4 ]

3 голосов
/ 27 июля 2010

Другим, более эффективным решением было бы иметь обратный вызов каждого потока к первичному потоку, когда они закончили, и обработчик на первичном потоке мог запускать новый поток каждый раз. Это предотвращает повторяющиеся вызовы timed_join, так как основной поток не будет ничего делать, пока не сработает обратный вызов.

0 голосов
/ 25 октября 2017

Я создал свой собственный упрощенный интерфейс boost::thread_group для выполнения этой работы:

class ThreadGroup : public boost::noncopyable
{
    private:
        boost::thread_group        group;
        std::size_t                maxSize;
        float                      sleepStart;
        float                      sleepCoef;
        float                      sleepMax;
        std::set<boost::thread*>   running;

    public:
        ThreadGroup(std::size_t max_size = 0,
                    float max_sleeping_time = 1.0f,
                    float sleeping_time_coef = 1.5f,
                    float sleeping_time_start = 0.001f) :
            boost::noncopyable(),
            group(),
            maxSize(max_size),
            sleepStart(sleeping_time_start),
            sleepCoef(sleeping_time_coef),
            sleepMax(max_sleeping_time),
            running()
        {
            if(max_size == 0)
                this->maxSize = (std::size_t)std::max(boost::thread::hardware_concurrency(), 1u);
            assert(max_sleeping_time >= sleeping_time_start);
            assert(sleeping_time_start > 0.0f);
            assert(sleeping_time_coef > 1.0f);
        }

        ~ThreadGroup()
        {
            this->joinAll();
        }

        template<typename F> boost::thread* createThread(F f)
        {
            float sleeping_time = this->sleepStart;
            while(this->running.size() >= this->maxSize)
            {
                for(std::set<boost::thread*>::iterator it = running.begin(); it != running.end();)
                {
                    const std::set<boost::thread*>::iterator jt = it++;
                    if((*jt)->timed_join(boost::posix_time::milliseconds((long int)(1000.0f * sleeping_time))))
                        running.erase(jt);
                }
                if(sleeping_time < this->sleepMax)
                {
                    sleeping_time *= this->sleepCoef;
                    if(sleeping_time > this->sleepMax)
                        sleeping_time = this->sleepMax;
                }
            }
            return *this->running.insert(this->group.create_thread(f)).first;
        }

        void joinAll()
        {
            this->group.join_all();
        }

        void interruptAll()
        {
#ifdef BOOST_THREAD_PROVIDES_INTERRUPTIONS
            this->group.interrupt_all();
#endif
        }

        std::size_t size() const
        {
            return this->group.size();
        }
    };

Вот пример использования, очень похожий на boost::thread_group с основным отличием, что создание потокаэто точка ожидания:

{
  ThreadGroup group(4);
  for(int i = 0; i < 15; ++i)
    group.createThread(aFunctionToExecute);
} // join all at destruction
0 голосов
/ 11 января 2016

Я думаю, что вы ищете thread_pool реализацию, которая доступна здесь .

Кроме того, я заметил, что если вы создаете вектор std :: future и храните в нем фьючерсы многих std :: async_tasks, и у вас нет блокирующего кода в функции, передаваемой потоку, VS2013 (по крайней мере Я могу подтвердить), запустит точно соответствующее количество потоков, которые ваша машина может обработать. Он повторно использует созданные потоки.

0 голосов
/ 27 июля 2010

У меня есть что-то вроде этого:

    boost::mutex mutex_;
    boost::condition_variable condition_;
    const size_t throttle_;
    size_t size_;
    bool wait_;
    template <typename Env, class F>
    void eval_(const Env &env, const F &f) {
        {   
            boost::unique_lock<boost::mutex> lock(mutex_);
            size_ = std::min(size_+1, throttle_);
            while (throttle_ <= size_) condition_.wait(lock);
        }
        f.eval(env);
        {
            boost::lock_guard<boost::mutex> lock(mutex_);
            --size_; 
        }
        condition_.notify_one();
    }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...