boost asio нужно публиковать n рабочих мест только после того, как m рабочих мест закончено - PullRequest
1 голос
/ 17 октября 2011

Я ищу способ дождаться завершения ряда заданий, а затем выполнить еще одно совершенно другое число заданий.С нитками, конечно.Краткое объяснение: я создал два рабочих потока, оба из которых выполняются на io_service.Код ниже взят из здесь .

Ради простоты я создал два типа заданий: CalculateFib i CalculateFib2 .Я хочу, чтобы задания CalculateFib2 до начинались после и только после завершения CalculateFib .Я пытался использовать условную переменную, как объяснено здесь , но программа зависает, если заданий CalculateFib2 больше одного.Что я делаю не так?

спасибо, додол

#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <iostream>

boost::mutex global_stream_lock;
boost::mutex mx;
boost::condition_variable cv;

void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service)
{
    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] Thread Start" << std::endl;
    global_stream_lock.unlock();

    io_service->run();

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] Thread Finish" << std::endl;
    global_stream_lock.unlock();
}

size_t fib( size_t n )
{
    if ( n <= 1 )
    {
        return n;
    }
    boost::this_thread::sleep( 
        boost::posix_time::milliseconds( 1000 )
        );
    return fib( n - 1 ) + fib( n - 2);
}

void CalculateFib( size_t n )
{
    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] Now calculating fib( " << n << " ) " << std::endl;
    global_stream_lock.unlock();

    size_t f = fib( n );

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] fib( " << n << " ) = " << f << std::endl;
    global_stream_lock.unlock();

    boost::lock_guard<boost::mutex> lk(mx);
    cv.notify_all();
}

void CalculateFib2( size_t n )
{
    boost::unique_lock<boost::mutex> lk(mx);
    cv.wait(lk);

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] Now calculating fib2( " << n << " ) " << std::endl;
    global_stream_lock.unlock();

    size_t f = fib( n );

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] fib2( " << n << " ) = " << f << std::endl;
    global_stream_lock.unlock();
}
int main( int argc, char * argv[] )
{
    boost::shared_ptr< boost::asio::io_service > io_service(
        new boost::asio::io_service
        );
    boost::shared_ptr< boost::asio::io_service::work > work(
        new boost::asio::io_service::work( *io_service )
        );

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] The program will exit when all work has finished."
        << std::endl;
    global_stream_lock.unlock();

    boost::thread_group worker_threads;
    for( int x = 0; x < 2; ++x )
    {
        worker_threads.create_thread( 
            boost::bind( &WorkerThread, io_service )
            );
    }
    io_service->post( boost::bind( CalculateFib, 5 ) );
    io_service->post( boost::bind( CalculateFib, 4 ) );
    io_service->post( boost::bind( CalculateFib, 3 ) );

    io_service->post( boost::bind( CalculateFib2, 1 ) );
    io_service->post( boost::bind( CalculateFib2, 1 ) );
    work.reset();
    worker_threads.join_all();

    return 0;
}

1 Ответ

2 голосов
/ 17 октября 2011

Внутри CalculateFib2 первое, что вы делаете - ждете условия (cv). Это условие только получает сигнал в конце CalculateFib. Таким образом, само собой разумеется, что выполнение никогда не продолжается, если только условие не запущено (путем публикации CalculateFib) задания.

Действительно, добавив любую другую строку, например:

io_service->post( boost::bind( CalculateFib, 5 ) );
io_service->post( boost::bind( CalculateFib, 4 ) );
io_service->post( boost::bind( CalculateFib, 3 ) );

io_service->post( boost::bind( CalculateFib2, 1 ) );
io_service->post( boost::bind( CalculateFib2, 1 ) );

io_service->post( boost::bind( CalculateFib, 5 ) );   // <-- ADDED

делает выполнение выполненным до конца.

В попытке пролить больше света: если вы изолируете партию Fib2 (вовремя), как

io_service->post( boost::bind( CalculateFib, 5 ) );
io_service->post( boost::bind( CalculateFib, 4 ) );
io_service->post( boost::bind( CalculateFib, 3 ) );

boost::this_thread::sleep(boost::posix_time::seconds( 10 ));
io_service->post( boost::bind( CalculateFib2, 1 ) );
io_service->post( boost::bind( CalculateFib2, 1 ) );
io_service->post( boost::bind( CalculateFib2, 1 ) );
io_service->post( boost::bind( CalculateFib2, 1 ) );
io_service->post( boost::bind( CalculateFib2, 1 ) );
io_service->post( boost::bind( CalculateFib2, 1 ) );

все задания Fib2 всегда будут блокироваться, независимо от количества потоков, потому что все задания Fib были завершены до их публикации. Простой

io_service->post( boost::bind( CalculateFib, 1 ) );

разблокирует все официанты (т. Е. Только столько, сколько есть ожидающих потоков, то есть число доступных потоков -1, потому что задания Fib () также занимают поток. Теперь с <7 потоками это может зайти в тупик, потому что нет потока, доступного даже для запуска задания Fib () на </strong> (все потоки заблокированы в ожидании в Fib2)


Если честно, я не понимаю, чего вы пытаетесь достичь с точки зрения планирования. Я подозреваю, что вы должны отслеживать очереди заданий и явно публиковать задания («задачи») только тогда, когда вы достигли необходимого количества элементов. Таким образом, вы сможете ПОЦЕЛУЙ и получите очень гибкий интерфейс для планирования работы.

В общем случае с группой потоков (пулом) вы хотите избежать блокировки потоков на неопределенное время. Это может привести к блокировке вашего рабочего графика и плохой работе в противном случае.

...