Разработка симулятора сетевого клиента - PullRequest
3 голосов
/ 22 января 2011

Я пытаюсь разработать программное обеспечение на c ++, которое будет отправлять байты запроса (следуя стандартному протоколу ** уровня приложения **, поля которого будут заполнены из текстового файла), используя ** протокол UDP **.

Теперь этот клиент должен иметь возможность отправлять эти запросы с очень высокой скоростью ... до ** 2000 транзакций в секунду **, а также должен получать ответ, если он получает в течение указанного времени, иначе не получит его

Я буду использовать библиотеку Boost для всех сокетов, но я не уверен в ее дизайне для такого высокоскоростного приложения :(

Я думаю, что я должен использовать многопоточное приложение (снова будет использоваться Boost). Я прав ? Нужно ли создавать отдельный поток для каждого запроса? Но я думаю, что только один поток должен ожидать получения ответа, в противном случае, если многие потоки ожидают ответа, как мы можем различить, для какого потока запроса мы получили ответ !!

Надеюсь, что вопрос ясен. Мне просто нужна помощь в отношении вопросов проектирования и предполагаемых проблем, с которыми я могу столкнуться.

Ответы [ 2 ]

2 голосов
/ 22 января 2011

Я не уверен, что вам нужно использовать "тяжелую" многопоточность.Большинство высокоскоростных приложений используют механизмы опроса операционной системы, которые обычно масштабируются лучше, чем потоки.

Архитектура будет во многом зависеть от того, насколько реактивным должно быть ваше приложение, в зависимости от того, какие компоненты отвечают за генерацию.входы и выходы и выполнение фактической обработки.

Способ решения вашей проблемы с использованием boost :: asio состоял бы в том, чтобы иметь коммуникационный поток, который запускает метод boost :: asio :: io_service :: run.Служба io_service прослушивает различные сокеты UDP и обрабатывает сообщения по мере их поступления, возможно отправляя их в очередь, чтобы приложение могло обработать их в главном потоке.Из основного потока вы можете публиковать сообщения на io_services для их отправки основной системой.

Это должно позволить вам без особых проблем подниматься до 2000 сообщений в секунду.

Альтернативой может быть запуск нескольких потоков связи с помощью вызова boost :: asio :: io_service :: runметод несколько раз из нескольких потоков, позволяющий параллельно обрабатывать сообщения их потоком связи.

Хотя у Asio есть один совет: из-за его асинхронной архитектуры он работает лучше, если следовать его логикеи используйте его так, как он предназначен.Если вы обнаружите, что используете много блокировок и сами управляете множеством потоков, то вы, вероятно, делаете это неправильно.Внимательно посмотрите на гарантии безопасности потоков различных методов и изучите приведенные примеры.

2 голосов
/ 22 января 2011

Сейчас я нахожусь на полпути через своего собственного сетевого клиента, поэтому, возможно, я смогу поделиться некоторыми советами и ресурсами, на которые можно посмотреть. В этой области есть много более опытных, и, надеюсь, они присоединятся:)

Во-первых, вы о повышении. Как только вы привыкнете, как все это сочетается, boost::asio станет отличным инструментарием для написания сетевого кода. По сути, вы создаете io_service и вызываете run для выполнения до тех пор, пока вся работа не будет завершена, или runOne для выполнения одного действия ввода-вывода. Сами по себе, это не так уж полезно. Питание приходит, когда вы запускаете runOne в своем собственном цикле:

boost::asio::io_service myIOService;
while(true)
{
    myIOService.runOne();
}

или запустите функцию run в одном (или нескольких) потоках:

boost::thread t(boost::bind(&boost::asio::io_service::run, &myIOService));

Тем не менее, стоит отметить, что run возвращается, как только нет работы (так что вы можете попрощаться с этим потоком). Как я выяснил здесь, в Stackoverflow, хитрость заключается в том, чтобы убедиться, что у него всегда есть чем заняться. Решение в boost::asio::io_service::work:

boost::asio::io_service::work myWork(myIOService);   // just some abstract "work"

