TCP :: boost :: asio TCP-клиент восстанавливает соединение после чтения EOF - PullRequest
0 голосов
/ 11 сентября 2018

Я хотел бы написать TCP-клиент на C++ с использованием boost::asio на основе этой статьи: https://www.gamedev.net/blogs/entry/2249317-a-guide-to-getting-started-with-boostasio/?pg=5

У меня есть существующий сервер, поведение которого похоже на консоль настройки маршрутизатора. К нему можно подключиться, и как только появится приглашение командной строки, можно отправить команду на сервер. Сервер выполняет её и отправляет обратно результат и новое приглашение командной строки. Затем можно отправить следующую команду ...

Нет проблем с этой частью. Мой код может устанавливать соединение, отправлять и получать данные, пока мой клиент не получит EOF. Это означает, что сервер разорвал соединение. Я бы хотел, чтобы мой клиент мог восстановить соединение, и эта часть не работает. Очень буду признателен за вашу помощь.

Вот мой заголовок:

#pragma once
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/asio/io_context_strand.hpp>

class Hive;
class Connection;

// A single TCP connection driven by the io_service of a Hive.
//
// The implementation binds every asynchronous completion handler to
// m_io_strand and calls shared_from_this() when starting operations, so
// instances must be owned by a boost::shared_ptr. Derived classes receive
// events through the private pure-virtual On* hooks declared below.
class Connection : public boost::enable_shared_from_this< Connection >
{
    friend class Hive;

private:
    // Delay before a reconnect attempt, in milliseconds.
    static const int m_reconnect_interval = 15000;
    // Hive whose io_service runs this connection.
    boost::shared_ptr< Hive > m_hive;
    boost::asio::ip::tcp::socket m_socket;
    // Strand every completion handler of this connection is bound to.
    boost::asio::io_service::strand m_io_strand;
    // Periodic timer that drives OnTimer while the connection is up.
    boost::asio::deadline_timer m_keepalive_timer;
    // One-shot timer that schedules a reconnect attempt.
    boost::asio::deadline_timer m_reconnect_timer;
    // Local time at which a timer was last armed; used for the OnTimer delta.
    boost::posix_time::ptime m_last_time;
    // Buffer the current receive operation reads into.
    std::vector< uint8_t > m_recv_buffer;
    // Queue of requested receive sizes; the front entry is the read in progress.
    std::list< int32_t > m_pending_recvs;
    // Queue of outgoing buffers; the front entry is the write in progress.
    std::list< std::vector< uint8_t > > m_pending_sends;
    // Buffer size used for "read whatever is available" receives.
    int32_t m_receive_buffer_size;
    // Keep-alive timer period, in milliseconds.
    int32_t m_keepalive_timer_interval;

protected:
    Connection( boost::shared_ptr< Hive > hive );
    virtual ~Connection();

private:
    // Connections are not copyable; deleting the operations turns an
    // accidental copy into a compile-time error instead of a link-time one.
    Connection( const Connection & rhs ) = delete;
    Connection & operator =( const Connection & rhs ) = delete;

    // Start* functions issue the asynchronous operation on the socket/timer.
    void StartSend();
    void StartRecv( int32_t total_bytes );
    void StartKeepAliveTimer();
    void StartReconnectTimer();
    // Reports the error through OnError and tears the connection down.
    void StartError(const boost::system::error_code &error, std::string caller);

    // Dispatch* functions run on the strand and enqueue work.
    void DispatchSend( std::vector< uint8_t > buffer );
    void DispatchRecv( int32_t total_bytes );
    void DispatchTimer( const boost::system::error_code & error );
    void ReconnectTimer( const boost::system::error_code & error );

    // Handle* functions are the completion handlers.
    void HandleConnect( const boost::system::error_code & error );
    void HandleSend( const boost::system::error_code & error, std::list< std::vector< uint8_t > >::iterator itr );
    void HandleRecv( const boost::system::error_code & error, int32_t actual_bytes );
    void HandleKeepAliveTimer(const boost::system::error_code &error);
    void HandleReconnectTimer( const boost::system::error_code & error );
    void Reconnect();

private:
    // Called when the connection has successfully connected to the local host.
    // NOTE(review): nothing in this file invokes OnAccept; presumably it is
    // meant for a server-side acceptor - confirm.
    virtual void OnAccept( const std::string & host, uint16_t port ) = 0;

