повысить asio async_write: как не чередовать вызовы async_write? - PullRequest
26 голосов
/ 13 октября 2011

Вот моя реализация:

  • Клиент A отправит сообщение для клиента B
  • Сервер обработает сообщение с помощью async_read нужного количества данных и будет ждать новых данных отКлиент A (для того, чтобы не блокировать Клиента A)
  • После этого Сервер обработает информацию (возможно, выполнит запрос mysql), а затем отправит сообщение Клиенту B с помощью async_write.

Проблема в том, что если клиент A действительно быстро отправляет сообщение, async_writes будет чередоваться до вызова предыдущего обработчика async_write.

Есть ли простой способ избежать этой проблемы?

EDIT1: Если клиент C отправляет сообщение клиенту B сразу после клиента A, должна появиться та же проблема ...

РЕДАКТИРОВАТЬ 2: Это будет работать?потому что, кажется, блокирует, я не знаю, где ...

 namespace structure {                                                              
  class User {                                                                     
  public:                                                                          
    User(boost::asio::io_service& io_service, boost::asio::ssl::context& context) :
      m_socket(io_service, context), m_strand(io_service), is_writing(false) {}    

    ssl_socket& getSocket() {                                                      
      return m_socket;                                                             
    }                                                                              

    boost::asio::strand getStrand() {                                              
      return m_strand;                                                             
    }                                                                              

    void push(std::string str) {                                                   
      m_strand.post(boost::bind(&structure::User::strand_push, this, str));        
    }                                                                              

    void strand_push(std::string str) {                                            

      std::cout << "pushing: " << boost::this_thread::get_id() << std::endl;       
      m_queue.push(str);                                                           
      if (!is_writing) {                                                           
        write();                                                                   
        std::cout << "going to write" << std::endl;                                
      }                                                                            
      std::cout << "Already writing" << std::endl;                                 
    }                                                                              

    void write() {                                                                 
      std::cout << "writing" << std::endl;                                         
      is_writing = true;                                                           
      std::string str = m_queue.front();                                           
      boost::asio::async_write(m_socket,                                           
                               boost::asio::buffer(str.c_str(), str.size()),       
                               boost::bind(&structure::User::sent, this)           
                               );                                                  
    }                                                                              

    void sent() {                                                                  
      std::cout << "sent" << std::endl;                                            
      m_queue.pop();                                                               
      if (!m_queue.empty()) {                                                      
        write();                                                                   
        return;                                                                    
      }                                                                            
      else                                                                         
        is_writing = false;                                                        
      std::cout << "done sent" << std::endl;                                       
    }                                          

  private:                                     
    ssl_socket          m_socket;              
    boost::asio::strand m_strand;              
    std::queue<std::string>     m_queue;       
    bool                        is_writing;    
  };                                           
}                                              

#endif

Ответы [ 2 ]

42 голосов
/ 13 октября 2011

Есть ли простой способ избежать этой проблемы?

Да, поддерживать исходящую очередь для каждого клиента.Проверьте размер очереди в обработчике завершения async_write, если он не равен нулю, запустите другую операцию async_write.Вот пример

#include <boost/asio.hpp>
#include <boost/bind.hpp>

#include <deque>
#include <iostream>
#include <string>

class Connection
{
public:
    Connection(
            boost::asio::io_service& io_service
            ) :
        _io_service( io_service ),
        _strand( _io_service ),
        _socket( _io_service ),
        _outbox()
    {

    }

    void write( 
            const std::string& message
            )
    {
        _strand.post(
                boost::bind(
                    &Connection::writeImpl,
                    this,
                    message
                    )
                );
    }

private:
    void writeImpl(
            const std::string& message
            )
    {
        _outbox.push_back( message );
        if ( _outbox.size() > 1 ) {
            // outstanding async_write
            return;
        }

        this->write();
    }

    void write()
    {
        const std::string& message = _outbox[0];
        boost::asio::async_write(
                _socket,
                boost::asio::buffer( message.c_str(), message.size() ),
                _strand.wrap(
                    boost::bind(
                        &Connection::writeHandler,
                        this,
                        boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred
                        )
                    )
                );
    }

    void writeHandler(
            const boost::system::error_code& error,
            const size_t bytesTransferred
            )
    {
        _outbox.pop_front();

        if ( error ) {
            std::cerr << "could not write: " << boost::system::system_error(error).what() << std::endl;
            return;
        }

        if ( !_outbox.empty() ) {
            // more messages to send
            this->write();
        }
    }


private:
    typedef std::deque<std::string> Outbox;

private:
    boost::asio::io_service& _io_service;
    boost::asio::io_service::strand _strand;
    boost::asio::ip::tcp::socket _socket;
    Outbox _outbox;
};

int
main()
{
    boost::asio::io_service io_service;
    Connection foo( io_service );
}

некоторые ключевые моменты

  • boost::asio::io_service::strand защищает доступ к Connection::_outbox
  • , от которого обработчик Connection::write() отправляется обработчикэто общедоступно

для меня не было очевидным, использовали ли вы подобные практики в примере в вашем вопросе, поскольку все методы общедоступны.

7 голосов
/ 31 декабря 2017

Просто пытаюсь улучшить великий ответ Сэма.Точки улучшения:

  • async_write изо всех сил пытается отправить каждый байт из буфера (ов) до завершения, что означает, что вы должны предоставить все входные данныеданные, которые у вас есть для операции записи, в противном случае затраты на кадрирование могут увеличиться из-за того, что TCP-пакеты меньше, чем они могли бы быть.

  • asio::streambuf, в то время какбудучи очень удобным в использовании, не является нулевой копией.В приведенном ниже примере демонстрируется подход ноль-копия : сохраните порции входных данных там, где они есть, и используйте перегрузку разброса / сбора async_write, которая принимает последовательность входных буферов (которые являются просто указателями на фактические входные данные).

