Нужно заблокировать внутреннее состояние обработчика сокета boost asio tcp для многопоточного доступа? - PullRequest
0 голосов
/ 29 мая 2018

Объекты Data_t отправляются через сокет TCP на сервер.Сервер создает объект ConnectionHandler для обработки каждого входящего соединения.

В ConnectionHandler, Data_t объекты считываются один за другим, используя async_read из сокета, а их поля num_ суммируются, а затем сохраняются в поле ConnectionHandler::total_sum_.

Нужно ли блокировать ConnectionHandler::total_sum_, поскольку в него будут записываться несколько потоков?

См. Код ниже.Обратите внимание, что

ConnectinHandler::received_data_ повторно используется в качестве буфера для хранения Data_t объектов, считываемых из сокета.Безопасно ли это делать?

ConnectinHandler::process_data() сначала обрабатывает объект Data_t, а затем вызывает ConnectinHandler::read_pack() для чтения из сокета снова.

struct Data_t
{
   int num_;
   //... some other data
}

template<typename ConnectionHandler>
class Server {
    using shared_handler_t = std::shared_ptr<ConnectionHandler>;
public:
    Server(int thread_count = 10) :
            thread_count_(thread_count), acceptor_(io_service_)
    void
    start_server(uint16_t port) {
        auto handler = std::make_shared<ConnectionHandler>(io_service_, configer_, thread_names_);
        // set up the acceptor to listen on the tcp port
        boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::tcp::v4(), port);
        acceptor_.open(endpoint.protocol());
        boost::system::error_code ec;
        acceptor_.bind(endpoint);
        acceptor_.listen();
        acceptor_.async_accept(handler->socket(),
                               [=](boost::system::error_code const &ec) {
                                   handle_new_connection(handler, ec);
                               }); 
        // start pool of threads to process the asio events
        for (int i = 0; i < thread_count_; ++i) {
            thread_pool_.emplace_back([=] { io_service_.run(); });
        }
        // Wait for all threads in the pool to exit.
        for (std::size_t i = 0; i < thread_pool_.size(); ++i) {
            thread_pool_[i].join();
        }
    }
private:
    void
    handle_new_connection(shared_handler_t handler,
                          boost::system::error_code const &error) {
        if (error) {
            return;
        }
        handler->start();
        auto new_handler = std::make_shared<ConnectionHandler>(io_service_);
        acceptor_.async_accept(new_handler->socket(),
                               [=](boost::system::error_code const &ec) {
                                   handle_new_connection(new_handler, ec);
                               });
    }
    int thread_count_;
    std::vector<std::thread> thread_pool_;
    boost::asio::io_service io_service_;
    boost::asio::ip::tcp::acceptor acceptor_;
};


class ConnectionHandler : public std::enable_shared_from_this<ConnectionHandler> {
 public:
  ConnectionHandler (boost::asio::io_service &service) :
      service_ (service), socket_ (service)
  {
  }
  void
  start ()
  {
    read_packet ();
  }

 private:
  void
  read_packet ()
  {
    auto me = shared_from_this ();
    boost::asio::async_read (
        socket_, boost::asio::buffer (&received_data_, sizeof (Data_t)),
        boost::asio::transfer_exactly (sizeof (Data_t)),
        [me] (boost::system::error_code const &ec, std::size_t bytes_xfer)
        {
            me->process_data (ec, bytes_xfer);
        });

  }
  void
  process_data (boost::system::error_code const &error,
                    std::size_t bytes_transferred)
  {
    if (error)
      {
        socket_.close ();
        return;
      }
    total_sum_+=received_data_.num_;
    read_packet ();
  }
  boost::asio::io_service &service_;
  boost::asio::ip::tcp::socket socket_;
  Data_t received_data_;
  int total_sum_;
};
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...