Я пишу сервер UDP, который в настоящее время получает данные от UDP, оборачивает их в объект и помещает их в параллельную очередь. Параллельная очередь - это реализация, представленная здесь: http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html
Пул рабочих потоков извлекает данные из очереди для обработки.
Очередь определяется глобально как:
static concurrent_queue<boost::shared_ptr<Msg> > g_work_queue_;
Теперь проблема, с которой я столкнулся, заключается в том, что если я просто напишу функцию для создания данных и вставлю ее в очередь, а также создаю несколько потоков потребителей, чтобы вытянуть их, это будет работать нормально.
Но как только я добавляю своего производителя на основе UDP, рабочие потоки перестают получать уведомления о поступлении данных в очередь.
Я проследил проблему до конца функции push в concurrent_queue.
В частности, строка: the_condition_variable.notify_one ();
Не возвращается при использовании кода моей сети.
Так что проблема связана с тем, как я написал сетевой код.
Вот как это выглядит.
enum
{
MAX_LENGTH = 1500
};
class Msg
{
public:
Msg()
{
static int i = 0;
i_ = i++;
printf("Construct ObbsMsg: %d\n", i_);
}
~Msg()
{
printf("Destruct ObbsMsg: %d\n", i_);
}
const char* toString() { return data_; }
private:
friend class server;
udp::endpoint sender_endpoint_;
char data_[MAX_LENGTH];
int i_;
};
class server
{
public:
server::server(boost::asio::io_service& io_service)
: io_service_(io_service),
socket_(io_service, udp::endpoint(udp::v4(), PORT))
{
waitForNextMessage();
}
void server::waitForNextMessage()
{
printf("Waiting for next msg\n");
next_msg_.reset(new Msg());
socket_.async_receive_from(
boost::asio::buffer(next_msg_->data_, MAX_LENGTH), sender_endpoint_,
boost::bind(&server::handleReceiveFrom, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
void server::handleReceiveFrom(const boost::system::error_code& error, size_t bytes_recvd)
{
if (!error && bytes_recvd > 0) {
printf("got data: %s. Adding to work queue\n", next_msg_->toString());
g_work_queue.push(next_msg_); // Add received msg to work queue
waitForNextMessage();
} else {
waitForNextMessage();
}
}
private:
boost::asio::io_service& io_service_;
udp::socket socket_;
udp::endpoint sender_endpoint_;
boost::shared_ptr<Msg> next_msg_;
}
int main(int argc, char* argv[])
{
try{
boost::asio::io_service io_service;
server s(io_service);
io_service.run();
catch(std::exception& e){
std::err << "Exception: " << e.what() << std::endl;
}
return 0;
}
Теперь я обнаружил, что если handle_receive_from может вернуться, то notify_one () в concurrent_queue возвращает. Так что я думаю, это потому, что у меня есть рекурсивный цикл.
Итак, как правильно начать прослушивание новых данных? и является ли асинхронный пример сервера udp некорректным, поскольку я основал его на том, что они уже делали.
РЕДАКТИРОВАТЬ: Хорошо, проблема стала еще более странной.
То, что я не упомянул здесь, - это то, что у меня есть класс под названием процессор
Процессор выглядит так:
class processor
{
public:
processor::processor(int thread_pool_size) :
thread_pool_size_(thread_pool_size) { }
void start()
{
boost::thread_group threads;
for (std::size_t i = 0; i < thread_pool_size_; ++i){
threads.create_thread(boost::bind(&ObbsServer::worker, this));
}
}
void worker()
{
while (true){
boost::shared_ptr<ObbsMsg> msg;
g_work_queue.wait_and_pop(msg);
printf("Got msg: %s\n", msg->toString());
}
}
private:
int thread_pool_size_;
};
Теперь кажется, что если я извлекаю рабочую функцию самостоятельно и запускаю потоки
от основного. оно работает! Может кто-нибудь объяснить, почему поток функционирует так, как я ожидал бы вне класса, но внутри у него есть побочные эффекты?
РЕДАКТИРОВАТЬ 2: Теперь он становится еще страннее
Я вытащил две функции (точно так же).
Один называется потребителем, другой работник.
т.е.
void worker()
{
while (true){
boost::shared_ptr<ObbsMsg> msg;
printf("waiting for msg\n");
g_work_queue.wait_and_pop(msg);
printf("Got msg: %s\n", msg->toString());
}
}
void consumer()
{
while (true){
boost::shared_ptr<ObbsMsg> msg;
printf("waiting for msg\n");
g_work_queue.wait_and_pop(msg);
printf("Got msg: %s\n", msg->toString());
}
}
Теперь потребитель находится вверху файла server.cpp. То есть где и код нашего сервера.
С другой стороны, работник живет в файле processor.cpp.
Сейчас я сейчас вообще не использую процессор. Основная функция теперь выглядит так:
void consumer();
void worker();
int main(int argc, char* argv[])
{
try {
boost::asio::io_service io_service;
server net(io_service);
//processor s(7);
boost::thread_group threads;
for (std::size_t i = 0; i < 7; ++i){
threads.create_thread(worker); // this doesn't work
// threads.create_thread(consumer); // THIS WORKS!?!?!?
}
// s.start();
printf("Server Started...\n");
boost::asio::io_service::work work(io_service);
io_service.run();
printf("exiting...\n");
} catch (std::exception& e) {
std::cerr << "Exception: " << e.what() << "\n";
}
return 0;
}
Почему потребитель может получать поставленные в очередь предметы, а работник - нет.
Это идентичные реализации с разными именами.
Это не имеет никакого смысла. Есть идеи?
Вот пример вывода при получении текста «Hello World»:
Выход 1: не работает. При вызове рабочей функции или использовании класса процессора.
Construct ObbsMsg: 0
waiting for msg
waiting for msg
waiting for msg
waiting for msg
waiting for msg
waiting for msg
Server Started...
waiting for msg
got data: hello world. Adding to work queue
Construct ObbsMsg: 1
Выход 2: работает при вызове функции-потребителя, которая идентична рабочей функции.
Construct ObbsMsg: 0
waiting for msg
waiting for msg
waiting for msg
waiting for msg
waiting for msg
waiting for msg
Server Started...
waiting for msg
got data: hello world. Adding to work queue
Construct ObbsMsg: 1
Got msg: hello world <----- this is what I've been wanting to see!
Destruct ObbsMsg: 0
waiting for msg