Полный исходный код:

#include <boost/asio.hpp>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_set>
#include <vector>

using namespace std::chrono_literals;
using boost::asio::ip::tcp;

class Server
{
  class Connection : public std::enable_shared_from_this<Connection>
  {
    friend class Server;
    void ProcessCommand(const std::string& cmd) {
      if (cmd == "stop") {
        server_.Stop();
        return;
      }
      if (cmd == "") {
        Close();
        return;
      }
      std::thread t([this, self = shared_from_this(), cmd] {
        for (int i = 0; i < 30; ++i) {
          Write("Hello, " + cmd + " " + std::to_string(i) + "\r\n");
        }
        server_.io_service_.post([this, self] {
          DoReadCmd();
        });
      });
      t.detach();
    }

    void DoReadCmd() {
      read_timer_.expires_from_now(server_.read_timeout_);
      read_timer_.async_wait([this](boost::system::error_code ec) {
        if (!ec) {
          std::cout << "Read timeout\n";
          Shutdown();
        }
      });
      boost::asio::async_read_until(socket_, buf_in_, '\n', [this, self = shared_from_this()](boost::system::error_code ec, std::size_t bytes_read) {
        read_timer_.cancel();
        if (!ec) {
          const char* p = boost::asio::buffer_cast<const char*>(buf_in_.data());
          std::string cmd(p, bytes_read - (bytes_read > 1 && p[bytes_read - 2] == '\r' ? 2 : 1));
          buf_in_.consume(bytes_read);
          ProcessCommand(cmd);
        }
        else {
          Close();
        }
      });
    }

    void DoWrite() {
      active_buffer_ ^= 1; // switch buffers
      for (const auto& data : buffers_[active_buffer_]) {
        buffer_seq_.push_back(boost::asio::buffer(data));
      }
      write_timer_.expires_from_now(server_.write_timeout_);
      write_timer_.async_wait([this](boost::system::error_code ec) {
        if (!ec) {
          std::cout << "Write timeout\n";
          Shutdown();
        }
      });
      boost::asio::async_write(socket_, buffer_seq_, [this, self = shared_from_this()](const boost::system::error_code& ec, size_t bytes_transferred) {
        write_timer_.cancel();
        std::lock_guard<std::mutex> lock(buffers_mtx_);
        buffers_[active_buffer_].clear();
        buffer_seq_.clear();
        if (!ec) {
          std::cout << "Wrote " << bytes_transferred << " bytes\n";
          if (!buffers_[active_buffer_ ^ 1].empty()) // have more work
            DoWrite();
        }
        else {
          Close();
        }
      });
    }
    bool Writing() const { return !buffer_seq_.empty(); }

    Server& server_;
    boost::asio::streambuf buf_in_;
    std::mutex buffers_mtx_;
    std::vector<std::string> buffers_[2]; // a double buffer
    std::vector<boost::asio::const_buffer> buffer_seq_;
    int active_buffer_ = 0;
    bool closing_ = false;
    bool closed_ = false;
    boost::asio::deadline_timer read_timer_, write_timer_;
    tcp::socket socket_;
  public:
    Connection(Server& server) : server_(server), read_timer_(server.io_service_), write_timer_(server.io_service_), socket_(server.io_service_) {
    }

    void Start() {
      socket_.set_option(tcp::no_delay(true));
      DoReadCmd();
    }

    void Close() {
      closing_ = true;
      if (!Writing())
        Shutdown();
    }

    void Shutdown() {
      if (!closed_) {
        closing_ = closed_ = true;
        boost::system::error_code ec;
        socket_.shutdown(tcp::socket::shutdown_both, ec);
        socket_.close();
        server_.active_connections_.erase(shared_from_this());
      }
    }

    void Write(std::string&& data) {
      std::lock_guard<std::mutex> lock(buffers_mtx_);
      buffers_[active_buffer_ ^ 1].push_back(std::move(data)); // move input data to the inactive buffer
      if (!Writing())
        DoWrite();
    }

  };

  void DoAccept() {
    if (acceptor_.is_open()) {
      auto session = std::make_shared<Connection>(*this);
      acceptor_.async_accept(session->socket_, [this, session](boost::system::error_code ec) {
        if (!ec) {
          active_connections_.insert(session);
          session->Start();
        }
        DoAccept();
      });
    }
  }

  boost::asio::io_service io_service_;
  tcp::acceptor acceptor_;
  std::unordered_set<std::shared_ptr<Connection>> active_connections_;
  const boost::posix_time::time_duration read_timeout_ = boost::posix_time::seconds(30);
  const boost::posix_time::time_duration write_timeout_ = boost::posix_time::seconds(30);

public:
  Server(int port) : acceptor_(io_service_, tcp::endpoint(tcp::v6(), port), false) { }

  void Run() {
    std::cout << "Listening on " << acceptor_.local_endpoint() << "\n";
    DoAccept();
    io_service_.run();
  }

  void Stop() {
    acceptor_.close();
    {
      std::vector<std::shared_ptr<Connection>> sessionsToClose;
      copy(active_connections_.begin(), active_connections_.end(), back_inserter(sessionsToClose));
      for (auto& s : sessionsToClose)
        s->Shutdown();
    }
    active_connections_.clear();
    io_service_.stop();
  }

};

int main() {
  try {
    Server srv(8888);
    srv.Run();
  }
  catch (const std::exception& e) {
    std::cerr << "Error: " << e.what() << "\n";
  }
}
...