Какой хороший способ справиться с многопоточностью с помощью Poco SocketReactor? - PullRequest
5 голосов
/ 03 декабря 2011

Итак, я начинаю исследовать альтернативы для реализации крупномасштабной системы клиент / сервер, и в настоящее время я смотрю на платформу Reactor Poco, поскольку сейчас я использую Poco для большей части своих платформ приложений.

Размеры входящего пакета будут довольно малы, поэтому я думаю, что он будет работать нормально с точки зрения чтения данных от клиентов.Но операции, которые будут выполняться на основе входных данных клиента, будут относительно дорогими и, возможно, их потребуется перенести в другой процесс или даже на другой сервер.И ответы, отправленные обратно клиенту, иногда будут довольно большими.Очевидно, что я не могу заблокировать поток реактора, пока он происходит.

Поэтому я думаю, что если я просто прочту данные в обработчике событий реактора, а затем передам их другому потоку (пулу), который обрабатываетс данными, это сработало бы лучше.

В чем я не слишком уверен, так это в процессе отправки ответов клиенту, когда операции завершены.

Я не могунайти слишком много информации о лучших способах использования фреймворка.Но я провел некоторое тестирование, и похоже, что реактор будет запускать событие WritableNotification несколько раз, пока сокет доступен для записи.Итак, будет ли оптимальным процесс ставить в очередь данные, которые должны быть отправлены в объекте, который получает события WritableNotification, и отправлять небольшие куски каждый раз, когда событие получено?

Обновление : такКогда я начал тестировать это, я с ужасом обнаружил, что загрузка ЦП сервера возросла до 100% на ЦП, на котором приложение сервера работало с одним подключением.Но после некоторого копания я обнаружил, что я делаю неправильно.Я обнаружил, что мне не нужно регистрироваться для событий WritableNotification при создании обработчика службы, мне нужно регистрироваться только при наличии данных для отправки.Затем, когда все данные отправлены, я должен отменить регистрацию обработчика событий.Таким образом, реактор не должен снова и снова вызывать обработчики событий, когда нечего отправлять.Теперь загрузка моего процессора остается близкой к 0 даже при 100 соединениях.Уф!

1 Ответ

4 голосов
/ 24 октября 2012

Я написал класс ServerConnector, который скопирован из SocketConnector, но не вызывал сокет connect для сокета, поскольку сокет уже был подключен, если реактор был запущен с ServiceHandler для уведомлений в функции run () функции TcpServerConnection Класс TcpServer будет начинать новый поток. Итак, я получил многопоточность реактора, но я не знаю, лучший ли это путь или нет.

класс ServerConnector

template <class ServiceHandler>
class ServerConnector
{
public:     
    explicit ServerConnector(StreamSocket& ss):
        _pReactor(0),
        _socket(ss)
        /// Creates a ServerConnector, using the given Socket.
    {
    }

    ServerConnector(StreamSocket& ss, SocketReactor& reactor):
        _pReactor(0),
        _socket(ss)
        /// Creates an acceptor, using the given ServerSocket.
        /// The ServerConnector registers itself with the given SocketReactor.
    {
        registerConnector(reactor);
        onConnect();
    }

    virtual ~ServerConnector()
        /// Destroys the ServerConnector.
    {
        unregisterConnector();
    }

//
// this part is same with SocketConnector
//

private:
    ServerConnector();
    ServerConnector(const ServerConnector&);
    ServerConnector& operator = (const ServerConnector&);

    StreamSocket&   _socket;
    SocketReactor* _pReactor;
};

Echo-Service - это обычный ServiceHander

class EchoServiceHandler
{
public:
    EchoServiceHandler(StreamSocket& socket, SocketReactor& reactor):
        _socket(socket),
        _reactor(reactor)
    {
        _reactor.addEventHandler(_socket, Observer<EchoServiceHandler, ReadableNotification>(*this, &EchoServiceHandler::onReadable));
        _reactor.addEventHandler(_socket, Observer<EchoServiceHandler, ErrorNotification>(*this, &EchoServiceHandler::onError));
    }

    ~EchoServiceHandler()
    {
        _reactor.removeEventHandler(_socket, Observer<EchoServiceHandler, ErrorNotification>(*this, &EchoServiceHandler::onError));
        _reactor.removeEventHandler(_socket, Observer<EchoServiceHandler, ReadableNotification>(*this, &EchoServiceHandler::onReadable));
    }

    void onReadable(ReadableNotification* pNf)
    {
        pNf->release();
        char buffer[4096];
        try {
            int n = _socket.receiveBytes(buffer, sizeof(buffer));
            if (n > 0)
            {
                _socket.sendBytes(buffer, n);
            } else
                onError();
        } catch( ... ) {
            onError();
        }
    }

    void onError(ErrorNotification* pNf)
    {
        pNf->release();
        onError();
    }

    void onError()
    {
        _socket.shutdown();
        _socket.close();
        _reactor.stop();
        delete this;
    }

private:
    StreamSocket   _socket;
    SocketReactor& _reactor;
};

EchoReactorConnection работает с классом TcpServer для запуска реактора в виде потока

class EchoReactorConnection: public TCPServerConnection
{
public:
    EchoReactorConnection(const StreamSocket& s): TCPServerConnection(s)
    {
    }

    void run()
    {
        StreamSocket& ss = socket();
        SocketReactor reactor;

        ServerConnector<EchoServiceHandler> sc(ss, reactor);
        reactor.run();
        std::cout << "exit EchoReactorConnection thread" << std::endl;
    }
};

Пример теста cppunit такой же, как и у TCPServerTest :: testMultiConnections, но с использованием EchoReactorConnection для многопоточности.

void TCPServerTest::testMultithreadReactor()
{
    ServerSocket svs(0);
    TCPServerParams* pParams = new TCPServerParams;
    pParams->setMaxThreads(4);
    pParams->setMaxQueued(4);
    pParams->setThreadIdleTime(100);

    TCPServer srv(new TCPServerConnectionFactoryImpl<EchoReactorConnection>(), svs, pParams);
    srv.start();

    assert (srv.currentConnections() == 0);
    assert (srv.currentThreads() == 0);
    assert (srv.queuedConnections() == 0);
    assert (srv.totalConnections() == 0);

    //
    // same with TCPServerTest::testMultiConnections()
    //
    // ....
    ///
}
...