    // Called when the connection has successfully connected to the remote host.
    virtual void OnConnect( const std::string & host, uint16_t port ) = 0;

    // Called when data has been sent by the connection.
    virtual void OnSend( const std::vector< uint8_t > & buffer ) = 0;

    // Called when data has been received by the connection.
    virtual void OnRecv( std::vector< uint8_t > & buffer ) = 0;

    // Called on each keep-alive timer event; delta is the time elapsed since
    // the timer was armed.
    virtual void OnTimer( const boost::posix_time::time_duration & delta ) = 0;

    // Called when an error is encountered; the string describes the failing
    // handler and the error code.
    virtual void OnError(const std::string error) = 0;

public:

    // Resolves host:port and starts an asynchronous connect. The outcome of
    // the connect is reported through OnConnect or OnError.
    void Connect(const std::string &host, const std::string &port);

    // Posts data to be sent to the connection.
    void Send(std::string message);

    // Posts a recv for the connection to process. If total_bytes is 0, then
    // as many bytes as possible up to the receive buffer size will be
    // waited for. If total_bytes is not 0, then the connection will wait for
    // exactly total_bytes before invoking OnRecv.
    void Recv( int32_t total_bytes = 0 );

    // Posts an asynchronous disconnect event for the object to process.
    void Disconnect();
};

// Owns the boost::asio::io_service shared by connections, together with the
// work object that is attached to it.
class Hive : public boost::enable_shared_from_this< Hive >
{
private:
    // Declared before m_work_ptr on purpose: the work object is constructed
    // from this io_service in the constructor's initializer list.
    boost::asio::io_service m_io_service;
    boost::shared_ptr< boost::asio::io_service::work > m_work_ptr;

private:
    // A hive is not copyable; deleting the operations turns an accidental
    // copy into a compile-time error instead of a link-time one.
    Hive( const Hive & rhs ) = delete;
    Hive & operator =( const Hive & rhs ) = delete;

public:
    Hive();
    virtual ~Hive();

    // Returns the io_service of this object.
    boost::asio::io_service & GetService();

    // Polls the networking subsystem once from the current thread and returns.
    void Poll();

    // Runs the networking system on the current thread. This function blocks
    // until the networking system is stopped, so do not call it in a
    // single-threaded application that has no other means of calling Stop
    // unless you code in such logic.
    void Run();

    // Stops the networking system. All work is finished and no more
    // networking interactions will be possible afterwards until Reset is called.
    void Stop();

    // Restarts the networking system after Stop has been called. The
    // io_service is reset and a new work object is created.
    void Reset();
};

а вот мой cpp:

#include "network.h"
#include <boost/interprocess/detail/atomic.hpp>
#include <boost/interprocess/detail/config_begin.hpp>
#include <boost/interprocess/detail/workaround.hpp>
#include <boost/cstdint.hpp>
#include <iostream>

// Constructs the hive and attaches a work object to its io_service.
// NOTE(review): the work object is presumably what keeps Run() from
// returning while no handlers are pending - confirm against Asio docs.
Hive::Hive()
        : m_work_ptr( new boost::asio::io_service::work( m_io_service ) )
{
}

// Nothing to release explicitly; the members clean up after themselves.
Hive::~Hive()
{
}

// Returns a reference to the io_service owned by this hive. The reference
// is to a member, so it is only valid while the hive is alive.
boost::asio::io_service & Hive::GetService()
{
    return m_io_service;
}

// Forwards to io_service::poll() on the calling thread.
void Hive::Poll()
{
    m_io_service.poll();
}

// Forwards to io_service::run() on the calling thread.
void Hive::Run()
{
    m_io_service.run();
}

