Я хотел бы сделать TCP-клиент c ++, использующий boost :: asio, на основе этой статьи:
https://www.gamedev.net/blogs/entry/2249317-a-guide-to-getting-started-with-boostasio/?pg=5
У меня есть существующий сервер, поведение которого похоже на конфигурацию маршрутизатора. Вы можете подключиться к нему, и как только у вас появится командная строка, вы можете отправить команду на сервер. Он выполняет его и отправляет обратно результат и командную строку. Затем вы можете отправить следующую команду ...
Нет проблем с этой частью. Мой код может устанавливать соединение, отправлять и получать данные, пока мой клиент не получит EOF. Это означает, что сервер разорвал соединение. Я бы хотел, чтобы мой клиент мог восстановить соединение, и эта часть не работает. Очень буду признателен за вашу помощь.
Вот мой заголовок:
#pragma once
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/asio/io_context_strand.hpp>
class Hive;
class Connection;
class Connection : public boost::enable_shared_from_this< Connection >
{
friend class Hive;
private:
static const int m_reconnect_interval = 15000;
boost::shared_ptr< Hive > m_hive;
boost::asio::ip::tcp::socket m_socket;
boost::asio::io_service::strand m_io_strand;
boost::asio::deadline_timer m_keepalive_timer;
boost::asio::deadline_timer m_reconnect_timer;
boost::posix_time::ptime m_last_time;
std::vector< uint8_t > m_recv_buffer;
std::list< int32_t > m_pending_recvs;
std::list< std::vector< uint8_t > > m_pending_sends;
int32_t m_receive_buffer_size;
int32_t m_keepalive_timer_interval;
protected:
Connection( boost::shared_ptr< Hive > hive );
virtual ~Connection();
private:
Connection( const Connection & rhs );
Connection & operator =( const Connection & rhs );
void StartSend();
void StartRecv( int32_t total_bytes );
void StartKeepAliveTimer();
void StartReconnectTimer();
void StartError(const boost::system::error_code &error, std::string caller);
void DispatchSend( std::vector< uint8_t > buffer );
void DispatchRecv( int32_t total_bytes );
void DispatchTimer( const boost::system::error_code & error );
void ReconnectTimer( const boost::system::error_code & error );
void HandleConnect( const boost::system::error_code & error );
void HandleSend( const boost::system::error_code & error, std::list< std::vector< uint8_t > >::iterator itr );
void HandleRecv( const boost::system::error_code & error, int32_t actual_bytes );
void HandleKeepAliveTimer(const boost::system::error_code &error);
void HandleReconnectTimer( const boost::system::error_code & error );
void Reconnect();
private:
// Called when the connection has successfully connected to the local host.
virtual void OnAccept( const std::string & host, uint16_t port ) = 0;
// Called when the connection has successfully connected to the remote host.
virtual void OnConnect( const std::string & host, uint16_t port ) = 0;
// Called when data has been sent by the connection.
virtual void OnSend( const std::vector< uint8_t > & buffer ) = 0;
// Called when data has been received by the connection.
virtual void OnRecv( std::vector< uint8_t > & buffer ) = 0;
// Called on each timer event.
virtual void OnTimer( const boost::posix_time::time_duration & delta ) = 0;
// Called when an error is encountered.
virtual void OnError(const std::string error) = 0;
public:
// Starts an a/synchronous connect.
void Connect(const std::string &host, const std::string &port);
// Posts data to be sent to the connection.
void Send(std::string message);
// Posts a recv for the connection to process. If total_bytes is 0, then
// as many bytes as possible up to GetReceiveBufferSize() will be
// waited for. If Recv is not 0, then the connection will wait for exactly
// total_bytes before invoking OnRecv.
void Recv( int32_t total_bytes = 0 );
// Posts an asynchronous disconnect event for the object to process.
void Disconnect();
};
class Hive : public boost::enable_shared_from_this< Hive >
{
private:
boost::asio::io_service m_io_service;
boost::shared_ptr< boost::asio::io_service::work > m_work_ptr;
private:
Hive( const Hive & rhs );
Hive & operator =( const Hive & rhs );
public:
Hive();
virtual ~Hive();
// Returns the io_service of this object.
boost::asio::io_service & GetService();
// Polls the networking subsystem once from the current thread and returns.
void Poll();
// Runs the networking system on the current thread. This function blocks
// until the networking system is stopped, so do not call on a single
// threaded application with no other means of being able to call Stop
// unless you code in such logic.
void Run();
// Stops the networking system. All work is finished and no more
// networking interactions will be possible afterwards until Reset is called.
void Stop();
// Restarts the networking system after Stop as been called. A new work
// object is created ad the shutdown flag is cleared.
void Reset();
};
а вот мой cpp:
#include "network.h"
#include <boost/interprocess/detail/atomic.hpp>
#include <boost/interprocess/detail/config_begin.hpp>
#include <boost/interprocess/detail/workaround.hpp>
#include <boost/cstdint.hpp>
#include <iostream>
Hive::Hive()
: m_work_ptr( new boost::asio::io_service::work( m_io_service ) )
{
}
Hive::~Hive()
{
}
boost::asio::io_service & Hive::GetService()
{
return m_io_service;
}
void Hive::Poll()
{
m_io_service.poll();
}
void Hive::Run()
{
m_io_service.run();
}
void Hive::Stop()
{
m_work_ptr.reset();
m_io_service.run();
m_io_service.stop();
}
void Hive::Reset()
{
m_io_service.reset();
m_work_ptr.reset( new boost::asio::io_service::work( m_io_service ) );
}
Connection::Connection( boost::shared_ptr< Hive > hive )
: m_hive( hive ), m_socket( hive->GetService() ), m_io_strand( hive->GetService() ), m_keepalive_timer( hive->GetService() ), m_reconnect_timer(hive->GetService()), m_receive_buffer_size( 4096 ), m_keepalive_timer_interval( 2000 )
{
}
Connection::~Connection()
{
}
void Connection::StartSend()
{
if( !m_pending_sends.empty() )
boost::asio::async_write( m_socket, boost::asio::buffer( m_pending_sends.front() ), boost::asio::bind_executor(m_io_strand, boost::bind( &Connection::HandleSend, shared_from_this(), boost::asio::placeholders::error, m_pending_sends.begin() ) ) );
}
void Connection::StartRecv( int32_t total_bytes )
{
if( total_bytes > 0 )
{
m_recv_buffer.resize( total_bytes );
boost::asio::async_read( m_socket, boost::asio::buffer( m_recv_buffer ), boost::asio::bind_executor(m_io_strand, boost::bind( &Connection::HandleRecv, shared_from_this(), _1, _2 ) ) );
}
else
{
m_recv_buffer.resize( m_receive_buffer_size );
m_socket.async_read_some( boost::asio::buffer( m_recv_buffer ), boost::asio::bind_executor(m_io_strand, boost::bind( &Connection::HandleRecv, shared_from_this(), _1, _2 ) ) );
}
}
void Connection::StartKeepAliveTimer()
{
std::cout << "[" << __FUNCTION__ << "] " << std::endl;
m_last_time = boost::posix_time::microsec_clock::local_time();
m_keepalive_timer.expires_from_now( boost::posix_time::milliseconds( m_keepalive_timer_interval ) );
m_keepalive_timer.async_wait( boost::asio::bind_executor(m_io_strand, boost::bind( &Connection::DispatchTimer, shared_from_this(), _1 ) ) );
}
void Connection::StartReconnectTimer()
{
std::cout << "[" << __FUNCTION__ << "] " << std::endl;
m_last_time = boost::posix_time::microsec_clock::local_time();
m_reconnect_timer.expires_from_now( boost::posix_time::milliseconds( m_reconnect_interval ) );
m_reconnect_timer.async_wait( boost::asio::bind_executor(m_io_strand, boost::bind( &Connection::ReconnectTimer, shared_from_this(), _1 ) ) );
}
void Connection::StartError(const boost::system::error_code &error, std::string caller)
{
OnError(caller + " " + error.message() + "(" + error.category().name() + ":" + std::to_string(error.value()) + ")");
boost::system::error_code ec;
m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
m_socket.close(ec);
m_keepalive_timer.cancel(ec);
if (error == boost::asio::error::misc_errors::eof || error == boost::system::errc::timed_out)
StartReconnectTimer();
}
void Connection::HandleConnect( const boost::system::error_code & error )
{
if( error )
StartError(error, __FUNCTION__);
else
{
if( m_socket.is_open() )
{
OnConnect( m_socket.remote_endpoint().address().to_string(), m_socket.remote_endpoint().port() );
m_reconnect_timer.cancel();
StartKeepAliveTimer();
}
else
StartError(error, __FUNCTION__);
}
}
void Connection::HandleSend( const boost::system::error_code & error, std::list< std::vector< uint8_t > >::iterator itr )
{
if( error )
StartError(error, __FUNCTION__);
else
{
OnSend( *itr );
m_pending_sends.erase( itr );
StartSend();
}
}
void Connection::HandleRecv( const boost::system::error_code & error, int32_t actual_bytes )
{
if( error )
StartError(error, __FUNCTION__);
else
{
m_recv_buffer.resize( actual_bytes );
OnRecv( m_recv_buffer );
m_pending_recvs.pop_front();
if( !m_pending_recvs.empty() )
StartRecv( m_pending_recvs.front() );
}
}
void Connection::HandleKeepAliveTimer(const boost::system::error_code &error)
{
if( error )
StartError(error, __FUNCTION__);
else
{
OnTimer( boost::posix_time::microsec_clock::local_time() - m_last_time );
StartKeepAliveTimer();
}
}
void Connection::HandleReconnectTimer( const boost::system::error_code & error )
{
std::cout << "[" << __FUNCTION__ << "] Calling reconnect... " << std::endl;
Reconnect();
}
void Connection::DispatchSend( std::vector< uint8_t > buffer )
{
bool should_start_send = m_pending_sends.empty();
m_pending_sends.push_back( buffer );
if( should_start_send )
StartSend();
}
void Connection::DispatchRecv( int32_t total_bytes )
{
bool should_start_receive = m_pending_recvs.empty();
m_pending_recvs.push_back( total_bytes );
if( should_start_receive )
StartRecv( total_bytes );
}
void Connection::DispatchTimer( const boost::system::error_code & error )
{
boost::asio::post(m_io_strand, boost::bind(&Connection::HandleKeepAliveTimer, shared_from_this(), error ) );
}
void Connection::ReconnectTimer( const boost::system::error_code & error )
{
boost::asio::post(m_io_strand, boost::bind( &Connection::HandleReconnectTimer, shared_from_this(), error ) );
}
void Connection::Connect(const std::string &host, const std::string &port)
{
std::cout << "[" << __FUNCTION__ << "] connecting to "<< host << ":" << port << std::endl;
boost::system::error_code ec;
boost::asio::ip::tcp::resolver resolver( m_hive->GetService() );
boost::asio::ip::tcp::resolver::query query( host, port );
boost::asio::ip::tcp::resolver::iterator iterator = resolver.resolve( query );
m_socket.async_connect( *iterator, boost::asio::bind_executor(m_io_strand, boost::bind( &Connection::HandleConnect, shared_from_this(), _1 ) ) );
}
void Connection::Reconnect()
{
m_hive->Reset();
Connect("10.40.60.72", "23");
}
void Connection::Disconnect()
{
boost::asio::post(m_io_strand, boost::bind(&Connection::HandleKeepAliveTimer, shared_from_this(), boost::asio::error::connection_reset ) );
}
void Connection::Recv( int32_t total_bytes )
{
boost::asio::post(m_io_strand, boost::bind( &Connection::DispatchRecv, shared_from_this(), total_bytes ) );
}
void Connection::Send(std::string message)
{
std::vector< uint8_t > request;
std::copy( message.begin(), message.end(), std::back_inserter( request ) );
boost::asio::post(m_io_strand, boost::bind( &Connection::DispatchSend, shared_from_this(), request ) );
}