Это правильный способ, которым я должен использовать boost :: beast? - PullRequest
0 голосов
/ 21 января 2020

Моя конечная цель - выполнять много параллельных HTTP-запросов асинхронным способом. Услышав о boost:asio и boot::beast, я пытаюсь понять принципы и способы правильного использования этих структур.

Я использую Visual Studio 2019.

Каждый запрос http должен быть запрошен из данного потока, но фактически выполняется из пула (скажем, четырех) рабочих потоков.

Чтобы протестировать такой сценарий, я сделал следующее:

    // The io_context for all I/O;
    boost::shared_ptr< boost::asio::io_context > io_context_p(new boost::asio::io_context);

    // create thread pool where requests are executed
    boost::thread_group worker_threads;
    for (int x = 0; x < 1; ++x)
    {
        worker_threads.create_thread(boost::bind(&WorkerThread, io_context_p));
    }

    // Launch the asynchronous operation
    std::make_shared<session>(*io_context_p)->run(host, port, target, version);

    ...
    ...
    do other things 
    posted requests shall be executed in separate worker threads
    ...

Это полный код, в основном взятый из примера: https://www.boost.org/doc/libs/develop/libs/beast/example/http/client/async/http_client_async.cpp

Я просто поместил io_context->run() в мой выделенный рабочий поток.

Пока это работает, но я думаю, что это не так, как должно быть, потому что я смешиваю boost::asio с boost:beast.

#include <iostream>
#include <sstream>
#include <boost/algorithm/hex.hpp>
#include <boost/variant.hpp>
#include <boost/optional/optional.hpp>
#include <boost/asio.hpp>

#ifdef _MSC_VER
#include <boost/config/compiler/visualc.hpp>
#endif
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/foreach.hpp>
#include <cassert>
#include <exception>
#include <iostream>
#include <sstream>
#include <string>

using boost::property_tree::ptree;

#include <iostream>
#include <boost/thread.hpp>
#include <boost/lexical_cast.hpp>

//------------------------------------------------------------------------------
//
// Example: HTTP client, asynchronous
//
//------------------------------------------------------------------------------

#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/version.hpp>
#include <boost/asio/connect.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <memory>
#include <string>

using tcp = boost::asio::ip::tcp;       // from <boost/asio/ip/tcp.hpp>
namespace http = boost::beast::http;    // from <boost/beast/http.hpp>

boost::mutex global_stream_lock;

void WorkerThread(boost::shared_ptr< boost::asio::io_context > io_context)
{
    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] Thread Start" << std::endl;
    global_stream_lock.unlock();

    io_context->run();


    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] Thread Finish" << std::endl;
    global_stream_lock.unlock();
}


//------------------------------------------------------------------------------

// Report a failure
void
fail(boost::system::error_code ec, char const* what)
{
    std::cerr << what << ": " << ec.message() << "\n";
}

// Performs an HTTP GET and prints the response
class session : public std::enable_shared_from_this<session>
{
    tcp::resolver resolver_;
    tcp::socket socket_;
    boost::beast::flat_buffer buffer_; // (Must persist between reads)
    http::request<http::empty_body> req_;
    http::response<http::string_body> res_;

public:
    // Resolver and socket require an io_context
    explicit
        session(boost::asio::io_context& ioc)
        : resolver_(ioc)
        , socket_(ioc)
    {
    }

    // Start the asynchronous operation
    void run(
            char const* host,
            char const* port,
            char const* target,
            int version)
    {
        global_stream_lock.lock();
        std::cout << "[" << boost::this_thread::get_id() << "] run " << std::endl;
        global_stream_lock.unlock();

        // Set up an HTTP GET request message
        req_.version(version);
        req_.method(http::verb::get);
        req_.target(target);
        req_.set(http::field::host, host);
        req_.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING);

        // Look up the domain name
        resolver_.async_resolve(
            host,
            port,
            std::bind(
                &session::on_resolve,
                shared_from_this(),
                std::placeholders::_1,
                std::placeholders::_2));
    }

    void on_resolve(
            boost::system::error_code ec,
            tcp::resolver::results_type results)
    {
        global_stream_lock.lock();
        std::cout << "[" << boost::this_thread::get_id() << "] resolve " << std::endl;
        global_stream_lock.unlock();

        if (ec)
            return fail(ec, "resolve");

        // Make the connection on the IP address we get from a lookup
        boost::asio::async_connect(
            socket_,
            results.begin(),
            results.end(),
            std::bind(
                &session::on_connect,
                shared_from_this(),
                std::placeholders::_1));
    }

    void on_connect(boost::system::error_code ec)
    {
        global_stream_lock.lock();
        std::cout << "[" << boost::this_thread::get_id() << "] connect " << std::endl;
        global_stream_lock.unlock();

        if (ec)
            return fail(ec, "connect");

        // Send the HTTP request to the remote host
        http::async_write(socket_, req_,
            std::bind(
                &session::on_write,
                shared_from_this(),
                std::placeholders::_1,
                std::placeholders::_2));
    }

    void on_write(
            boost::system::error_code ec,
            std::size_t bytes_transferred)
    {
        global_stream_lock.lock();
        std::cout << "[" << boost::this_thread::get_id() << "] on_write " << std::endl;
        global_stream_lock.unlock();

        boost::ignore_unused(bytes_transferred);

        if (ec)
            return fail(ec, "write");

        // Receive the HTTP response
        http::async_read(socket_, buffer_, res_,
            std::bind(
                &session::on_read,
                shared_from_this(),
                std::placeholders::_1,
                std::placeholders::_2));
    }

    void on_read(
            boost::system::error_code ec,
            std::size_t bytes_transferred)
    {
        boost::ignore_unused(bytes_transferred);
        global_stream_lock.lock();
        std::cout << "[" << boost::this_thread::get_id() << "] on_read " << std::endl;
        global_stream_lock.unlock();

        if (ec)
            return fail(ec, "read");

        // Write the message to standard out
        std::cout << res_ << std::endl;

        // Gracefully close the socket
        socket_.shutdown(tcp::socket::shutdown_both, ec);

        // not_connected happens sometimes so don't bother reporting it.
        if (ec && ec != boost::system::errc::not_connected)
            return fail(ec, "shutdown");

        // If we get here then the connection is closed gracefully
    }
};

