Побочные эффекты глобальных статических переменных - PullRequest
0 голосов
/ 05 августа 2010

Я пишу сервер UDP, который в настоящее время получает данные от UDP, оборачивает их в объект и помещает их в параллельную очередь. Параллельная очередь - это реализация, представленная здесь: http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html

Пул рабочих потоков извлекает данные из очереди для обработки.

Очередь определяется глобально как:

static concurrent_queue<boost::shared_ptr<Msg> > g_work_queue_;

Теперь проблема, с которой я столкнулся, заключается в том, что если я просто напишу функцию для создания данных и вставлю ее в очередь, а также создаю несколько потоков потребителей, чтобы вытянуть их, это будет работать нормально. Но как только я добавляю своего производителя на основе UDP, рабочие потоки перестают получать уведомления о поступлении данных в очередь.

Я проследил проблему до конца функции push в concurrent_queue. В частности, строка: the_condition_variable.notify_one (); Не возвращается при использовании кода моей сети.

Так что проблема связана с тем, как я написал сетевой код.

Вот как это выглядит.

enum
{
    MAX_LENGTH = 1500
};


class Msg
{
  public:
    Msg()
    {
       static int i = 0;
       i_ = i++;
       printf("Construct ObbsMsg: %d\n", i_);
    }

    ~Msg()
    {
       printf("Destruct ObbsMsg: %d\n", i_);
    }

    const char* toString() { return data_; }

  private:
    friend class server;

    udp::endpoint sender_endpoint_;
    char data_[MAX_LENGTH];
    int i_;
};

class server
{
public:
  server::server(boost::asio::io_service& io_service)
    : io_service_(io_service),
      socket_(io_service, udp::endpoint(udp::v4(), PORT))
  {
    waitForNextMessage();
  }  

  void server::waitForNextMessage()
  {
    printf("Waiting for next msg\n");

    next_msg_.reset(new Msg());

    socket_.async_receive_from(
        boost::asio::buffer(next_msg_->data_, MAX_LENGTH), sender_endpoint_,
        boost::bind(&server::handleReceiveFrom, this,
                    boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred));
  }

  void server::handleReceiveFrom(const boost::system::error_code& error, size_t bytes_recvd)
  {
    if (!error && bytes_recvd > 0) {
        printf("got data: %s. Adding to work queue\n", next_msg_->toString());
        g_work_queue.push(next_msg_); // Add received msg to work queue
        waitForNextMessage();
    } else {
        waitForNextMessage();
    } 
  }

private:
  boost::asio::io_service& io_service_;
  udp::socket socket_;

  udp::endpoint sender_endpoint_;
  boost::shared_ptr<Msg> next_msg_;
}

