Boost beast :: функции обратного вызова websocket - PullRequest
0 голосов
/ 25 апреля 2018

Я экспериментирую с примером Boost beast :: websocket websocket_client_async.cpp в сочетании с websocket_server_async.cpp .

Как указано, пример client просто устанавливает соединение, отправляет строку на сервер (который просто возвращает ответ), печатает ответ, закрывает и существует.

Я пытаюсь изменить клиента, чтобы сохранить сеанс, чтобы я мог повторно отправлять / получать строки.Итак, в то время как функция on_handshake кода примера немедленно отправляет строку через ws_.async_write(...), я разделяю ее на свою собственную функцию write(...).

Вот мой модифицированный класс session:

using tcp = boost::asio::ip::tcp;
namespace websocket = boost::beast::websocket;

void fail(boost::system::error_code ec, char const* what)
{
    std::cerr << what << ": " << ec.message() << "\n";
}

// Sends a WebSocket message and prints the response
class session : public std::enable_shared_from_this<session>
{
    tcp::resolver resolver_;
    websocket::stream<tcp::socket> ws_;
    std::atomic<bool> io_in_progress_;
    boost::beast::multi_buffer buffer_;
    std::string host_;

public:
    // Resolver and socket require an io_context
    explicit session(boost::asio::io_context& ioc) : resolver_(ioc), ws_(ioc) {
        io_in_progress_ = false;
    }

    bool io_in_progress() const {
        return io_in_progress_;
    }

    // +---------------------+
    // | The "open" sequence |
    // +---------------------+
    void open(char const* host, char const* port)
    {
        host_ = host;

        // Look up the domain name
        resolver_.async_resolve(host, port,
            std::bind( &session::on_resolve, shared_from_this(),
                std::placeholders::_1, std::placeholders::_2 )
        );
    }

    void on_resolve(boost::system::error_code ec, tcp::resolver::results_type results)
    {
        if (ec)
            return fail(ec, "resolve");

        boost::asio::async_connect(
            ws_.next_layer(), results.begin(), results.end(),
            std::bind( &session::on_connect, shared_from_this(),
                std::placeholders::_1 )
        );
    }

    void on_connect(boost::system::error_code ec)
    {
        if (ec)
            return fail(ec, "connect");

        ws_.async_handshake(host_, "/",
            std::bind( &session::on_handshake, shared_from_this(),
                std::placeholders::_1 )
        );
    }

    void on_handshake(boost::system::error_code ec)
    {
        if (ec)
            return fail(ec, "handshake");
        else {
            std::cout << "Successful handshake with server.\n";
        }
    }

    // +---------------------------+
    // | The "write/read" sequence |
    // +---------------------------+
    void write(const std::string &text)
    {
        io_in_progress_ = true;
        ws_.async_write(boost::asio::buffer(text),
            std::bind( &session::on_write, shared_from_this(),
                std::placeholders::_1, std::placeholders::_2 )
        );
    }

    void on_write(boost::system::error_code ec, std::size_t bytes_transferred)
    {
        boost::ignore_unused(bytes_transferred);
        if (ec)
            return fail(ec, "write");

        ws_.async_read(buffer_,
            std::bind( &session::on_read, shared_from_this(),
                std::placeholders::_1, std::placeholders::_2 )
        );
    }

    void on_read(boost::system::error_code ec, std::size_t bytes_transferred)
    {
        io_in_progress_ = false; // end of write/read sequence
        boost::ignore_unused(bytes_transferred);
        if (ec)
            return fail(ec, "read");

        std::cout << boost::beast::buffers(buffer_.data()) << std::endl;
    }

    // +----------------------+
    // | The "close" sequence |
    // +----------------------+
    void close()
    {
        io_in_progress_ = true;
        ws_.async_close(websocket::close_code::normal,
            std::bind( &session::on_close, shared_from_this(),
                std::placeholders::_1)
        );
    }

    void on_close(boost::system::error_code ec)
    {
        io_in_progress_ = false; // end of close sequence
        if (ec)
            return fail(ec, "close");

        std::cout << "Socket closed successfully.\n";
    }
};

Проблема в том, что, хотя соединение работает нормально, и я могу отправить строку, обратный вызов on_read никогда не срабатывает (если я не сделаю уродливый хак, описанный ниже).

Мой main выглядит следующим образом:

void wait_for_io(std::shared_ptr<session> psession, boost::asio::io_context &ioc)
{
    // Continually try to run the ioc until the callbacks are finally
    // triggered (as indicated by the session::io_in_progress_ flag)
    while (psession->io_in_progress()) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
        ioc.run();
    }
}

int main(int argc, char** argv)
{
    // Check command line arguments.
    if (argc != 3) {
        std::cerr << "usage info goes here...\n";
        return EXIT_FAILURE;
    }
    const char *host = argv[1], *port = argv[2];

    boost::asio::io_context ioc;
    std::shared_ptr<session> p = std::make_shared<session>(ioc);
    p->open(host, port);
    ioc.run(); // This works. Connection is established and all callbacks are executed.

    p->write("Hello world"); // String is sent & received by server,
                             // even before calling ioc.run()
                             // However, session::on_read callback is never
                             // reached.

    ioc.run();               // This seems to be ignored and returns immediately, so
    wait_for_io(p, ioc);     // <-- so this hack is necessary

    p->close();              // session::on_close is never reached
    ioc.run();               // Again, this seems to be ignored and returns immediately, so
    wait_for_io(p, ioc);     // <-- this is necessary

    return EXIT_SUCCESS;
}

Если я сделаю это:

p->write("Hello world");
while(1) {
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
}

Я могу подтвердить, что строка отправлена ​​и получена 1 сервером и что session::on_read обратный вызов не достигнут.

То же самое происходит с p->close().

Но, если я добавлюмоя странная wait_for_io() функция, все работает.Я уверен, что это ужасный взлом, но я не могу понять, что происходит.

1 Примечание. Я могу подтвердить, что сообщение действительно достигает сервера, так как я изменил пример сервера для печати всех полученных строк на консоли.Это была единственная модификация, которую я сделал.Функциональность эхо-клиента не изменилась.

Ответы [ 2 ]

0 голосов
/ 28 апреля 2018

Причина, по которой вызовы io_context::run() не работают после первого вызова (показано здесь):

boost::asio::io_context ioc;
std::shared_ptr<session> p = std::make_shared<session>(ioc);
p->open(host, port);
ioc.run(); // This works. Connection is established and all callbacks are executed.

, заключается в том, что функция io_context::restart() должна вызываться до любых последующих вызовов * 1006.*.

Из документации :

io_context :: restart

Перезапустите io_context при подготовке к последующему вызову run ().

Эта функция должна вызываться перед любым вторым или более поздним набором вызовов функций run (), run_one (), poll () или poll_one ()когда предыдущий вызов этих функций возвратился из-за остановки io_context или без работы.После вызова метода restart () функция stop () объекта io_context вернет false.

0 голосов
/ 27 апреля 2018

io_context::run вернется только тогда, когда больше нет ожидающих работ.Если вы просто убедитесь, что постоянно активен ожидающий вызов websocket::stream::async_read, то run никогда не вернется, и хаки не понадобятся.Также вы будете получать все сообщения, отправленные сервером.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...