//------------------------------------------------------------------------------

int main(int argc, char** argv)
{

    auto const host = "www.example.com";
    auto const port = "80";
    auto const target = "/";
    int version = 10;
    // start service in thread

    boost::shared_ptr< boost::asio::io_service > io_service(
        new boost::asio::io_service
    );
    boost::shared_ptr< boost::asio::io_service::work > work(
        new boost::asio::io_service::work(*io_service)
    );
    boost::shared_ptr< boost::asio::io_service::strand > strand(
        new boost::asio::io_service::strand(*io_service)
    );


    // The io_context is required for all I/O;
    boost::shared_ptr< boost::asio::io_context > io_context_p(new boost::asio::io_context);

    boost::thread_group worker_threads;
    for (int x = 0; x < 3; ++x)
    {
        worker_threads.create_thread(boost::bind(&WorkerThread, io_context_p));
    }

    // Launch the asynchronous operation
    std::make_shared<session>(*io_context_p)->run(host, port, target, version);

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] Press [return] to exit." << std::endl;
    global_stream_lock.unlock();
    std::cin.get();

    worker_threads.join_all();

    return EXIT_SUCCESS;
} 

Вывод:

[535c] run
[53c8] Thread Start
[95c] Thread Start
[535c] Press [return] to exit.
[4cc0] Thread Start
[95c] resolve
[95c] connect
[53c8] on_write
[53c8] on_read
HTTP/1.0 200 OK
Age: 426414
Cache-Control: max-age=604800
Content-Type: text/html; charset=UTF-8
Date: Tue, 21 Jan 2020 10:14:19 GMT
Etag: "3147526947+ident"
Expires: Tue, 28 Jan 2020 10:14:19 GMT
Last-Modified: Thu, 17 Oct 2019 07:18:26 GMT
Server: ECS (dcb/7F5E)
Vary: Accept-Encoding
X-Cache: HIT
Content-Length: 1256
Connection: close

<!doctype html>
<html>
<head>
    <title>Example Domain</title>

    <meta charset="utf-8" />
    <meta http-equiv="Content-type" content="text/html; charset=utf-8" />
    <meta name="viewport" content="width=device-width, initial-scale=1" />
    <style type="text/css">
    body {
        background-color: #f0f0f2;
        margin: 0;
        padding: 0;
        font-family: -apple-system, system-ui, BlinkMacSystemFont, "Segoe UI", "Open Sans", "Helvetica Neue", Helvetica, Arial, sans-serif;

    }
    div {
        width: 600px;
        margin: 5em auto;
        padding: 2em;
        background-color: #fdfdff;
        border-radius: 0.5em;
        box-shadow: 2px 3px 7px 2px rgba(0,0,0,0.02);
    }
    a:link, a:visited {
        color: #38488f;
        text-decoration: none;
    }
    @media (max-width: 700px) {
        div {
            margin: 0 auto;
            width: auto;
        }
    }
    </style>
</head>

<body>
<div>
    <h1>Example Domain</h1>
    <p>This domain is for use in illustrative examples in documents. You may use this
    domain in literature without prior coordination or asking for permission.</p>
    <p><a href="https://www.iana.org/domains/example">More information...</a></p>
</div>
</body>
</html>

[53c8] Thread Finish
[95c] Thread Finish
[4cc0] Thread Finish

Похоже, что асинхронные вызовы функций действительно обрабатываются разными рабочими потоками

[535c] run
[53c8] Thread Start
[95c] Thread Start
[4cc0] Thread Start
[95c] resolve
[95c] connect
[53c8] on_write
[53c8] on_read

Однако, когда все закончится, io_context->run() возвращается. Я хотел бы иметь возможность "публиковать" новые запросы на io_context без повторного запуска. В boost:asio io_servive.run не вернется, когда он не будет работать, но вместо этого ждет публикации новой работы.

Так что я думаю, что я не правильно использую фреймворк - пожалуйста, дайте мне знать, как я буду делать это правильно.

РЕДАКТИРОВАТЬ:

Я обнаружил проблему: я забыл использовать рабочий объект:

boost::shared_ptr< boost::asio::io_context > io_context_p(new 
boost::asio::io_context);
boost::asio::io_context::work work(*io_context_p);

https://www.boost.org/doc/libs/1_57_0/doc/html/boost_asio/reference/io_service__work.html: Это гарантирует, что функция run () объекта io_service не будет завершена во время выполнения работы, и что она завершится, когда не осталось незавершенной работы.

...