Приведенная выше строка гарантирует, что ваш поток не остановится, когда ничего не происходит. Я рассматриваю это как средство поддержания жизни:)

В какой-то момент вы захотите создать сокет и подключить его куда-нибудь. Я создал общий класс Socket (и извлек текстовый сокет из него для создания буферизованного ввода). Я также хотел систему, основанную на событиях, которая работала очень похоже на C #. Я изложил этот материал для вас ниже:

Первый шаг, нам нужен общий способ передачи аргументов, следовательно, EventArgs:

eventArgs.h

 class EventArgs : boost::noncopyable
 {
 private:

 public:
  EventArgs();
  virtual ~EventArgs() = 0;
 }; // eo class EventArgs:

Теперь нам нужен класс событий, на который люди могут подписаться / отписаться:

event.h

// STL
#include <functional>
#include <stack>

// Boost
#include <boost/bind.hpp>
#include <boost/thread/mutex.hpp>

 // class Event
 class Event : boost::noncopyable
 {
 public:
  typedef std::function<void(const EventArgs&)> DelegateType;
  typedef boost::shared_ptr<DelegateType> DelegateDecl;

 private:
  boost::mutex m_Mutex;
  typedef std::set<DelegateDecl> DelegateSet;
  typedef std::stack<DelegateDecl> DelegateStack;
  typedef DelegateSet::const_iterator DelegateSet_cit;
  DelegateSet m_Delegates;
  DelegateStack m_ToRemove;

 public:
  Event()
  {
  }; // eo ctor


  Event(Event&& _rhs) : m_Delegates(std::move(_rhs.m_Delegates))
  {
  }; // eo mtor

  ~Event()
  {
  }; // eo dtor

  // static methods
  static DelegateDecl bindDelegate(DelegateType _f)
  {
   DelegateDecl ret(new DelegateType(_f));
   return ret;
  }; // eo bindDelegate

  // methods
  void raise(const EventArgs& _args)
  {
   boost::mutex::scoped_lock lock(m_Mutex);

   // get rid of any we have to remove
   while(m_ToRemove.size())
   {
    m_Delegates.erase(m_Delegates.find(m_ToRemove.top()));
    m_ToRemove.pop();
   };

   if(m_Delegates.size())
   std::for_each(m_Delegates.begin(),
        m_Delegates.end(),
        [&_args](const DelegateDecl& _decl) { (*_decl)(_args); });
  }; // eo raise

  DelegateDecl addListener(DelegateDecl _decl)
  {
   boost::mutex::scoped_lock lock(m_Mutex);
   m_Delegates.insert(_decl);
   return _decl;
  }; // eo addListener

  DelegateDecl addListener(DelegateType _f)
  {
   DelegateDecl ret(bindDelegate(_f));
   return addListener(ret);
  }; // eo addListener


  void removeListener(const DelegateDecl _decl)
  {
   boost::mutex::scoped_lock lock(m_Mutex);
   DelegateSet_cit cit(m_Delegates.find(_decl));
   if(cit != m_Delegates.end())
    m_ToRemove.push(_decl);
  }; // eo removeListener

  // operators

  // Only use operator += if you don't which to manually detach using removeListener
  Event& operator += (DelegateType _f)
  {
   addListener(_f);
   return *this;
  }; // eo op +=

 }; // eo class Event

Затем пришло время создать класс сокета. Ниже заголовок:

socket.h

(Некоторые примечания: ByteVector - typedef std::vector<unsigned char>)

#pragma once

#include "event.h"