int main(int argc, char* argv[])
{
    try{
      boost::asio::io_service io_service;
      server s(io_service);
      io_service.run();
    catch(std::exception& e){
      std::err << "Exception: " << e.what() << std::endl;
    }
    return 0;
}

Теперь я обнаружил, что если handle_receive_from может вернуться, то notify_one () в concurrent_queue возвращает. Так что я думаю, это потому, что у меня есть рекурсивный цикл. Итак, как правильно начать прослушивание новых данных? и является ли асинхронный пример сервера udp некорректным, поскольку я основал его на том, что они уже делали.

РЕДАКТИРОВАТЬ: Хорошо, проблема стала еще более странной.

То, что я не упомянул здесь, - это то, что у меня есть класс под названием процессор Процессор выглядит так:

class processor
{
public:
   processor::processor(int thread_pool_size) :
      thread_pool_size_(thread_pool_size) { }

  void start()
  {
    boost::thread_group threads;
    for (std::size_t i = 0; i < thread_pool_size_; ++i){
        threads.create_thread(boost::bind(&ObbsServer::worker, this));
    }
  }

  void worker()
  {
    while (true){
        boost::shared_ptr<ObbsMsg> msg;
        g_work_queue.wait_and_pop(msg);
        printf("Got msg: %s\n", msg->toString());
    }
  }

private:
  int thread_pool_size_;
};

Теперь кажется, что если я извлекаю рабочую функцию самостоятельно и запускаю потоки от основного. оно работает! Может кто-нибудь объяснить, почему поток функционирует так, как я ожидал бы вне класса, но внутри у него есть побочные эффекты?

РЕДАКТИРОВАТЬ 2: Теперь он становится еще страннее

Я вытащил две функции (точно так же).

Один называется потребителем, другой работник.

т.е.

void worker()
{
    while (true){
        boost::shared_ptr<ObbsMsg> msg;
        printf("waiting for msg\n");
        g_work_queue.wait_and_pop(msg);
        printf("Got msg: %s\n", msg->toString());
    }
}

void consumer()
{
    while (true){
        boost::shared_ptr<ObbsMsg> msg;
        printf("waiting for msg\n");
        g_work_queue.wait_and_pop(msg);
        printf("Got msg: %s\n", msg->toString());
    }
}

Теперь потребитель находится вверху файла server.cpp. То есть где и код нашего сервера.

С другой стороны, работник живет в файле processor.cpp.

Сейчас я сейчас вообще не использую процессор. Основная функция теперь выглядит так:

void consumer();
void worker();

int main(int argc, char* argv[])
{
    try {
        boost::asio::io_service io_service;
        server net(io_service);
        //processor s(7);

        boost::thread_group threads;
        for (std::size_t i = 0; i < 7; ++i){
            threads.create_thread(worker); // this doesn't work
            // threads.create_thread(consumer); // THIS WORKS!?!?!?
        }

//        s.start();

        printf("Server Started...\n");
        boost::asio::io_service::work work(io_service);
        io_service.run();

        printf("exiting...\n");
    } catch (std::exception& e) {
        std::cerr << "Exception: " << e.what() << "\n";
    }

    return 0;
}

Почему потребитель может получать поставленные в очередь предметы, а работник - нет. Это идентичные реализации с разными именами.

Это не имеет никакого смысла. Есть идеи?

Вот пример вывода при получении текста «Hello World»:

Выход 1: не работает. При вызове рабочей функции или использовании класса процессора.

Construct ObbsMsg: 0
waiting for msg
waiting for msg
waiting for msg
waiting for msg
waiting for msg
waiting for msg
Server Started...
waiting for msg
got data: hello world. Adding to work queue
Construct ObbsMsg: 1

Выход 2: работает при вызове функции-потребителя, которая идентична рабочей функции.

Construct ObbsMsg: 0
waiting for msg
waiting for msg
waiting for msg
waiting for msg
waiting for msg
waiting for msg
Server Started...
waiting for msg
got data: hello world. Adding to work queue
Construct ObbsMsg: 1
Got msg: hello world <----- this is what I've been wanting to see!
Destruct ObbsMsg: 0
waiting for msg

1 Ответ

1 голос
/ 05 августа 2010

Чтобы ответить на мой собственный вопрос.

Кажется, проблема в объявлении g_work_queue;

Объявлен в заголовочном файле как: static concurrent_queue g_work_queue;

Кажется, что объявлять его статичным - это не то, что я хочу делать. Очевидно, это создает отдельный объект очереди для каждого скомпилированного файла .o и, очевидно, отдельные замки и т. д.

Это объясняет, почему когда манипулировали очередью внутри одного и того же исходного файла с потребителем и производителем в одном файле это работало. Но когда это происходило в разных файлах, этого не происходило, потому что потоки ожидали разных объектов.

Итак, я снова объявил рабочую очередь следующим образом.

-- workqueue.h --
extern concurrent_queue< boost::shared_ptr<Msg> > g_work_queue;

-- workqueue.cpp --
#include "workqueue.h"
concurrent_queue< boost::shared_ptr<Msg> > g_work_queue;

Это устраняет проблему.

...