Моя конечная цель - выполнять много параллельных HTTP-запросов асинхронным способом. Услышав о boost:asio
и boot::beast
, я пытаюсь понять принципы и способы правильного использования этих структур.
Я использую Visual Studio 2019.
Каждый запрос http должен быть запрошен из данного потока, но фактически выполняется из пула (скажем, четырех) рабочих потоков.
Чтобы протестировать такой сценарий, я сделал следующее:
// The io_context for all I/O;
boost::shared_ptr< boost::asio::io_context > io_context_p(new boost::asio::io_context);
// create thread pool where requests are executed
boost::thread_group worker_threads;
for (int x = 0; x < 1; ++x)
{
worker_threads.create_thread(boost::bind(&WorkerThread, io_context_p));
}
// Launch the asynchronous operation
std::make_shared<session>(*io_context_p)->run(host, port, target, version);
...
...
do other things
posted requests shall be executed in separate worker threads
...
Это полный код, в основном взятый из примера: https://www.boost.org/doc/libs/develop/libs/beast/example/http/client/async/http_client_async.cpp
Я просто поместил io_context->run()
в мой выделенный рабочий поток.
Пока это работает, но я думаю, что это не так, как должно быть, потому что я смешиваю boost::asio
с boost:beast
.
#include <iostream>
#include <sstream>
#include <boost/algorithm/hex.hpp>
#include <boost/variant.hpp>
#include <boost/optional/optional.hpp>
#include <boost/asio.hpp>
#ifdef _MSC_VER
#include <boost/config/compiler/visualc.hpp>
#endif
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/foreach.hpp>
#include <cassert>
#include <exception>
#include <iostream>
#include <sstream>
#include <string>
using boost::property_tree::ptree;
#include <iostream>
#include <boost/thread.hpp>
#include <boost/lexical_cast.hpp>
//------------------------------------------------------------------------------
//
// Example: HTTP client, asynchronous
//
//------------------------------------------------------------------------------
#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/version.hpp>
#include <boost/asio/connect.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
namespace http = boost::beast::http; // from <boost/beast/http.hpp>
boost::mutex global_stream_lock;
void WorkerThread(boost::shared_ptr< boost::asio::io_context > io_context)
{
global_stream_lock.lock();
std::cout << "[" << boost::this_thread::get_id()
<< "] Thread Start" << std::endl;
global_stream_lock.unlock();
io_context->run();
global_stream_lock.lock();
std::cout << "[" << boost::this_thread::get_id()
<< "] Thread Finish" << std::endl;
global_stream_lock.unlock();
}
//------------------------------------------------------------------------------
// Report a failure
void
fail(boost::system::error_code ec, char const* what)
{
std::cerr << what << ": " << ec.message() << "\n";
}
// Performs an HTTP GET and prints the response
class session : public std::enable_shared_from_this<session>
{
tcp::resolver resolver_;
tcp::socket socket_;
boost::beast::flat_buffer buffer_; // (Must persist between reads)
http::request<http::empty_body> req_;
http::response<http::string_body> res_;
public:
// Resolver and socket require an io_context
explicit
session(boost::asio::io_context& ioc)
: resolver_(ioc)
, socket_(ioc)
{
}
// Start the asynchronous operation
void run(
char const* host,
char const* port,
char const* target,
int version)
{
global_stream_lock.lock();
std::cout << "[" << boost::this_thread::get_id() << "] run " << std::endl;
global_stream_lock.unlock();
// Set up an HTTP GET request message
req_.version(version);
req_.method(http::verb::get);
req_.target(target);
req_.set(http::field::host, host);
req_.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING);
// Look up the domain name
resolver_.async_resolve(
host,
port,
std::bind(
&session::on_resolve,
shared_from_this(),
std::placeholders::_1,
std::placeholders::_2));
}
void on_resolve(
boost::system::error_code ec,
tcp::resolver::results_type results)
{
global_stream_lock.lock();
std::cout << "[" << boost::this_thread::get_id() << "] resolve " << std::endl;
global_stream_lock.unlock();
if (ec)
return fail(ec, "resolve");
// Make the connection on the IP address we get from a lookup
boost::asio::async_connect(
socket_,
results.begin(),
results.end(),
std::bind(
&session::on_connect,
shared_from_this(),
std::placeholders::_1));
}
void on_connect(boost::system::error_code ec)
{
global_stream_lock.lock();
std::cout << "[" << boost::this_thread::get_id() << "] connect " << std::endl;
global_stream_lock.unlock();
if (ec)
return fail(ec, "connect");
// Send the HTTP request to the remote host
http::async_write(socket_, req_,
std::bind(
&session::on_write,
shared_from_this(),
std::placeholders::_1,
std::placeholders::_2));
}
void on_write(
boost::system::error_code ec,
std::size_t bytes_transferred)
{
global_stream_lock.lock();
std::cout << "[" << boost::this_thread::get_id() << "] on_write " << std::endl;
global_stream_lock.unlock();
boost::ignore_unused(bytes_transferred);
if (ec)
return fail(ec, "write");
// Receive the HTTP response
http::async_read(socket_, buffer_, res_,
std::bind(
&session::on_read,
shared_from_this(),
std::placeholders::_1,
std::placeholders::_2));
}
void on_read(
boost::system::error_code ec,
std::size_t bytes_transferred)
{
boost::ignore_unused(bytes_transferred);
global_stream_lock.lock();
std::cout << "[" << boost::this_thread::get_id() << "] on_read " << std::endl;
global_stream_lock.unlock();
if (ec)
return fail(ec, "read");
// Write the message to standard out
std::cout << res_ << std::endl;
// Gracefully close the socket
socket_.shutdown(tcp::socket::shutdown_both, ec);
// not_connected happens sometimes so don't bother reporting it.
if (ec && ec != boost::system::errc::not_connected)
return fail(ec, "shutdown");
// If we get here then the connection is closed gracefully
}
};
//------------------------------------------------------------------------------
int main(int argc, char** argv)
{
auto const host = "www.example.com";
auto const port = "80";
auto const target = "/";
int version = 10;
// start service in thread
boost::shared_ptr< boost::asio::io_service > io_service(
new boost::asio::io_service
);
boost::shared_ptr< boost::asio::io_service::work > work(
new boost::asio::io_service::work(*io_service)
);
boost::shared_ptr< boost::asio::io_service::strand > strand(
new boost::asio::io_service::strand(*io_service)
);
// The io_context is required for all I/O;
boost::shared_ptr< boost::asio::io_context > io_context_p(new boost::asio::io_context);
boost::thread_group worker_threads;
for (int x = 0; x < 3; ++x)
{
worker_threads.create_thread(boost::bind(&WorkerThread, io_context_p));
}
// Launch the asynchronous operation
std::make_shared<session>(*io_context_p)->run(host, port, target, version);
global_stream_lock.lock();
std::cout << "[" << boost::this_thread::get_id()
<< "] Press [return] to exit." << std::endl;
global_stream_lock.unlock();
std::cin.get();
worker_threads.join_all();
return EXIT_SUCCESS;
}
Вывод:
[535c] run
[53c8] Thread Start
[95c] Thread Start
[535c] Press [return] to exit.
[4cc0] Thread Start
[95c] resolve
[95c] connect
[53c8] on_write
[53c8] on_read
HTTP/1.0 200 OK
Age: 426414
Cache-Control: max-age=604800
Content-Type: text/html; charset=UTF-8
Date: Tue, 21 Jan 2020 10:14:19 GMT
Etag: "3147526947+ident"
Expires: Tue, 28 Jan 2020 10:14:19 GMT
Last-Modified: Thu, 17 Oct 2019 07:18:26 GMT
Server: ECS (dcb/7F5E)
Vary: Accept-Encoding
X-Cache: HIT
Content-Length: 1256
Connection: close
<!doctype html>
<html>
<head>
<title>Example Domain</title>
<meta charset="utf-8" />
<meta http-equiv="Content-type" content="text/html; charset=utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<style type="text/css">
body {
background-color: #f0f0f2;
margin: 0;
padding: 0;
font-family: -apple-system, system-ui, BlinkMacSystemFont, "Segoe UI", "Open Sans", "Helvetica Neue", Helvetica, Arial, sans-serif;
}
div {
width: 600px;
margin: 5em auto;
padding: 2em;
background-color: #fdfdff;
border-radius: 0.5em;
box-shadow: 2px 3px 7px 2px rgba(0,0,0,0.02);
}
a:link, a:visited {
color: #38488f;
text-decoration: none;
}
@media (max-width: 700px) {
div {
margin: 0 auto;
width: auto;
}
}
</style>
</head>
<body>
<div>
<h1>Example Domain</h1>
<p>This domain is for use in illustrative examples in documents. You may use this
domain in literature without prior coordination or asking for permission.</p>
<p><a href="https://www.iana.org/domains/example">More information...</a></p>
</div>
</body>
</html>
[53c8] Thread Finish
[95c] Thread Finish
[4cc0] Thread Finish
Похоже, что асинхронные вызовы функций действительно обрабатываются разными рабочими потоками
[535c] run
[53c8] Thread Start
[95c] Thread Start
[4cc0] Thread Start
[95c] resolve
[95c] connect
[53c8] on_write
[53c8] on_read
Однако, когда все закончится, io_context->run()
возвращается. Я хотел бы иметь возможность "публиковать" новые запросы на io_context
без повторного запуска. В boost:asio
io_servive.run
не вернется, когда он не будет работать, но вместо этого ждет публикации новой работы.
Так что я думаю, что я не правильно использую фреймворк - пожалуйста, дайте мне знать, как я буду делать это правильно.
РЕДАКТИРОВАТЬ:
Я обнаружил проблему: я забыл использовать рабочий объект:
boost::shared_ptr< boost::asio::io_context > io_context_p(new
boost::asio::io_context);
boost::asio::io_context::work work(*io_context_p);
https://www.boost.org/doc/libs/1_57_0/doc/html/boost_asio/reference/io_service__work.html: Это гарантирует, что функция run () объекта io_service не будет завершена во время выполнения работы, и что она завершится, когда не осталось незавершенной работы.