Периодически данные не доставляются через порт завершения boost :: asio / io - PullRequest
6 голосов
/ 10 февраля 2011

Проблема

Я использую boost :: asio для проекта, в котором два процесса на одном компьютере взаимодействуют по протоколу TCP / IP.Один генерирует данные для чтения другим, но я сталкиваюсь с проблемой, когда данные периодически не передаются через соединение.Я свел это к очень простому примеру ниже, основанному на примере async tcp echo server .

Процессы (исходный код ниже) запускаются нормально, быстро доставляя данныеставка от отправителя к получателю.Затем, внезапно, никакие данные не доставляются в течение примерно пяти секунд.Затем данные доставляются снова до следующей необъяснимой паузы.В течение этих пяти секунд процессы поглощают 0% ЦП, и никакие другие процессы, кажется, не делают ничего особенного.Пауза всегда одинаковой длины - пять секунд.

Я пытаюсь выяснить, как избавиться от этих срывов и что их вызывает.

Использование ЦП во время всего цикла:

CPU usage during a single run

Notice how there are three dips of CPU usage in the middle of the run - a "run" is a single invocation of the server process and the client process. During these dips, no data was delivered. The number of dips and their timing differs between runs - some times no dips at all, some times many.

I am able to affect the "probability" of these stalls by changing the size of the read buffer - for instance if I make the read buffer a multiple of the send chunk size it appears that this problem almost goes away, but not entirely.

Source and test description

I've compiled the below code with Visual Studio 2005, using Boost 1.43 and Boost 1.45. I have tested on Windows Vista 64 bit (on a quad-core) and Windows 7 64 bit (on both a quad-core and a dual-core).

The server accepts a connection and then simply reads and discards data. Whenever a read is performed a new read is issued.

The client connects to the server, then puts a bunch of packets into a send queue. After this it writes the packets one at the time. Whenever a write has completed, the next packet in the queue is written. A separate thread monitors the queue size and prints this to stdout every second. During the io stalls, the queue size remains exactly the same.

I have tried to used scatter io (writing multiple packets in one system call), but the result is the same. If I disable IO completion ports in Boost using BOOST_ASIO_DISABLE_IOCP, the problem appears to go away but at the price of significantly lower throughput.

// Example is adapted from async_tcp_echo_server.cpp which is
// Copyright (c) 2003-2010 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Start program with -s to start as the server
#ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x0501
#endif                      

#include 
#include 
#include 
#include 
#include 

#define PORT "1234"
using namespace boost::asio::ip;
using namespace boost::system;

class session {
public:
    session(boost::asio::io_service& io_service) : socket_(io_service) {}

    void do_read() {
        socket_.async_read_some(boost::asio::buffer(data_, max_length),
            boost::bind(&session::handle_read, this, _1, _2));
    }

    boost::asio::ip::tcp::socket& socket() { return socket_; }
protected:
    void handle_read(const error_code& ec, size_t bytes_transferred) {
        if (!ec) {
            do_read();
        } else {
            delete this;
        }
    }

private:
    tcp::socket socket_;
    enum { max_length = 1024 };
    char data_[max_length];
};

class server {
public:
    explicit server(boost::asio::io_service& io_service)
        : io_service_(io_service)
        , acceptor_(io_service, tcp::endpoint(tcp::v4(), atoi(PORT)))
    {
        session* new_session = new session(io_service_);
        acceptor_.async_accept(new_session->socket(),
            boost::bind(&server::handle_accept, this, new_session, _1));
    }

    void handle_accept(session* new_session, const error_code& ec) {
        if (!ec) {
            new_session->do_read();
            new_session = new session(io_service_);
            acceptor_.async_accept(new_session->socket(),
                boost::bind(&server::handle_accept, this, new_session, _1));
        } else {
            delete new_session;
        }
    }

private:
    boost::asio::io_service& io_service_;
    boost::asio::ip::tcp::acceptor acceptor_;
};

