Как сделать сложные асинхронные вызовы, выполняемые без вмешательства, используя Strand в Boost Asio? - PullRequest
0 голосов
/ 19 марта 2020

Я пытаюсь сделать пример сериализации в boost, чтобы иметь возможность выполнять несколько вызовов io_context.run() с потоками. Тем не менее, я не могу оставить один connection::async_read для одновременного запуска. Важная часть кода выглядит следующим образом:

/// Asynchronously read a data structure from the socket.
template <typename T, typename Handler>
void async_read(T& t, Handler handler)
{
// Issue a read operation to read exactly the number of bytes in a header.
void (connection::*f)(
    const boost::system::error_code&,
    T&, boost::tuple<Handler>)
    = &connection::handle_read_header<T, Handler>;
boost::asio::async_read(socket_, boost::asio::buffer(inbound_header_),
    boost::bind(f,
        this, boost::asio::placeholders::error, boost::ref(t),
        boost::make_tuple(handler)));
}

/// Handle a completed read of a message header. The handler is passed using
/// a tuple since boost::bind seems to have trouble binding a function object
/// created using boost::bind as a parameter.
template <typename T, typename Handler>
void handle_read_header(const boost::system::error_code& e,
    T& t, boost::tuple<Handler> handler)
{
...
    void (connection::*f)(
        const boost::system::error_code&,
        T&, boost::tuple<Handler>)
    = &connection::handle_read_data<T, Handler>;
    boost::asio::async_read(socket_, boost::asio::buffer(inbound_data_),
    boost::bind(f, this,
        boost::asio::placeholders::error, boost::ref(t), handler));
}

/// Handle a completed read of message data.
template <typename T, typename Handler>
void handle_read_data(const boost::system::error_code& e,
    T& t, boost::tuple<Handler> handler)
{
...
    std::string archive_data(&inbound_data_[0], inbound_data_.size());
    std::istringstream archive_stream(archive_data);
    boost::archive::text_iarchive archive(archive_stream);
    archive >> t;
...
    // Inform caller that data has been received ok.
    boost::get<0>(handler)(e);

}

Он выполняет цепочку команд async_read, чтобы сначала получить размер данных с заголовком, а затем сами данные. Если я использую strand для каждого async_read, он обеспечивает только синхронизацию отдельных вызовов async_read.

Я читаю следующие сообщения: 1 , 2 , 3 . Однако, так как мой случай работает на многопоточность, я не смог расширить эти решения. Также я наткнулся на этот на сайте Boost, но он находится под зверем не Asio.


Обновление

По ссылке , я написал приведенный ниже код, но та же проблема сохраняется:

using namespace std;
using boost::asio::ip::tcp;
namespace ba = boost::asio;

struct async_read_initiation
{
    template <typename T, typename CompletionHandler>
    void operator()(CompletionHandler &&completion_handler, tcp::socket &socket,
                    T &t) const
    {
        struct intermediate_completion_handler
        {
            tcp::socket &socket_;

            T &t_;

            unique_ptr<vector<char>> inbound_header_;
            vector<char> inbound_data_;

            enum
            {
                starting,
                head_reading,
                data_reading
            } state_;

            typename std::decay<CompletionHandler>::type handler_;

            void operator()(const boost::system::error_code &error, size_t)
            {
                if (!error)
                {
                    switch (state_)
                    {
                    case starting:
                    case head_reading:
                    {
                        // Determine the length of the serialized data.
                        std::istringstream is(string(inbound_header_->begin(), inbound_header_->end()));
                        std::size_t inbound_data_size = 0;
                        if (!(is >> std::hex >> inbound_data_size))
                        {
                            cout << "(handle_read_header)Invalid header..." << endl;
                            // Header doesn't seem to be valid. Inform the caller.
                            boost::system::error_code error(ba::error::invalid_argument);
                            handler_(error);
                            return;
                        }
                        cout << "(handle_read_header)Header is valid-size: ";
                        cout << inbound_data_size << endl;

                        // Start an asynchronous call to receive the data.
                        inbound_data_.resize(inbound_data_size);

                        // Read actual data synchronously
                        state_ = data_reading;
                        ba::async_read(socket_, ba::buffer(inbound_data_), std::move(*this));
                        return;
                    }
                    case data_reading:
                        try
                        {
                            std::string archive_data(&inbound_data_[0], inbound_data_.size());
                            cout << "Size: " << inbound_data_.size() << endl;
                            std::istringstream archive_stream(archive_data);
                            boost::archive::text_iarchive archive(archive_stream);
                            archive >> t_;
                            cout << "Read data successfully" << endl;
                        }
                        catch (std::exception &e)
                        {
                            cout << "(async_read)Something is wrong.." << endl;
                            // Unable to decode data.
                            boost::system::error_code error(ba::error::invalid_argument);
                            handler_(error);
                            return;
                        }
                        break;
                        // Composed operation complete, continue below.
                    }
                }
                // Call the user-supplied handler with the result of the operation.
                handler_(error);
            }

            using executor_type = boost::asio::associated_executor_t<
                typename std::decay<CompletionHandler>::type,
                tcp::socket::executor_type>;

            executor_type get_executor() const noexcept
            {
                return boost::asio::get_associated_executor(
                    handler_, socket_.get_executor());
            }

            using allocator_type = boost::asio::associated_allocator_t<
                typename std::decay<CompletionHandler>::type,
                std::allocator<void>>;

            allocator_type get_allocator() const noexcept
            {
                return boost::asio::get_associated_allocator(
                    handler_, std::allocator<void>{});
            }
        };
        unique_ptr<vector<char>> inbound_header(new vector<char>(HEADER_LENGTH));

        auto encoded_header_buffer = ba::buffer(*inbound_header);

        ba::async_read(
            socket,
            encoded_header_buffer,
            intermediate_completion_handler{
                socket,
                t,
                std::move(inbound_header),
                vector<char>(),
                intermediate_completion_handler::starting,
                std::forward<CompletionHandler>(completion_handler)});
    }
};

template <typename T, typename CompletionToken>
auto async_read_structure(tcp::socket &socket,
                          T &t,
                          CompletionToken &&token)
    -> typename boost::asio::async_result<
        typename std::decay<CompletionToken>::type,
        void(boost::system::error_code)>::return_type
{
    return ba::async_initiate<
        CompletionToken, void(boost::system::error_code)>(
        async_read_initiation(), token, std::ref(socket),
        t);
}

Мой вопрос - как я могу расширить код сериализации, чтобы каждый connection::async_read выполнялся без прерывания

...