C ++ boost / asio сервер с клиентом - PullRequest
2 голосов
/ 15 февраля 2012

Работая над асинхронным сетевым программированием для p2p-приложения, у меня возникли проблемы.Мое приложение должно быть и сервером, и клиентом.Когда сервер получает запрос, он должен передать его k другим серверам.Я думал, что Пример HTTP Server 3 примеров boost :: asio мог бы хорошо работать с реализацией асинхронного клиента (как класса).Класс клиента, упомянутый выше (из примеров клиентов boost :: asio), выглядит следующим образом:

  ClientIO::ClientIO(boost::asio::io_service& io_service, tcp::resolver::iterator endpoint_iterator)
:   _io_service(io_service),
      strand_(io_service),
      resolver_(io_service),
  socket_(io_service)
  {
  tcp::endpoint endpoint = *endpoint_iterator;
      socket_.async_connect(endpoint,
      boost::bind(&ClientIO::handle_after_connect, this,
      boost::asio::placeholders::error, ++endpoint_iterator));
  }

  void ClientIO::write(G3P mex)
  {
  _io_service.post(boost::bind(&ClientIO::writeMessage, this, mex));
  }

  void ClientIO::writeMessage(G3P mex)
  {
  bool write_in_progress = !messages_queue_.empty();
  messages_queue_.push_back(mex);
  if (!write_in_progress)
  {
    char* message=NULL;
    boost::system::error_code ec;
    if (messages_queue_.front().opcode == DATA)
    {
      message=(char*)malloc((10800)*sizeof(char));
    }
    else
      message=(char*)malloc(1024*sizeof(char));

    boost::asio::streambuf request;
    std::ostream request_stream(&request);
    serializeMessage(message, messages_queue_.front());
    request_stream   << message;
    boost::asio::async_write(socket_, boost::asio::buffer(message, strlen(message)),
    strand_.wrap(
    boost::bind(&ClientIO::handle_after_write, this,
    boost::asio::placeholders::error)));
      free(message);
  }
  }

  void ClientIO::readMessage()
  {
boost::asio::async_read(socket_, data_,
    boost::bind(&ClientIO::handle_after_read, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred
    ));
  }

  void ClientIO::stop()
  {
  socket_.shutdown(tcp::socket::shutdown_both);
  socket_.close();
  }

  void ClientIO::handle_after_connect(const boost::system::error_code& error,
    tcp::resolver::iterator endpoint_iterator)
  {
  if (error)
  {
    if (endpoint_iterator != tcp::resolver::iterator())
    {
      socket_.close();
      tcp::endpoint endpoint = *endpoint_iterator;
      socket_.async_connect(endpoint,
      boost::bind(&ClientIO::handle_after_connect,this,
      boost::asio::placeholders::error, ++endpoint_iterator));
    }
  }
  else
  {
  }
  }

  void ClientIO::handle_after_read(const boost::system::error_code& error, std::size_t bytes_transferred)
  {
  if (bytes_transferred > 0)
  {
    std::istream response_stream(&data_);
    std::string mex="";
    std::getline(response_stream, mex);
    deserializeMessage(&reply_,mex);
    if (reply_.opcode == REPL)
    {
      cout << "ack received" << endl;
    }
  }
  if (error)
  {
    ERROR_MSG(error.message());
  }
  }

  void ClientIO::handle_after_write(const boost::system::error_code& error)
  {
  if (error)
  {
  //            ERROR_MSG("Error in write: " << error.message());
  }
  else
  {
    messages_queue_.pop_front();
    if (!messages_queue_.empty())
    {
      cout << "[w] handle after write" << endl;
      char* message;
      if (messages_queue_.front().opcode == DATA)
      {
    message=(char*)malloc((10800)*sizeof(char));
      }
      else
    message=(char*)malloc(1024*sizeof(char));
      boost::asio::streambuf request;
      std::ostream request_stream(&request);
      serializeMessage(message, messages_queue_.front());
      request_stream << message;
      boost::asio::async_write(socket_, boost::asio::buffer(message, strlen(message)),
      strand_.wrap(
      boost::bind(&ClientIO::handle_after_write, this,
      boost::asio::placeholders::error)));
    }

    boost::asio::async_read_until(socket_, data_,"\r\n",
            strand_.wrap(
            boost::bind(&ClientIO::handle_after_read, this,
        boost::asio::placeholders::error,
        boost::asio::placeholders::bytes_transferred)));
  }
  }

  ClientIO::~ClientIO()
  {
  cout << "service stopped" << endl;
  }

}

Когда сервер получает новый запрос, он запускает новое управление даннымисоединение формы класса и после некоторого вычисления, написать очередь на другие серверы (здесь только один), используя класс выше, и при каждой записи должен соответствовать ack

client  --write-> server ---write->\ 
                                    |--server1
                 server <--ACK----</ 

. Для этого я создалэкземпляр io_service (io_service_test) в качестве переменной класса, создающий его экземпляр с помощью следующего в конструкторе DataManagement:

DataManagement::DataManagement(){
  tcp::resolver resolver(io_service_test);
  tcp::resolver::query query(remotehost, remoteport);
  tcp::resolver::iterator iterator = resolver.resolve(query);
  cluster = new cluster_head::ClusterIO(io_service_test,iterator);
  io_service_test.run_one();
}

Затем, после вычисления, отправьте данные:

 void DataManagement::sendTuple( . . . ){

  . . . 
  io_service_test.reset();
  io_service_test.run();
  for (size_t i=0; i<ready_queue.size() ;i++)
  {
      cluster->write(fragTuple);
  }
}

аналог это тот же пример http proxy3, измененный таким же образом (без класса клиента).Проблема в том, что иногда все работает хорошо, иногда происходит сбой, и я получаю трассировку стека, иногда он никогда не останавливается, или даже ошибки сегментации.Я думаю, что проблема закрыта для управления io_service и жизни методов класса, но я не могу понять.

  • Есть идеи?
  • есть ли у вас примеры, подходящие для этого случая, или фиктивный класс, который его реализует?

1 Ответ

1 голос
/ 23 февраля 2012

Кратко проанализировав код, я вижу следующие проблемы.

  1. Метод ClientIO::writeMessage отправляет неверную информацию получателям, поскольку он
    • выделяет память для сообщения
    • вызывает boost::asio::async_write, который не отправляет никаких данных, а только помещает запрос во внутреннюю очередь запросов ASIO, т.е. сообщение будет отправлено через некоторое время.boost::asio::buffer не копирует сообщение .Он хранит только ссылку на него.
    • звонки free(message).т. е. память, выделенная для сообщения , может быть перезаписана при выполнении запроса на запись в очереди.
  2. Утечка памяти в ClientIO::handle_after_write. сообщение выделено, но не освобождено.
  3. Метод boost::asio::async_read ClientIO::readMessage не переносится вызовом strand_.wrap.

Во избежаниепроблемы # 1 и # 2 необходимы для использования чего-то вроде класса shared_const_buffer в примере буферов ASIO.Для решения проблемы № 3 необходимо использовать вызов strand_.wrap так же, как в вызовах boost::asio::async_write.

...