Асинхронная обработка в C ++ - PullRequest
       32

Асинхронная обработка в C ++

5 голосов
/ 16 декабря 2011

Сервер, который будет работать вечно и обрабатывать запросы, нуждается в асинхронной части кода, которая будет выполнять некоторые запросы к базе данных и обновляться только при появлении новых изменений. Сервер должен работать вечно, и эта функция для повторного и повторного выполнения функции db должна выполняться асинхронно, чтобы не было помех для сервера из-за обновления один раз в каждые «х» минут.

Как лучше всего это можно обрабатывать асинхронно в C ++? Как настроить эту функцию для запуска на демоне, чтобы он вообще не блокировал сервер?

Ответы [ 3 ]

5 голосов
/ 16 декабря 2011

Я бы настоятельно рекомендовал использовать Boost's ASIO-библиотеку

. Вам потребуется класс для приема новых запросов, а другой - для периодической проверки обновлений.Оба могут выполнять свою работу асинхронно и использовать один и тот же boost :: asio :: io_service для планирования работы.

Установкой будет

  • Асинхронная сеть boost::asio::ip::tcp::acceptor прослушивает новые запросы.
  • A boost::asio::deadline_time выполняет асинхронное ожидание и проверяет наличие обновлений длябаза данных.

Ниже приведен псевдокод, который, как я понимаю, вы описываете:

#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <string>

class DatabaseUpdateChecker{
    public:
    DatabaseUpdateChecker(boost::asio::io_service& io, const int& sleepTimeSeconds)
    :timer_(io,boost::posix_time::seconds(sleepTimeSeconds)),sleepSeconds_(sleepTimeSeconds){
        this->timer_.async_wait(boost::bind(&DatabaseUpdateChecker::doDBUpdateCheck,this,boost::asio::placeholders::error));
    };

    protected:
    void doDBUpdateCheck(const boost::system::error_code& error){
        if(!error){
            std::cout << " Checking Database for updates" << std::endl;
            //Reschdule ourself
            this->timer_.expires_at(timer_.expires_at() + boost::posix_time::seconds(this->sleepSeconds_));
            this->timer_.async_wait(boost::bind(&DatabaseUpdateChecker::doDBUpdateCheck,this,boost::asio::placeholders::error));
        }
    };
    private:
    boost::asio::deadline_timer timer_;
    int sleepSeconds_;  
};

typedef boost::shared_ptr<boost::asio::ip::tcp::socket> TcpSocketPtr;

class NetworkRequest{
    public: 
    NetworkRequest(boost::asio::io_service& io, const int& port)
    :acceptor_(io,boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(),port)){
        this->start_accept();   
    };  
    protected:
    void start_accept(){
        TcpSocketPtr socketPtr(new boost::asio::ip::tcp::socket(acceptor_.get_io_service()));
        std::cout << "About to accept new connection" << std::endl;
        acceptor_.async_accept(*socketPtr,boost::bind(&NetworkRequest::handle_accept,this,socketPtr,boost::asio::placeholders::error));
    };  
    void handle_accept(TcpSocketPtr socketPtr,const boost::system::error_code& error){
        std::cout << "Accepted new network connection" << std::endl;
        if(!error){
            std::string response("This is a response\n");
            boost::asio::async_write(*socketPtr,boost::asio::buffer(response),
                boost::bind(&NetworkRequest::handle_write,this,boost::asio::placeholders::error,boost::asio::placeholders::bytes_transferred));
        }
        //Start listeing for a new connection
        this->start_accept();
    }   
    void handle_write(const boost::system::error_code& error,size_t size){
        if(!error){
            std::cout << "Wrote out " << size << " bytes to the network connection" << std::endl;
        }

    }   
    private:
    boost::asio::ip::tcp::acceptor acceptor_;
};

int main(int argc, char *argv[]) {
    static const int DB_TIMER_SECONDS=5;
    static const int LISTENING_TCP_PORT=4444;

    std::cout << "About to start" << std::endl;
    boost::asio::io_service io;

    DatabaseUpdateChecker dbChecker(io,DB_TIMER_SECONDS);
    NetworkRequest networkRequestAcceptor(io,LISTENING_TCP_PORT);

    io.run();

    std::cout << "This won't be printed" << std::endl;  
    return 0;
}

Компиляция и запуск вышеупомянутого кода покажет, что средство проверки обновлений базы данных будет проверять обновления каждый раз.5 секунд при прослушивании соединений через TCP-порт 4444. Чтобы увидеть код, примите новое соединение, вы можете использовать telnet / netcat / ваш любимый инструмент сетевого клиента ....

telnet 127.0.0.1 4444
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
This is a response
Connection closed by foreign host.

Если вы обнаружите, что обработкаобновлений и / или запросов занимает значительное количество времени, тогда я бы посмотрел на многопоточность вашего приложения и выполнение каждой задачи в отдельном потоке.io_service будет планировать, какую работу он должен выполнить, и не завершит ее, пока не прекратится работа.Хитрость заключается в том, чтобы заставить классы выполнять работу, перепланировать себя, когда они закончат.

Конечно, вы должны принимать во внимание комментарии других по вашему вопросу.Я не знаю, как интерфейс CORBA может усложнить это, но я думаю, что boost :: asio как асинхронная библиотека C ++ будет хорошим решением и достаточно гибким для того, что вы описываете.

1 голос
/ 16 декабря 2011

Таким образом, в принципе (если я вас правильно понимаю) вам нужно периодически опрашивать базу данных, чтобы узнать, изменилась ли она.А когда он изменился, вам нужно уведомить обработчик запросов на основе CORBA о том, что произошли изменения.

Что бы я сделал, это добавил новый тип запроса на ваш сервер CORBA.Запрос будет «База данных обновлена».Затем вы можете написать небольшую совершенно другую программу, единственной задачей которой является опрос базы данных и отправка запроса CORBA после обновления базы данных.

Таким образом, вы можете сложить сообщение об обновлении базы данных в основной запрос.поток для сервера CORBA.

Нет потоков, нет ничего асинхронного.Всего два процесса, каждый из которых делает свое дело.

1 голос
/ 16 декабря 2011

Похоже, это означает, что, хотя система непрерывно обрабатывает сетевые запросы, она асинхронно связывается с БД.

Это означает, что когда ему нужно поговорить с БД, он отправляет запрос, но не ждет ответа.

Когда он получает ответ от БД, он обрабатывает его.

Асинхронная часть может быть реализована с помощью отдельного потока, который общается с БД, и когда он получает ответ, он отправляет четные данные в очередь сервера для обработки.

Или сервер может прослушивать множество сокетов для данных, и одним из них может быть соединение с базой данных, где он получает ответы от БД.

...