Как уменьшить boost :: thread_group, имея boost :: asio :: io_service :: run в его потоках? - PullRequest
1 голос
/ 28 августа 2011

Вообще, как я видел, это распространенный способ создания пулов потоков через "io_service + thread_group" . Это действительно отлично подходит для пулов потоков постоянного размера. Или бассейны, которые могут только стать больше. Но мне интересно, как сделать такой пул меньшим без остановки всех io_service?

Итак, у нас как показано

// class variables
asio::io_service io_service;
boost::thread_group threads;
asio::io_service::work *work;

// some pool init function
work = new asio::io_service::work(io_service);
int cores_number = boost::thread::hardware_concurrency(); 
  for (std::size_t i = 0; i < cores_number; ++i)
    threads.create_thread(boost::bind(&asio::io_service::run, &io_service));

// and now we can simply post tasks
io_service.post(boost::bind(&class_name::an_expensive_calculation, this, 42));
io_service.post(boost::bind(&class_name::a_long_running_task, this, 123));

// and it is really eazy to make pool biger - just call (mutexes may be required)
threads.create_thread(boost::bind(&asio::io_service::run, &io_service));

Но что, если мы хотим удалить потоки из нашего пула потоков? мы не можем просто вызвать threads.remove_thread(thread* thrd);, потому что он не перестанет работать в нем &asio::io_service::run (ИМХО), поэтому я задаюсь вопросом - возможно ли и как действительно удалить потоки из такого пула? (не просто прерывая их, но ожидая, пока текущая задача потока не выйдет из области видимости)?

Обновление:

Вот простой скомпилированный код: пул потоков с требуемым сроком службы потоков.

#include <stdio.h>
#include <iostream>
#include <fstream>

//Boost
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/thread/locks.hpp>

boost::asio::io_service io_service;
boost::asio::io_service::work *work;
boost::thread_group threads;
boost::mutex threads_creation;
int time_limit;

int calculate_the_answer_to_life_the_universe_and_everything(int i)
{
    boost::this_thread::sleep(boost::posix_time::milliseconds(i));
    std::cout << i << std::endl;
    return i;
}

void run(boost::shared_ptr<boost::thread> thread_ptr)
{
    try
    {
        io_service.run();
    }
    catch(std::exception &e)
    {
        std::cout << "exeption: " << e.what() << std::endl;
        boost::mutex::scoped_lock lock(threads_creation);
        threads.remove_thread(thread_ptr.get());
        lock.unlock();
        std::cout << "thread removed from group" << std::endl;
        return;
    }

}

void pool_item( int i)
{
    boost::packaged_task<int> pt(boost::bind(calculate_the_answer_to_life_the_universe_and_everything, i));
    boost::unique_future<int> fi=pt.get_future();

    boost::thread *task = new boost::thread(std::move(pt)); // launch task on a thread

    if(fi.timed_wait(boost::posix_time::milliseconds(time_limit)))
    {
        std::cout << "sucsess function returned: " << fi.get() << std::endl;
    }
    else
    {
        std::cout << "request took way 2 long!" << std::endl;

        std::cout << "current group size:" << threads.size() << std::endl;

        boost::shared_ptr<boost::thread> thread;
        boost::packaged_task<void> pt(boost::bind(run, thread));
        thread = boost::shared_ptr<boost::thread>( new boost::thread(std::move(pt)));

        boost::mutex::scoped_lock lock(threads_creation);
        threads.add_thread(thread.get());
        lock.unlock();

        task->join();

        throw std::runtime_error("killed joined thread");

    }
}

int main()
{
    time_limit = 500;

    work = new boost::asio::io_service::work(io_service);
    int cores_number = boost::thread::hardware_concurrency(); 
    for (std::size_t i = 0; i < cores_number; ++i)
    {

        boost::shared_ptr<boost::thread> thread;
        boost::packaged_task<void> pt(boost::bind(run, thread));
        thread = boost::shared_ptr<boost::thread>( new boost::thread(std::move(pt)));
        threads.add_thread(thread.get());
    }

    int i = 800;
    io_service.post(boost::bind(pool_item, i));

    boost::this_thread::sleep(boost::posix_time::milliseconds(i*2));
    std::cout << "thread should be removed by now." << std::endl
        << "group size:" << threads.size() << std::endl;

    std::cin.get();
    return 0;
}