// boost
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/buffer.hpp>
  // class Socket
  class MORSE_API Socket : boost::noncopyable
  {
  protected:
   typedef boost::shared_ptr<boost::asio::ip::tcp::socket> SocketPtr;

  private:
   ByteVector      m_Buffer;   // will be used to read in

   SocketPtr        m_SocketPtr;
   boost::asio::ip::tcp::endpoint      m_RemoteEndPoint;
   bool         m_bConnected;

   // reader
   void _handleConnect(const boost::system::error_code& _errorCode, boost::asio::ip::tcp::resolver_iterator _rit);
   void _handleRead(const boost::system::error_code& _errorCode, std::size_t read);
  protected:

   SocketPtr socket() { return m_SocketPtr; };
  public:
   Socket(ByteVector_sz _bufSize = 512);
   virtual ~Socket();

   // properties
   bool isConnected() const { return m_bConnected; };
   const boost::asio::ip::tcp::endpoint& remoteEndPoint() const {return m_RemoteEndPoint; };

   // methods
   void connect(boost::asio::ip::tcp::resolver_iterator _rit);
   void connect(const String& _host, const Port _port);
   void close();

   // Events
   Event onRead;
   Event onResolve;
   Event onConnect;
   Event onClose;
  }; // eo class Socket

А теперь реализация. Вы заметите, что он вызывает другой класс для выполнения разрешения DNS. Я покажу это позже. Также есть некоторые EventArg -производные, которые я пропустил. Они просто передаются как параметры EventArg, когда происходят события сокета.

socket.cpp

#include "socket.h"


// boost
#include <boost/asio/placeholders.hpp>

namespace morse
{
 namespace net
 {
  // ctor
  Socket::Socket(ByteVector_sz _bufSize /* = 512 */) : m_bConnected(false)
  {
   m_Buffer.resize(_bufSize);
  }; // eo ctor

  // dtor
  Socket::~Socket()
  {
  }; // eo dtor


  // _handleRead
  void Socket::_handleRead(const boost::system::error_code& _errorCode,
            std::size_t _read)
  {
   if(!_errorCode)
   {
    if(_read)
    {
     onRead.raise(SocketReadEventArgs(*this, m_Buffer, _read));
     // read again
     m_SocketPtr->async_read_some(boost::asio::buffer(m_Buffer), boost::bind(&Socket::_handleRead, this, _1, _2));
    };
   }
   else
    close();
  }; // eo _handleRead


  // _handleConnect
  void Socket::_handleConnect(const boost::system::error_code& _errorCode,
         boost::asio::ip::tcp::resolver_iterator _rit)
  {
   m_bConnected = !_errorCode;
   bool _raise(false);
   if(!_errorCode)
   {
    m_RemoteEndPoint = *_rit;
    _raise = true;
    m_SocketPtr->async_read_some(boost::asio::buffer(m_Buffer), boost::bind(&Socket::_handleRead, this, _1, _2));
   }
   else if(++_rit != boost::asio::ip::tcp::resolver::iterator())
   {
    m_SocketPtr->close();
    m_SocketPtr->async_connect(*_rit, boost::bind(&Socket::_handleConnect, this, boost::asio::placeholders::error, _rit));
   }
   else
    _raise = true; // raise complete failure

   if(_raise)
    onConnect.raise(SocketConnectEventArgs(*this, _errorCode));

  }; // eo _handleConnect


  // connect
  void Socket::connect(boost::asio::ip::tcp::resolver_iterator _rit)
  {
   boost::asio::ip::tcp::endpoint ep(*_rit);
   m_SocketPtr.reset(new boost::asio::ip::tcp::socket(Root::instance().ioService()));
   m_SocketPtr->async_connect(ep, boost::bind(&Socket::_handleConnect, this, boost::asio::placeholders::error, _rit));
  };


  void Socket::connect(const String& _host, Port _port)
  {
   // Anon function for resolution of the host-name and asynchronous calling of the above
   auto anonResolve = [this](const boost::system::error_code& _errorCode, 
           boost::asio::ip::tcp::resolver_iterator _epIt)
   {
    // raise event
    onResolve.raise(SocketResolveEventArgs(*this, !_errorCode ? (*_epIt).host_name() : String(""), _errorCode));

    // perform connect, calling back to anonymous function
    if(!_errorCode)
     this->connect(_epIt);
   };

   // Resolve the host calling back to anonymous function
   Root::instance().resolveHost(_host, _port, anonResolve);

  }; // eo connect


