Отменяет ли метод cancel () бустерного tcp акцептора операции с сокетами, работающими на разных io_context? - PullRequest
0 голосов
/ 16 марта 2020

У меня есть сервер веб-сокетов на boost :: beast, упрощенный код приведен ниже:

Server.h

class Server
{
public:
    using WorkerData = std::pair<std::size_t, boost::asio::ip::tcp::socket*>;
    using StartAcceptCallback = std::function<WorkerData()>;
    using AcceptCallback = std::function<void(tl::expected<std::size_t, std::string>)>;

    Server(boost::asio::io_context& ioContext, const std::string& host, unsigned short port)
        : ioContext_(ioContext), acceptor_(ioContext), isPendingShutdown_(false)
    {
        // Configure and start listening
    }

    void run()
    {
        accept();
    }

    void pendingOnShutdown()
    {
        isPendingShutdown_ = true;
    }

    void shutdown()
    {
        acceptor_.cancel();
    }

    void accept()
    {
        // This callback finds lowest loaded worker and returns an pointer to socket and worker index
        const WorkerData workerData = startAcceptCallback_();
        acceptor_.async_accept(*workerData.second,
                            boost::beast::bind_front_handler(&Server::onAccept, this, workerData.first));
    }

    void onAccept(std::size_t workerIndex, boost::beast::error_code err)
    {
        if(isPendingShutdown_)
        {
            return;
        }

        if(err)
        {
            if(err != boost::asio::error::operation_aborted)
            {
                acceptCallback_(tl::unexpected{"Server::onAccept(...) error: " + err.message()});
            }
        }
        else
        {
            // Notify the worker that new connection has been accepted
            acceptCallback_(tl::expected<std::size_t, std::string>{tl::in_place, workerIndex});
        }

        accept();
    }

private:
    boost::asio::io_context& ioContext_;
    boost::asio::ip::tcp::acceptor acceptor_;
    StartAcceptCallback startAcceptCallback_;
    AcceptCallback acceptCallback_;
    bool isPendingShutdown_;
};

Worker.h

class Worker
{
public:
    using Socket = boost::asio::ip::tcp::socket;
    using IoContext = boost::asio::io_context;

    Worker()
        : socket_(ioContext_)
    {
    }

    Socket* Worker::getSocket()
    {
        return &socket_;
    }

    void Worker::run()
    {
        worker_ = std::make_unique<std::thread>(std::bind(&Worker::runImpl, this));
        worker_->detach();
    }

    void Worker::handleNewConnection()
    {
        // From this point all the operation on this socket will be performed on worker's thread
        ioContext_.post([this, socket = std::move(this->socket_)] {
            // Create new session with newly accepted socket
            // and start asynchronous read/write operations
        });
    }

    void runImpl()
    {
        // Create payload_ to keep the io_service working
        // Run io_service
    }

private:
    Socket socket_;
    IoContext ioContext_;
    std::unique_ptr<IoContext::work> payload_;
    std::unique_ptr<std::thread> worker_;
};

Обработчик boost::signal_set, работающий в основном потоке, делает что-то вроде этого:

// Prevent access to the workers pool from main thread first
this->webSocketServer_->pendingOnShutdown();
// Destroys all workers
this->workersPool_->shutdown();
// Finally cancel accepting of new connections and stop the application
this->webSocketServer_->shutdown();

У меня boost::asio::ip::tcp::acceptor работает асинхронно в основном потоке, и у меня есть пул Worker объектов. Каждый Worker выполняет асинхронные операции ввода-вывода и другие операции, работает в своем собственном потоке и имеет собственный boost::asio::io_context. Сервер принимает новые соединения в сокет только от самого загруженного экземпляра Worker (и сокет работает на boost::asio::io_service работника), а затем делегирует выполнение операций ввода-вывода работнику, поэтому не требуется никаких синхронизаций, поскольку работники не любые общие данные. Предположим, я хочу закрыть приложение. Я должен прекратить прием новых подключений, прежде чем уничтожать пул Worker объектов. В моем коде я вызываю pendingOnShutdown() метод, чтобы предотвратить принятие, и затем я начинаю закрывать рабочих. После того, как все рабочие уничтожены, я вызываю метод acceptor::cancel(), чтобы отменить текущую операцию async_accept(...) и остановить boost::asio::io_context, работающий в главном потоке. Но что произойдет, если я немедленно вызову метод acceptor::cancel(), когда сеансы в экземплярах Worker все еще живы? Официальная документация описывает это следующим образом:

Эта функция немедленно приводит все незавершенные асинхронные операции соединения, отправки и получения к завершению sh, и обработчики отмененных операций будут переданы ошибка boost :: asio :: error :: operation_aborted.

Отменяет ли этот метод асинхронные операции с сокетами, работающими с внешним boost::asio::io_context? Могу ли я вызвать метод acceptor::cancel() немедленно, без вызова pendingOnShutdown() первым? У меня boost::signal_set работает в основном потоке, поэтому не беспокойтесь, что приложение закроется до того, как пул Worker объектов будет уничтожен. Я использую boost 1.71.

...