// Stops the networking system: releases the work object, calls run() on the
// calling thread, then calls stop() on the io_service.
// NOTE(review): the run() call presumably drains the handlers that are still
// outstanding, which means this function blocks until they finish - confirm
// that this is intended for every call site.
void Hive::Stop()
{
    m_work_ptr.reset();
    m_io_service.run();
    m_io_service.stop();
}

// Resets the io_service and attaches a fresh work object to it.
// NOTE(review): Asio documents that io_service::reset() must not be called
// while a run()/poll() call is still in progress - verify the callers.
void Hive::Reset()
{
    m_io_service.reset();
    m_work_ptr.reset( new boost::asio::io_service::work( m_io_service ) );
}


// Binds the connection to the given hive: the socket, the strand and both
// timers are all created on the hive's io_service. The receive buffer size
// defaults to 4096 and the keep-alive interval to 2000 (milliseconds).
Connection::Connection( boost::shared_ptr< Hive > hive )
    : m_hive( hive )
    , m_socket( hive->GetService() )
    , m_io_strand( hive->GetService() )
    , m_keepalive_timer( hive->GetService() )
    , m_reconnect_timer( hive->GetService() )
    , m_receive_buffer_size( 4096 )
    , m_keepalive_timer_interval( 2000 )
{
}

// Nothing to release explicitly; the members clean up after themselves.
Connection::~Connection()
{
}

void Connection::StartSend()
{
    if( !m_pending_sends.empty() )
        boost::asio::async_write( m_socket, boost::asio::buffer( m_pending_sends.front() ),  boost::asio::bind_executor(m_io_strand, boost::bind( &Connection::HandleSend, shared_from_this(), boost::asio::placeholders::error, m_pending_sends.begin() ) ) );
}

// Starts an asynchronous read into m_recv_buffer.
//   total_bytes > 0 : read exactly total_bytes (async_read).
//   otherwise       : read whatever arrives, up to m_receive_buffer_size
//                     (async_read_some).
// Completion is delivered to HandleRecv through the strand.
void Connection::StartRecv( int32_t total_bytes )
{
    const bool exact_read = total_bytes > 0;
    m_recv_buffer.resize( exact_read ? total_bytes : m_receive_buffer_size );

    auto on_recv = boost::asio::bind_executor(
        m_io_strand,
        boost::bind( &Connection::HandleRecv, shared_from_this(), _1, _2 ) );

    if( exact_read )
        boost::asio::async_read( m_socket, boost::asio::buffer( m_recv_buffer ), on_recv );
    else
        m_socket.async_read_some( boost::asio::buffer( m_recv_buffer ), on_recv );
}

void Connection::StartKeepAliveTimer()
{
    std::cout << "[" << __FUNCTION__ << "] " << std::endl;
    m_last_time = boost::posix_time::microsec_clock::local_time();
    m_keepalive_timer.expires_from_now( boost::posix_time::milliseconds( m_keepalive_timer_interval ) );
    m_keepalive_timer.async_wait(  boost::asio::bind_executor(m_io_strand, boost::bind( &Connection::DispatchTimer, shared_from_this(), _1 ) ) );
}

void Connection::StartReconnectTimer()
{
    std::cout << "[" << __FUNCTION__ << "] " << std::endl;
    m_last_time = boost::posix_time::microsec_clock::local_time();
    m_reconnect_timer.expires_from_now( boost::posix_time::milliseconds( m_reconnect_interval ) );
    m_reconnect_timer.async_wait(  boost::asio::bind_executor(m_io_strand, boost::bind( &Connection::ReconnectTimer, shared_from_this(), _1 ) ) );
}

// Reports an error and tears the connection down.
//   error  : the error code that triggered the teardown.
//   caller : name of the reporting function, prefixed to the message.
// The message passed to OnError has the form
//   "<caller> <message>(<category>:<value>)".
// The socket is shut down and closed and the keep-alive timer is cancelled;
// failures of those calls are deliberately ignored. A reconnect is scheduled
// only for end-of-file and timed-out errors.
void Connection::StartError(const boost::system::error_code &error, std::string caller)
{
    std::string description = caller;
    description += " ";
    description += error.message();
    description += "(";
    description += error.category().name();
    description += ":";
    description += std::to_string(error.value());
    description += ")";
    OnError(description);

    boost::system::error_code ignored;
    m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignored);
    m_socket.close(ignored);
    m_keepalive_timer.cancel(ignored);

    const bool should_retry =
        error == boost::asio::error::eof || error == boost::system::errc::timed_out;
    if (should_retry)
        StartReconnectTimer();
}

