В настоящее время я использую библиотеку Asio C ++ и написал оболочку для клиента.Мой оригинальный подход был очень базовым и требовал только потока в одном направлении.Требования изменились, и я переключился на использование всех асинхронных вызовов.Большая часть миграции была легкой, за исключением asio::async_write(...)
.Я использовал несколько разных подходов и неизбежно зашел в тупик с каждым из них.
Приложение непрерывно передает данные в большом объеме.Я держался подальше от нитей, потому что они не блокируют и могут привести к проблемам с памятью, особенно когда сервер находится под большой нагрузкой.Задания будут резервироваться, а куча приложений бесконечно возрастает.
Поэтому я создал блокирующую очередь только для того, чтобы выяснить, каким сложным образом использование блокировок для обратных вызовов и / или блокирование событий приводит к неизвестному поведению.
Оболочка - это очень большой класс, поэтому я постараюсь объяснить свой ландшафт в его текущем состоянии и, надеюсь, получить несколько хороших советов:
- У меня есть
asio::steady_timer
, который работает по фиксированному расписанию до отправка сообщения пульса непосредственно в очередь блокировки. - поток, предназначенный для чтения событий, и отправка их в очередь блокировки
- поток, выделенный для потребление очереди блокировки
Например, в моей очереди у меня есть queue::block()
и queue::unblock()
, которые являются просто оболочками для условной переменной / мьютекса.
std::thread consumer([this]() {
std::string message_buffer;
while (queue.pop(message_buffer)) {
queue.stage_block();
asio::async_write(*socket, asio::buffer(message_buffer), std::bind(&networking::handle_write, this, std::placeholders::_1, std::placeholders::_2));
queue.block();
}
});
void networking::handle_write(const std::error_code& error, size_t bytes_transferred) {
queue.unblock();
}
Когда сокет выполняет резервное копирование и сервер больше не может принимать данные из-за текущей загрузки, очередь заполняется и приводит к тупику, при котором handle_write(...)
никогда не вызываетсяed.
Другой подход полностью исключает потребительский поток и опирается на handle_write(...)
для выталкивания очереди.Вот так:
void networking::write(const std::string& data) {
if (!queue.closed()) {
std::stringstream stream_buffer;
stream_buffer << data << std::endl;
spdlog::get("console")->debug("pushing to queue {}", queue.size());
queue.push(stream_buffer.str());
if (queue.size() == 1) {
spdlog::get("console")->debug("handle_write: {}", stream_buffer.str());
asio::async_write(*socket, asio::buffer(stream_buffer.str()), std::bind(&networking::handle_write, this, std::placeholders::_1, std::placeholders::_2));
}
}
}
void networking::handle_write(const std::error_code& error, size_t bytes_transferred) {
std::string message;
queue.pop(message);
if (!queue.closed() && !queue.empty()) {
std::string front = queue.front();
asio::async_write(*socket, asio::buffer(queue.front()), std::bind(&networking::handle_write, this, std::placeholders::_1, std::placeholders::_2));
}
}
Это также привело к тупику и, очевидно, к другим гоночным проблемам.Когда я отключил свой обратный вызов сердцебиения, у меня не было абсолютно никаких проблем.Тем не менее, сердцебиение является обязательным требованием.
Что я делаю не так?Какой подход лучше?