  void Socket::close()
  {
   if(m_bConnected)
   {
    onClose.raise(SocketCloseEventArgs(*this));
    m_SocketPtr->close();
    m_bConnected = false;
   };
  } // eo close

Как я уже говорил о разрешении DNS, строка Root::instance().resolveHost(_host, _port, anonResolve); вызывает это для выполнения асинхронного DNS:

  // resolve a host asynchronously
  template<typename ResolveHandler>
  void resolveHost(const String& _host, Port _port, ResolveHandler _handler)
  {
   boost::asio::ip::tcp::endpoint ret;
   boost::asio::ip::tcp::resolver::query query(_host, boost::lexical_cast<std::string>(_port));
   m_Resolver.async_resolve(query, _handler);
  }; // eo resolveHost

Наконец, мне нужен текстовый сокет, который вызывал событие при каждом получении строки (которая затем обрабатывается). На этот раз я опущу заголовочный файл и просто покажу файл реализации. Нет необходимости говорить, что он объявляет Event с именем onLine, который запускается каждый раз, когда строка принимается полностью:

// boost
#include <boost/asio/buffer.hpp>
#include <boost/asio/write.hpp>
#include <boost/asio/placeholders.hpp>

namespace morse
{
 namespace net
 {
  String TextSocket::m_DefaultEOL("\r\n");

  // ctor
  TextSocket::TextSocket() : m_EOL(m_DefaultEOL)
  {
   onRead += boost::bind(&TextSocket::readHandler, this, _1);
  }; // eo ctor


  // dtor
  TextSocket::~TextSocket()
  {
  }; // eo dtor


  // readHandler
  void TextSocket::readHandler(const EventArgs& _args)
  {
   auto& args(static_cast<const SocketReadEventArgs&>(_args));
   m_LineBuffer.append(args.buffer().begin(), args.buffer().begin() + args.bytesRead());
   String::size_type pos;
   while((pos = m_LineBuffer.find(eol())) != String::npos)
   {
    onLine.raise(SocketLineEventArgs(*this, m_LineBuffer.substr(0, pos)));
    m_LineBuffer = m_LineBuffer.substr(pos + eol().length());
   };
  }; // eo readHandler


  // writeHandler
  void TextSocket::writeHandler(const boost::system::error_code& _errorCode, std::size_t _written)
  {
   if(!_errorCode)
   {
    m_Queue.pop_front();
    if(!m_Queue.empty()) // more to do?
     boost::asio::async_write(*socket().get(), boost::asio::buffer(m_Queue.front(), m_Queue.front().length()), boost::bind(&TextSocket::writeHandler, this, _1, _2));
   }
   else
    close();
  }; // eo writeHandler

  void TextSocket::sendLine(String _line)
  {
   Root::instance().ioService().post(boost::bind(&TextSocket::_sendLine, this, _line));
  }; // eo sendLine


  // _sendLine
  void TextSocket::_sendLine(String _line)
  {
   // copy'n'queue
   _line.append(m_EOL);
   m_Queue.push_back(_line);
   if(m_Queue.size() == 1) // previously an empty queue, must start write!
    boost::asio::async_write(*socket().get(), boost::asio::buffer(m_Queue.front(), m_Queue.front().length()), boost::bind(&TextSocket::writeHandler, this, _1, _2));
  }; // eo sendLine

Несколько замечаний по поводу класса выше ... он использует boost::asio::post для отправки строк. Это позволяет всему происходить в потоках, которыми ASIO управляет потокобезопасным способом, и позволяет нам ставить в очередь строки, которые будут отправлены как и когда. Это делает его очень масштабируемым.

Я уверен, что есть еще много вопросов, и, возможно, мой код не поможет. Я потратил несколько дней на то, чтобы сложить все воедино и разобраться в этом, и я сомневаюсь, что это действительно хорошо. надеюсь, что некоторые лучшие умы будут смотреть на это и идти "СВЯТОЙ ХЛОПОГ, ЭТО

...