Объекты Data_t
отправляются через сокет TCP на сервер.Сервер создает объект ConnectionHandler
для обработки каждого входящего соединения.
В ConnectionHandler
, Data_t
объекты считываются один за другим, используя async_read
из сокета, а их поля num_
суммируются, а затем сохраняются в поле ConnectionHandler::total_sum_
.
Нужно ли блокировать ConnectionHandler::total_sum_
, поскольку в него будут записываться несколько потоков?
См. Код ниже.Обратите внимание, что
ConnectinHandler::received_data_
повторно используется в качестве буфера для хранения Data_t
объектов, считываемых из сокета.Безопасно ли это делать?
ConnectinHandler::process_data()
сначала обрабатывает объект Data_t
, а затем вызывает ConnectinHandler::read_pack()
для чтения из сокета снова.
struct Data_t
{
int num_;
//... some other data
}
template<typename ConnectionHandler>
class Server {
using shared_handler_t = std::shared_ptr<ConnectionHandler>;
public:
Server(int thread_count = 10) :
thread_count_(thread_count), acceptor_(io_service_)
void
start_server(uint16_t port) {
auto handler = std::make_shared<ConnectionHandler>(io_service_, configer_, thread_names_);
// set up the acceptor to listen on the tcp port
boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::tcp::v4(), port);
acceptor_.open(endpoint.protocol());
boost::system::error_code ec;
acceptor_.bind(endpoint);
acceptor_.listen();
acceptor_.async_accept(handler->socket(),
[=](boost::system::error_code const &ec) {
handle_new_connection(handler, ec);
});
// start pool of threads to process the asio events
for (int i = 0; i < thread_count_; ++i) {
thread_pool_.emplace_back([=] { io_service_.run(); });
}
// Wait for all threads in the pool to exit.
for (std::size_t i = 0; i < thread_pool_.size(); ++i) {
thread_pool_[i].join();
}
}
private:
void
handle_new_connection(shared_handler_t handler,
boost::system::error_code const &error) {
if (error) {
return;
}
handler->start();
auto new_handler = std::make_shared<ConnectionHandler>(io_service_);
acceptor_.async_accept(new_handler->socket(),
[=](boost::system::error_code const &ec) {
handle_new_connection(new_handler, ec);
});
}
int thread_count_;
std::vector<std::thread> thread_pool_;
boost::asio::io_service io_service_;
boost::asio::ip::tcp::acceptor acceptor_;
};
class ConnectionHandler : public std::enable_shared_from_this<ConnectionHandler> {
public:
ConnectionHandler (boost::asio::io_service &service) :
service_ (service), socket_ (service)
{
}
void
start ()
{
read_packet ();
}
private:
void
read_packet ()
{
auto me = shared_from_this ();
boost::asio::async_read (
socket_, boost::asio::buffer (&received_data_, sizeof (Data_t)),
boost::asio::transfer_exactly (sizeof (Data_t)),
[me] (boost::system::error_code const &ec, std::size_t bytes_xfer)
{
me->process_data (ec, bytes_xfer);
});
}
void
process_data (boost::system::error_code const &error,
std::size_t bytes_transferred)
{
if (error)
{
socket_.close ();
return;
}
total_sum_+=received_data_.num_;
read_packet ();
}
boost::asio::io_service &service_;
boost::asio::ip::tcp::socket socket_;
Data_t received_data_;
int total_sum_;
};