class client {
public:
    explicit client(boost::asio::io_service &io_service)
        : io_service_(io_service)
        , socket_(io_service)
        , work_(new boost::asio::io_service::work(io_service))
    {
        io_service_.post(boost::bind(&client::do_init, this));
    }

    ~client() {
        packet_thread_.join(); 
    }

protected:

    void do_init() {
        // Connect to the server
        tcp::resolver resolver(io_service_);
        tcp::resolver::query query(tcp::v4(), "localhost", PORT);
        tcp::resolver::iterator iterator = resolver.resolve(query);
        socket_.connect(*iterator);

        // Start packet generation thread
        packet_thread_.swap(boost::thread(
                boost::bind(&client::generate_packets, this, 8000, 5000000)));
    }

    typedef std::vector packet_type;
    typedef boost::shared_ptr packet_ptr;

    void generate_packets(long packet_size, long num_packets) {
        // Add a single dummy packet multiple times, then start writing
        packet_ptr buf(new packet_type(packet_size, 0));
        write_queue_.insert(write_queue_.end(), num_packets, buf);
        queue_size = num_packets;
        do_write_nolock();

        // Wait until all packets are sent.
        while (long queued = InterlockedExchangeAdd(&queue_size, 0)) {
            std::cout << "Queue size: " << queued << std::endl;
            Sleep(1000);
        }

        // Exit from run(), ignoring socket shutdown
        work_.reset();
    }

    void do_write_nolock() {
        const packet_ptr &p = write_queue_.front();
        async_write(socket_, boost::asio::buffer(&(*p)[0], p->size()),
            boost::bind(&client::on_write, this, _1));
    }

    void on_write(const error_code &ec) {
        if (ec) { throw system_error(ec); }

        write_queue_.pop_front();
        if (InterlockedDecrement(&queue_size)) {
            do_write_nolock();
        }
    }

private:
    boost::asio::io_service &io_service_;
    tcp::socket socket_;
    boost::shared_ptr work_;
    long queue_size;
    std::list write_queue_;
    boost::thread packet_thread_;
};

int _tmain(int argc, _TCHAR* argv[]) {
    try {
        boost::asio::io_service io_svc;
        bool is_server = argc > 1 && 0 == _tcsicmp(argv[1], _T("-s"));
        std::auto_ptr s(is_server ? new server(io_svc) : 0);
        std::auto_ptr c(is_server ? 0 : new client(io_svc));
        io_svc.run();
    } catch (std::exception& e) {
        std::cerr 

So my question is basically:

How do I get rid of these stalls?

What causes this to happen?

Update: There appears to be some correlation with disk activity contrary to what I stated above, so it appears that if I start a large directory copy on the disk while the test is running this might increase the frequency of the io stalls. This could indicate that this is the Приоритет ввода-вывода Windows , который включается?Поскольку паузы всегда имеют одинаковую длину, это звучит как тайм-аут где-то в коде ОС io ...

Ответы [ 2 ]

1 голос
/ 03 марта 2011

Недавно я столкнулся с очень похожей проблемой звучания, и у меня есть решение, которое работает для меня. У меня есть асинхронный сервер / клиент, написанный в asio, который отправляет и получает видео (и небольшие структуры запросов), и я видел частые 5-секундные задержки, как вы описали.

Наше исправление состояло в том, чтобы увеличить размер буферов сокетов на каждом конце и отключить алгоритм Nagle.

pSocket->set_option( boost::asio::ip::tcp::no_delay( true) );
pSocket->set_option( boost::asio::socket_base::send_buffer_size( s_SocketBufferSize ) );
pSocket->set_option( boost::asio::socket_base::receive_buffer_size( s_SocketBufferSize ) );

Возможно, что только один из перечисленных выше вариантов является критическим, но я не исследовал это дальше.

1 голос
/ 10 февраля 2011
  • настроить boost :: asio :: socket_base :: send_buffer_size и receive_buffer_size
  • настроить max_length для большего числа.Так как TCP ориентирован на поток, не думайте о нем как о получении отдельных пакетов.Скорее всего, это вызывает своего рода «тупик» между окнами отправки / получения TCP.
...