КАК: Менеджер клиентских подключений для Boost :: asio? - PullRequest
2 голосов
/ 13 августа 2010

Я создал сервер, используя boost: asio. Когда клиент подключается, он отправляет file_size, file_name и file_data. Сервер сохраняет это в файле на диске. Это работает отлично! Хотя сейчас я запускаю и клиентское приложение, и серверное приложение в главном потоке их приложения (поэтому у меня есть серверное и клиентское приложение), которое блокирует выполнение остальных приложений.

Итак, абстрактно я хочу создать что-то вроде этого:

серверное приложение

  • есть один поток, чтобы получать и обрабатывать все входящие передачи файлов
  • есть другой поток, в котором остальная часть приложения может делать то, что хочет

клиентское приложение

  • когда я нажимаю клавишу пробела или когда я захочу, я хочу отправить файл на сервер в отдельном потоке от основного, чтобы мое приложение могло продолжать делать другие вещи, которые ему нужно сделать.

Мой вопрос: как мне создать менеджер для передачи файлов моего клиента?

Сервер передачи файлов принимает новые клиентские подключения для передачи файлов

#include "ofxFileTransferServer.h"

ofxFileTransferServer::ofxFileTransferServer(unsigned short nPort)
    :acceptor(
        io_service
        ,boost::asio::ip::tcp::endpoint(
            boost::asio::ip::tcp::v4()
            ,nPort
        )
        ,true
    )
    ,port(nPort)
{
}

// test
void ofxFileTransferServer::startThread() {
    boost::thread t(boost::bind(
        &ofxFileTransferServer::accept
        ,this
    ));
}


void ofxFileTransferServer::accept() {
    ofxFileTransferConnection::pointer new_connection(new ofxFileTransferConnection(io_service));
    acceptor.async_accept(
                    new_connection->socket()
                    ,boost::bind(
                        &ofxFileTransferServer::handleAccept
                        ,this
                        ,new_connection
                        ,boost::asio::placeholders::error
                    )
    );
    std::cout << __FUNCTION__ << " start accepting " << std::endl;
    io_service.run();
}


void ofxFileTransferServer::handleAccept(
            ofxFileTransferConnection::pointer pConnection
            ,const boost::system::error_code& rErr
)
{
    std::cout << __FUNCTION__ << " " << rErr << ", " << rErr.message() << std::endl;
    if(!rErr) {
        pConnection->start();
        ofxFileTransferConnection::pointer new_connection(new ofxFileTransferConnection(io_service));
        acceptor.async_accept(
                        new_connection->socket()
                        ,boost::bind(
                            &ofxFileTransferServer::handleAccept
                            ,this
                            ,new_connection
                            ,boost::asio::placeholders::error
                        )
        );


    }
}

Клиент передачи файлов

#include "ofxFileTransferClient.h"
#include "ofMain.h"

using boost::asio::ip::tcp;

ofxFileTransferClient::ofxFileTransferClient(
                    boost::asio::io_service &rIOService
                    ,const std::string sServer
                    ,const std::string nPort
                    ,const std::string sFilePath  
):resolver_(rIOService)
,socket_(rIOService)
,file_path_(sFilePath)
,server_(sServer)
,port_(nPort)
{
}

ofxFileTransferClient::~ofxFileTransferClient() {
    std::cout << "~~~~ ofxFileTransferClient" << std::endl;
}

void ofxFileTransferClient::start() {
    // open file / get size
    source_file_stream_.open(
                    ofToDataPath(file_path_).c_str()
                    ,std::ios_base::binary | std::ios_base::ate
    );
    if(!source_file_stream_) {
        std::cout << ">> failed to open:" << file_path_ << std::endl;
        return;
    }

    size_t file_size = source_file_stream_.tellg();
    source_file_stream_.seekg(0);

    // send file size and name to server.
    std::ostream request_stream(&request_);

    request_stream  << file_path_ << "\n"
                    << file_size << "\n\n";

    std::cout   << ">> request_size:"   << request_.size() 
                << " file_path: " << file_path_
                << " file_size: "<< file_size
                << std::endl;

    // resolve ofxFileTransferServer
    tcp::resolver::query query(server_, port_);
    resolver_.async_resolve(
                query
                ,boost::bind(
                        &ofxFileTransferClient::handleResolve
                        ,shared_from_this()
                        ,boost::asio::placeholders::error
                        ,boost::asio::placeholders::iterator
                )
    );

}


void ofxFileTransferClient::handleResolve(
                const boost::system::error_code& rErr
                ,tcp::resolver::iterator oEndPointIt
)
{
    if(!rErr) {
        tcp::endpoint endpoint = *oEndPointIt;
        socket_.async_connect(
                endpoint
                ,boost::bind(
                        &ofxFileTransferClient::handleConnect
                        ,shared_from_this()
                        ,boost::asio::placeholders::error
                        ,++oEndPointIt
                )
        );
    }
    else {
        std::cout << ">> error: " << rErr.message() << std::endl;
    }

}   