// Completion handler for the asynchronous connect.
//   error : result of the connect attempt.
// On failure the error is routed to StartError. On success OnConnect is
// invoked with the remote address and port, any pending reconnect timer is
// cancelled and the keep-alive timer is started.
void Connection::HandleConnect( const boost::system::error_code & error )
{
    if( error )
    {
        StartError(error, __FUNCTION__);
        return;
    }

    if( !m_socket.is_open() )
    {
        // `error` is a success code on this path, so report a meaningful
        // error instead of forwarding "success" to OnError.
        StartError(boost::asio::error::make_error_code(boost::asio::error::not_connected), __FUNCTION__);
        return;
    }

    // Use the non-throwing overload: the peer may already have dropped the
    // connection, and an exception must not escape from a completion handler
    // (it would propagate out of io_service::run()).
    boost::system::error_code ec;
    const boost::asio::ip::tcp::endpoint remote = m_socket.remote_endpoint( ec );
    if( ec )
    {
        StartError(ec, __FUNCTION__);
        return;
    }

    OnConnect( remote.address().to_string(), remote.port() );

    // Non-throwing cancel for the same reason as above; a failure to cancel
    // is not actionable here.
    m_reconnect_timer.cancel( ec );
    StartKeepAliveTimer();
}

// Completion handler for the write started by StartSend.
//   error : result of the write.
//   itr   : iterator to the queue element that was being written.
// On success the buffer is reported through OnSend, removed from the queue
// and the next queued buffer (if any) is started.
// On failure the whole send queue is discarded before the error is reported.
// Only the head element is ever in flight, and that write has just finished,
// so no outstanding operation references the queue any more. Leaving stale
// entries behind would keep the queue non-empty forever, and DispatchSend
// only starts a write when the queue was empty - i.e. nothing would ever be
// sent again after a reconnect.
void Connection::HandleSend( const boost::system::error_code & error, std::list< std::vector< uint8_t > >::iterator itr )
{
    if( error )
    {
        m_pending_sends.clear();
        StartError(error, __FUNCTION__);
        return;
    }

    OnSend( *itr );
    m_pending_sends.erase( itr );
    StartSend();
}

// Completion handler for the read started by StartRecv.
//   error        : result of the read.
//   actual_bytes : number of bytes placed in m_recv_buffer.
// On success the buffer is trimmed to the received size, handed to OnRecv,
// the completed request is popped and the next queued request (if any) is
// started.
// On failure (including EOF) the receive queue is discarded before the error
// is reported. Only the head request is ever in flight, and it has just
// finished, so nothing references the queue any more. Leaving the failed
// request in the queue would keep it non-empty forever, and DispatchRecv
// only starts a read when the queue was empty - i.e. after a reconnect no
// Recv() call would ever start reading again.
void Connection::HandleRecv( const boost::system::error_code & error, int32_t actual_bytes )
{
    if( error )
    {
        m_pending_recvs.clear();
        StartError(error, __FUNCTION__);
        return;
    }

    m_recv_buffer.resize( actual_bytes );
    OnRecv( m_recv_buffer );
    m_pending_recvs.pop_front();
    if( !m_pending_recvs.empty() )
        StartRecv( m_pending_recvs.front() );
}

