У меня есть сервер веб-сокетов на boost :: beast, упрощенный код приведен ниже:
Server.h
class Server
{
public:
using WorkerData = std::pair<std::size_t, boost::asio::ip::tcp::socket*>;
using StartAcceptCallback = std::function<WorkerData()>;
using AcceptCallback = std::function<void(tl::expected<std::size_t, std::string>)>;
Server(boost::asio::io_context& ioContext, const std::string& host, unsigned short port)
: ioContext_(ioContext), acceptor_(ioContext), isPendingShutdown_(false)
{
// Configure and start listening
}
void run()
{
accept();
}
void pendingOnShutdown()
{
isPendingShutdown_ = true;
}
void shutdown()
{
acceptor_.cancel();
}
void accept()
{
// This callback finds lowest loaded worker and returns an pointer to socket and worker index
const WorkerData workerData = startAcceptCallback_();
acceptor_.async_accept(*workerData.second,
boost::beast::bind_front_handler(&Server::onAccept, this, workerData.first));
}
void onAccept(std::size_t workerIndex, boost::beast::error_code err)
{
if(isPendingShutdown_)
{
return;
}
if(err)
{
if(err != boost::asio::error::operation_aborted)
{
acceptCallback_(tl::unexpected{"Server::onAccept(...) error: " + err.message()});
}
}
else
{
// Notify the worker that new connection has been accepted
acceptCallback_(tl::expected<std::size_t, std::string>{tl::in_place, workerIndex});
}
accept();
}
private:
boost::asio::io_context& ioContext_;
boost::asio::ip::tcp::acceptor acceptor_;
StartAcceptCallback startAcceptCallback_;
AcceptCallback acceptCallback_;
bool isPendingShutdown_;
};
Worker.h
class Worker
{
public:
using Socket = boost::asio::ip::tcp::socket;
using IoContext = boost::asio::io_context;
Worker()
: socket_(ioContext_)
{
}
Socket* Worker::getSocket()
{
return &socket_;
}
void Worker::run()
{
worker_ = std::make_unique<std::thread>(std::bind(&Worker::runImpl, this));
worker_->detach();
}
void Worker::handleNewConnection()
{
// From this point all the operation on this socket will be performed on worker's thread
ioContext_.post([this, socket = std::move(this->socket_)] {
// Create new session with newly accepted socket
// and start asynchronous read/write operations
});
}
void runImpl()
{
// Create payload_ to keep the io_service working
// Run io_service
}
private:
Socket socket_;
IoContext ioContext_;
std::unique_ptr<IoContext::work> payload_;
std::unique_ptr<std::thread> worker_;
};
Обработчик boost::signal_set
, работающий в основном потоке, делает что-то вроде этого:
// Prevent access to the workers pool from main thread first
this->webSocketServer_->pendingOnShutdown();
// Destroys all workers
this->workersPool_->shutdown();
// Finally cancel accepting of new connections and stop the application
this->webSocketServer_->shutdown();
У меня boost::asio::ip::tcp::acceptor
работает асинхронно в основном потоке, и у меня есть пул Worker
объектов. Каждый Worker
выполняет асинхронные операции ввода-вывода и другие операции, работает в своем собственном потоке и имеет собственный boost::asio::io_context
. Сервер принимает новые соединения в сокет только от самого загруженного экземпляра Worker
(и сокет работает на boost::asio::io_service
работника), а затем делегирует выполнение операций ввода-вывода работнику, поэтому не требуется никаких синхронизаций, поскольку работники не любые общие данные. Предположим, я хочу закрыть приложение. Я должен прекратить прием новых подключений, прежде чем уничтожать пул Worker
объектов. В моем коде я вызываю pendingOnShutdown()
метод, чтобы предотвратить принятие, и затем я начинаю закрывать рабочих. После того, как все рабочие уничтожены, я вызываю метод acceptor::cancel()
, чтобы отменить текущую операцию async_accept(...)
и остановить boost::asio::io_context
, работающий в главном потоке. Но что произойдет, если я немедленно вызову метод acceptor::cancel()
, когда сеансы в экземплярах Worker
все еще живы? Официальная документация описывает это следующим образом:
Эта функция немедленно приводит все незавершенные асинхронные операции соединения, отправки и получения к завершению sh, и обработчики отмененных операций будут переданы ошибка boost :: asio :: error :: operation_aborted.
Отменяет ли этот метод асинхронные операции с сокетами, работающими с внешним boost::asio::io_context
? Могу ли я вызвать метод acceptor::cancel()
немедленно, без вызова pendingOnShutdown()
первым? У меня boost::signal_set
работает в основном потоке, поэтому не беспокойтесь, что приложение закроется до того, как пул Worker
объектов будет уничтожен. Я использую boost 1.71
.