void ofxFileTransferClient::handleConnect(
                const boost::system::error_code& rErr
                ,tcp::resolver::iterator oEndPointIt
)
{
    if(!rErr) {
        cout << ">> connected!" << std::endl;
        boost::asio::async_write(
                 socket_
                ,request_
                ,boost::bind(
                        &ofxFileTransferClient::handleFileWrite
                        ,shared_from_this()
                        ,boost::asio::placeholders::error
                )
        );
    }
    else if (oEndPointIt != tcp::resolver::iterator()) {
        // connection failed, try next endpoint in list
        socket_.close();
        tcp::endpoint endpoint = *oEndPointIt;
        socket_.async_connect(
            endpoint
            ,boost::bind(
                &ofxFileTransferClient::handleConnect
                ,shared_from_this()
                ,boost::asio::placeholders::error
                ,++oEndPointIt
            )
        );

    }
    else {
        std::cout << ">> error: " << rErr.message() << std::endl;
    }
}

void ofxFileTransferClient::handleFileWrite(
                const boost::system::error_code& rErr
)
{
    if(!rErr) {
        if(source_file_stream_.eof() == false) {
            source_file_stream_.read(buf_.c_array(), buf_.size());
            if(source_file_stream_.gcount() <= 0) {
                std::cout << ">> read file error." << std::endl;
                return;
            }
            std::cout << ">> send: " << source_file_stream_.gcount() << " bytes, total: " << source_file_stream_.tellg() << " bytes\n";
            boost::asio::async_write(
                    socket_
                    ,boost::asio::buffer(buf_.c_array(), source_file_stream_.gcount())
                    ,boost::bind(
                        &ofxFileTransferClient::handleFileWrite
                        ,this
                        ,boost::asio::placeholders::error
                    )
            );

            if(rErr) {
                std::cout <<">> send error: " << rErr << std::endl; // not sure bout this one..
            }

        }
        else {
            return; // eof()
        }
    }
    else {
        std::cout << ">> error:" << rErr.message() << std::endl;
    }
}

И крошечный менеджер для передачи клиентских менеджеров (который используется в клиентском приложении) Опять же, код потока предназначен только для тестирования и не используется.

#include "ofxFileTransferManager.h"

ofxFileTransferManager::ofxFileTransferManager() { 
}

void ofxFileTransferManager::transferFile(
            const std::string sServer
            ,const std::string nPort
            ,const std::string sFile
)
{
    ofxFileTransferClient::pointer client(new ofxFileTransferClient(
        io_service_
        ,sServer
        ,nPort
        ,sFile
    ));
    client->start();
    io_service_.run();
}

void ofxFileTransferManager::startThread() {
    boost::thread t(boost::bind(
        &ofxFileTransferManager::run
        ,this
    ));
}

void ofxFileTransferManager::run() {
    cout << "starting filemanager" << std::endl;
    while(true) {
        io_service_.run();
        boost::this_thread::sleep(boost::posix_time::milliseconds(250)); 
        cout << ".";

    }
    cout << "ready filemanager" << std::endl;
}

Было бы замечательно, если бы кто-нибудь мог помочь мне здесь. В примере boost все используют «одноразовое» клиентское соединение, которое на самом деле мне не помогает.

roxlu

Ответы [ 2 ]

4 голосов
/ 13 августа 2010

Отлично!Я просто понял это.Мне пришлось обернуть мой io_service вокруг объекта boost :: asio :: io_service :: work!(и забыл где-нибудь shared_from_this ()).Я загрузил свой код здесь: http://github.com/roxlu/ofxFileTransfer

Для удобства вот код менеджера:

#include "ofxFileTransferManager.h"



ofxFileTransferManager::ofxFileTransferManager()
:work_(io_service_)
{ 
}

void ofxFileTransferManager::transferFile(
            const std::string sServer
            ,const std::string nPort
            ,const std::string sFile
            ,const std::string sRemoteFile
)
{
    ofxFileTransferClient::pointer client(new ofxFileTransferClient(
        io_service_
        ,sServer
        ,nPort
        ,sFile
        ,sRemoteFile
    ));
    client->start();
}

void ofxFileTransferManager::startThread() {
    boost::thread t(boost::bind(
        &ofxFileTransferManager::run
        ,this
    ));
}

void ofxFileTransferManager::run() {
    io_service_.run();
}
1 голос
/ 13 августа 2010

Из того, что я могу сказать, все, что вам действительно нужно, - это создать новый поток и поместить его в основной цикл io_service.run();.

Очевидно, что вам придется позаботиться о защите классов и переменных в мьютексахкоторые разделены между основным потоком appss и потоком asio.

Редактировать: Что-то вроде этого?

static sem_t __semSendFile;

static void* asioThread(void*)
{
    while( true )
    {
        sem_wait( &__semSendFile );
        io_service.run();
    }
    return NULL;
}

void ofxFileTransferManager::transferFile(
            const std::string sServer
            ,const std::string nPort
            ,const std::string sFile
)
{
    ofxFileTransferClient::pointer client(new ofxFileTransferClient(
        io_service_
        ,sServer
        ,nPort
        ,sFile
    ));
    client->start();
    sem_post( &__semSendFile );
}

int main(int argc, char **argv)
{
    if ( sem_init( &__semSendFile, 0, 0 ) != 0 )
    {
        std::cerr << strerror( errno ) << std::endl;
        return -1;
    }

    pthread_t thread;
    if ( pthread_create( &thread, NULL, asioThread, NULL ) != 0 )
    {
        std::cerr << strerror( errno ) << std::endl;
        return -1;
    }

 [...]
...