// Handles a keep-alive timer event (also used by Disconnect() to inject an
// error on the strand).
//   error : result of the timer wait, or the injected error.
// A wait that completes with operation_aborted means the timer was cancelled
// or re-armed on purpose (StartError cancels it during teardown), so it is
// not a connection failure and is ignored. Treating it as an error would
// report a spurious OnError and close the socket a second time.
// Any other error tears the connection down; on success OnTimer is invoked
// with the time elapsed since the timer was armed and the timer is re-armed.
void Connection::HandleKeepAliveTimer(const boost::system::error_code &error)
{
    if( error == boost::asio::error::operation_aborted )
        return;

    if( error )
    {
        StartError(error, __FUNCTION__);
        return;
    }

    OnTimer( boost::posix_time::microsec_clock::local_time() - m_last_time );
    StartKeepAliveTimer();
}

// Handles expiry of the reconnect timer.
//   error : result of the timer wait.
// A reconnect is attempted only when the timer actually expired. If the wait
// completed with an error - in practice operation_aborted, because the timer
// was cancelled after a successful connect or re-armed - no reconnect must
// be started, otherwise an established connection would be connected again.
void Connection::HandleReconnectTimer( const boost::system::error_code & error )
{
    if( error )
        return;

    std::cout << "[" << __FUNCTION__ << "] Calling reconnect... " << std::endl;
    Reconnect();
}

// Runs on the strand: appends the buffer to the send queue and, if no write
// was already queued, starts sending right away.
void Connection::DispatchSend( std::vector< uint8_t > buffer )
{
    const bool was_idle = m_pending_sends.empty();
    m_pending_sends.push_back( buffer );

    if( !was_idle )
        return;

    StartSend();
}

// Runs on the strand: appends the receive request to the queue and, if no
// read was already queued, starts it right away.
void Connection::DispatchRecv( int32_t total_bytes )
{
    const bool was_idle = m_pending_recvs.empty();
    m_pending_recvs.push_back( total_bytes );

    if( !was_idle )
        return;

    StartRecv( total_bytes );
}

// Wait handler of the keep-alive timer: forwards the result to
// HandleKeepAliveTimer by posting it to the strand.
void Connection::DispatchTimer( const boost::system::error_code & error )
{
    boost::asio::post(
        m_io_strand,
        boost::bind( &Connection::HandleKeepAliveTimer, shared_from_this(), error ) );
}

// Wait handler of the reconnect timer: forwards the result to
// HandleReconnectTimer by posting it to the strand.
void Connection::ReconnectTimer( const boost::system::error_code & error )
{
    boost::asio::post(
        m_io_strand,
        boost::bind( &Connection::HandleReconnectTimer, shared_from_this(), error ) );
}

void Connection::Connect(const std::string &host, const std::string &port)
{
    std::cout << "[" << __FUNCTION__ << "] connecting to "<< host << ":" << port << std::endl;
    boost::system::error_code ec;
    boost::asio::ip::tcp::resolver resolver( m_hive->GetService() );
    boost::asio::ip::tcp::resolver::query query( host, port );
    boost::asio::ip::tcp::resolver::iterator iterator = resolver.resolve( query );
    m_socket.async_connect( *iterator, boost::asio::bind_executor(m_io_strand, boost::bind( &Connection::HandleConnect, shared_from_this(), _1 ) ) );
}

void Connection::Reconnect()
{
    m_hive->Reset();
    Connect("10.40.60.72", "23");
}

void Connection::Disconnect()
{
    boost::asio::post(m_io_strand, boost::bind(&Connection::HandleKeepAliveTimer, shared_from_this(), boost::asio::error::connection_reset ) );
}

// Queues a receive request by posting DispatchRecv to the strand.
//   total_bytes : exact number of bytes to wait for, or 0 to accept
//                 whatever arrives (see StartRecv).
void Connection::Recv( int32_t total_bytes )
{
    boost::asio::post(
        m_io_strand,
        boost::bind( &Connection::DispatchRecv, shared_from_this(), total_bytes ) );
}

// Queues a message for sending: the characters of the message are copied
// into a byte buffer, which is handed to DispatchSend on the strand.
void Connection::Send(std::string message)
{
    const std::vector< uint8_t > payload( message.begin(), message.end() );
    boost::asio::post(
        m_io_strand,
        boost::bind( &Connection::DispatchSend, shared_from_this(), payload ) );
}
...