Как видите, потоки не удаляются из пула потоков даже после вызова .remove_thread(ptr);. = (Почему?

Обновление № 2:

Ну, в любом случае, у меня получилась группа с костюмированными нитками ...

#include <stdio.h>
#include <iostream>
#include <fstream>
#include <set>

//Boost
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/thread/locks.hpp>

//cf service interface
//#include <service.hpp>

//cf-server
//#include <server.h>

#include <boost/foreach.hpp>

class thread_group
{
public:
    void add( boost::shared_ptr<boost::thread> to_add)
    {
        boost::mutex::scoped_lock lock(m);
        ds_.insert(to_add);
    }
    void remove( boost::shared_ptr<boost::thread> to_remove)
    {
        boost::mutex::scoped_lock lock(m);
        ds_.erase(to_remove);
    }

    int size()
    {
        boost::mutex::scoped_lock lock(m);
        return ds_.size();
    }

    void join_all(boost::posix_time::milliseconds interuption_time=boost::posix_time::milliseconds(1000))
    {
        boost::mutex::scoped_lock lock(m);
        BOOST_FOREACH(boost::shared_ptr<boost::thread> t, ds_)
        {
            boost::thread interrupter(boost::bind(&thread_group::interupt_thread, this, t, interuption_time));
        }
    }

private:
    std::set< boost::shared_ptr<boost::thread> > ds_;
    boost::mutex m;
    void interupt_thread(boost::shared_ptr<boost::thread> t, boost::posix_time::milliseconds interuption_time)
    {
        try
        {
            if(!t->timed_join(interuption_time))
                t->interrupt();

        }
        catch(std::exception &e)
        {
        }
    }
};

boost::asio::io_service io_service;
boost::asio::io_service::work *work;
thread_group threads;
int time_limit;



int calculate_the_answer_to_life_the_universe_and_everything(int i)
{
    boost::this_thread::sleep(boost::posix_time::milliseconds(i));
    std::cout << i << std::endl;
    return i;
}

void run(boost::shared_ptr<boost::thread> thread_ptr)
{
    try
    {
        io_service.run();
    }
    catch(std::exception &e)
    {
        std::cout << "exeption: " << e.what() << std::endl;
        threads.remove(thread_ptr);
        std::cout << "thread removed from group" << std::endl;
        return;
    }

}

void pool_item( int i)
{
    boost::packaged_task<int> pt(boost::bind(calculate_the_answer_to_life_the_universe_and_everything, i));
    boost::unique_future<int> fi=pt.get_future();

    boost::thread *task = new boost::thread(std::move(pt)); // launch task on a thread

    if(fi.timed_wait(boost::posix_time::milliseconds(time_limit)))
    {
        std::cout << "sucsess function returned: " << fi.get() << std::endl;
    }
    else
    {
        std::cout << "request took way 2 long!" << std::endl;

        std::cout << "current group size:" << threads.size() << std::endl;
        std::cout << "we want to add thread!" << std::endl;
        boost::shared_ptr<boost::thread> thread;
        boost::packaged_task<void> pt(boost::bind(run, thread));
        threads.add(thread);
        std::cout << "thread added" << std::endl
            << "current group size:" << threads.size() << std::endl;
        task->join();

        throw std::runtime_error("killed joined thread");

    }
}

int main()
{
    time_limit = 500;

    work = new boost::asio::io_service::work(io_service);
    int cores_number = boost::thread::hardware_concurrency(); 
    for (std::size_t i = 0; i < cores_number; ++i)
    {

        boost::shared_ptr<boost::thread> thread;
        boost::packaged_task<void> pt(boost::bind(run, thread));
        thread = boost::shared_ptr<boost::thread>( new boost::thread(std::move(pt)));
        threads.add(thread);
    }

    int i = 800;
    io_service.post(boost::bind(pool_item, i));

    boost::this_thread::sleep(boost::posix_time::milliseconds(i*2));
    std::cout << "thread should be removed by now." << std::endl
        << "group size:" << threads.size() << std::endl;

    std::cin.get();
    return 0;
}

Ответы [ 2 ]

3 голосов
/ 28 августа 2011

Я смог достичь этого в прошлом, используя тот факт, что run() завершится, если обратный вызов вызовет исключение.Вместо запуска run() непосредственно в потоке, я вызываю служебную функцию, которая выходит из потока, если выбрасывается соответствующее исключение:

void RunIOService()
{
   try
   {
       io_service.run();
   }
   catch(std::exception ex)
   {
   }
}

Затем все, что вам нужно сделать, - это запланировать обратный вызов, который будет выбрасыватьисключение:

static void KillThreadCallback() 
{
    // throw some exception that you catch above
}

io_service.post(&KillThreadCallback);

Это приведет к тому, что поток, который выполняет этот обратный вызов, завершит свою работу, существенно уменьшив размер счетчика пула потоков на 1. Используя это, вы можете довольно легко расширить и свернуть пул потоков io_service.

1 голос
/ 29 августа 2011

Один шаблон, который можно использовать для чистого отключения службы ввода / вывода, (с использованием лямбда-выражения C ++ 0x):

void ThreadLoop()
{
    while(m_keepRunning) {
        try {
            io_service.run_one();
        } catch(const std::exception& e) {
            // error handling
        }
    }
}

void Stop()
{
   // Using C++0x lambdas
   io_service.post([=]{ m_keepRunning = false; });
   // or
   io_service.post(boost::bind(&ThisClass::StopCallback, this));
}

void StopCallback()
{
   m_keepRunning = false;
}

Где m_keepRunning - переменная-член.Следует трогать только в служебном потоке ввода / вывода.

...