Я пытаюсь сделать пример сериализации в boost, чтобы иметь возможность выполнять несколько вызовов io_context.run()
с потоками. Тем не менее, я не могу оставить один connection::async_read
для одновременного запуска. Важная часть кода выглядит следующим образом:
/// Asynchronously read a data structure from the socket.
template <typename T, typename Handler>
void async_read(T& t, Handler handler)
{
// Issue a read operation to read exactly the number of bytes in a header.
void (connection::*f)(
const boost::system::error_code&,
T&, boost::tuple<Handler>)
= &connection::handle_read_header<T, Handler>;
boost::asio::async_read(socket_, boost::asio::buffer(inbound_header_),
boost::bind(f,
this, boost::asio::placeholders::error, boost::ref(t),
boost::make_tuple(handler)));
}
/// Handle a completed read of a message header. The handler is passed using
/// a tuple since boost::bind seems to have trouble binding a function object
/// created using boost::bind as a parameter.
template <typename T, typename Handler>
void handle_read_header(const boost::system::error_code& e,
T& t, boost::tuple<Handler> handler)
{
...
void (connection::*f)(
const boost::system::error_code&,
T&, boost::tuple<Handler>)
= &connection::handle_read_data<T, Handler>;
boost::asio::async_read(socket_, boost::asio::buffer(inbound_data_),
boost::bind(f, this,
boost::asio::placeholders::error, boost::ref(t), handler));
}
/// Handle a completed read of message data.
template <typename T, typename Handler>
void handle_read_data(const boost::system::error_code& e,
T& t, boost::tuple<Handler> handler)
{
...
std::string archive_data(&inbound_data_[0], inbound_data_.size());
std::istringstream archive_stream(archive_data);
boost::archive::text_iarchive archive(archive_stream);
archive >> t;
...
// Inform caller that data has been received ok.
boost::get<0>(handler)(e);
}
Он выполняет цепочку команд async_read, чтобы сначала получить размер данных с заголовком, а затем сами данные. Если я использую strand для каждого async_read, он обеспечивает только синхронизацию отдельных вызовов async_read.
Я читаю следующие сообщения: 1 , 2 , 3 . Однако, так как мой случай работает на многопоточность, я не смог расширить эти решения. Также я наткнулся на этот на сайте Boost, но он находится под зверем не Asio.
Обновление
По ссылке , я написал приведенный ниже код, но та же проблема сохраняется:
using namespace std;
using boost::asio::ip::tcp;
namespace ba = boost::asio;
struct async_read_initiation
{
template <typename T, typename CompletionHandler>
void operator()(CompletionHandler &&completion_handler, tcp::socket &socket,
T &t) const
{
struct intermediate_completion_handler
{
tcp::socket &socket_;
T &t_;
unique_ptr<vector<char>> inbound_header_;
vector<char> inbound_data_;
enum
{
starting,
head_reading,
data_reading
} state_;
typename std::decay<CompletionHandler>::type handler_;
void operator()(const boost::system::error_code &error, size_t)
{
if (!error)
{
switch (state_)
{
case starting:
case head_reading:
{
// Determine the length of the serialized data.
std::istringstream is(string(inbound_header_->begin(), inbound_header_->end()));
std::size_t inbound_data_size = 0;
if (!(is >> std::hex >> inbound_data_size))
{
cout << "(handle_read_header)Invalid header..." << endl;
// Header doesn't seem to be valid. Inform the caller.
boost::system::error_code error(ba::error::invalid_argument);
handler_(error);
return;
}
cout << "(handle_read_header)Header is valid-size: ";
cout << inbound_data_size << endl;
// Start an asynchronous call to receive the data.
inbound_data_.resize(inbound_data_size);
// Read actual data synchronously
state_ = data_reading;
ba::async_read(socket_, ba::buffer(inbound_data_), std::move(*this));
return;
}
case data_reading:
try
{
std::string archive_data(&inbound_data_[0], inbound_data_.size());
cout << "Size: " << inbound_data_.size() << endl;
std::istringstream archive_stream(archive_data);
boost::archive::text_iarchive archive(archive_stream);
archive >> t_;
cout << "Read data successfully" << endl;
}
catch (std::exception &e)
{
cout << "(async_read)Something is wrong.." << endl;
// Unable to decode data.
boost::system::error_code error(ba::error::invalid_argument);
handler_(error);
return;
}
break;
// Composed operation complete, continue below.
}
}
// Call the user-supplied handler with the result of the operation.
handler_(error);
}
using executor_type = boost::asio::associated_executor_t<
typename std::decay<CompletionHandler>::type,
tcp::socket::executor_type>;
executor_type get_executor() const noexcept
{
return boost::asio::get_associated_executor(
handler_, socket_.get_executor());
}
using allocator_type = boost::asio::associated_allocator_t<
typename std::decay<CompletionHandler>::type,
std::allocator<void>>;
allocator_type get_allocator() const noexcept
{
return boost::asio::get_associated_allocator(
handler_, std::allocator<void>{});
}
};
unique_ptr<vector<char>> inbound_header(new vector<char>(HEADER_LENGTH));
auto encoded_header_buffer = ba::buffer(*inbound_header);
ba::async_read(
socket,
encoded_header_buffer,
intermediate_completion_handler{
socket,
t,
std::move(inbound_header),
vector<char>(),
intermediate_completion_handler::starting,
std::forward<CompletionHandler>(completion_handler)});
}
};
template <typename T, typename CompletionToken>
auto async_read_structure(tcp::socket &socket,
T &t,
CompletionToken &&token)
-> typename boost::asio::async_result<
typename std::decay<CompletionToken>::type,
void(boost::system::error_code)>::return_type
{
return ba::async_initiate<
CompletionToken, void(boost::system::error_code)>(
async_read_initiation(), token, std::ref(socket),
t);
}
Мой вопрос - как я могу расширить код сериализации, чтобы каждый connection::async_read
выполнялся без прерывания