Более чем немного странно использовать сердцебиение ... поток «отправитель» с asyn c IO.
Более того, нет синхронизации на объекте сокета, так что это гонка данных , что составляет Неопределенное поведение .
Наконец, это небезопасно:
std::string data = boost::asio::buffer_cast<const char*>(buf.data());
Предполагается, что data () будет завершаться NUL (что неверно).
Типичный однопоточный ASIO
Вы не будете порождать потоки для таймеров, а используйте, например, boost::asio::deadline_timer
или boost::asio::highresolution_timer
. Он может ждать асинхронно, поэтому вы можете выполнять другие задачи в службе ввода-вывода, пока она не истечет.
Точно так же вы можете выполнять чтение / запись запроса / ответа асинхронно. Единственным «усложняющим» фактором является то, что асинхронные вызовы не завершаются перед возвратом, поэтому вы должны убедиться, что буферы живут достаточно долго (они не должны быть локальной переменной).
Теперь у вас уже есть логическая «единица» времени жизни, которая практически ВЫрывается на вас из кода:
![enter image description here](https://i.stack.imgur.com/r0Ctb.png)
Это просто кричит, чтобы его переписали как
struct LifeTimeUnit {
boost::asio::ip::tcp::socket sock;
void process();
std::string read_data();
void write_data(std::string);
void hearbeatSender(sock);
};
Конечно, LifeTimeUnit
- забавное имя, поэтому давайте подумаем о лучшем: Session
кажется значимым!
Теперь, когда у нас есть единица времени жизни, она может содержать другие такие вещи, как буферы и таймер:
struct Session {
Session(tcp::socket&& s) : sock(std::move(s)) {}
void start() {
hb_wait();
req_loop();
}
void cancel() {
hbtimer.cancel();
sock.cancel(); // or shutdown() e.g.
}
private:
bool checked(error_code ec, std::string const& msg = "error") {
if (ec) {
std::clog << msg << ": " << ec.message() << "\n";
cancel();
}
return !ec.failed();;
}
void req_loop(error_code ec = {}) {
if (!checked(ec, "req_loop")) {
async_read_until(sock, buf, "\n",
[this](error_code ec, size_t xfr) { on_request(ec, xfr); });
}
}
void on_request(error_code ec, size_t n) {
if (checked(ec, "on_request")) {
request.resize(n);
buf.sgetn(request.data(), n);
response = "Case " + std::to_string(request.at(0) - '0') + "\n";
async_write(sock, buffer(response),
[this](error_code ec, size_t) { req_loop(ec); });
}
}
void hb_wait(error_code ec = {}) {
if (checked(ec, "hb_wait")) {
hbtimer.expires_from_now(2s);
hbtimer.async_wait([this](error_code ec) { hb_send(ec); });
}
}
void hb_send(error_code ec) {
if (checked(ec, "hb_send")) {
async_write(sock, buffer(hbmsg), [this](error_code ec, size_t) { hb_wait(ec); });
}
}
tcp::socket sock;
boost::asio::high_resolution_timer hbtimer { sock.get_executor() };
const std::string hbmsg = "HEARTBEAT\n";
boost::asio::streambuf buf;
std::string request, response;
};
Единственные publi c вещи - start()
(на самом деле, cancel()
нам пока не нужен, но вы знаете).
Основная программа может оставаться неизменной:
tcp::acceptor acceptor(io, tcp::v4());
acceptor.bind({{}, 3333});
acceptor.listen();
tcp::socket sock(io);
acceptor.accept(sock);
Session sess(std::move(sock));
sess.start(); // does both request loop and the heartbeat
io.run();
Больше никаких потоков, идеальная асинхронность! Использование bash
и netcat
для тестирования:
while sleep 4; do printf "%d request\n" {1..10}; done | netcat localhost 3333
Выводит:
host 3333
HEARTBEAT
Case 1
Case 2
Case 3
Case 4
Case 5
Case 6
Case 7
Case 8
Case 9
Case 1
HEARTBEAT
HEARTBEAT
HEARTBEAT
Case 1
Case 2
Case 3
Case 4
Case 5
Case 6
Case 7
Case 8
Case 9
Case 1
^C
После остановки клиента сервер выходит с
on_request: End of file
hb_send: Operation canceled
Single -Thread / Multi-Session
Большим преимуществом является то, что теперь вы можете принимать несколько клиентов в одном потоке сервера. Фактически, тысячи из них одновременно без проблем.
int main() {
boost::asio::thread_pool io(1);
try {
tcp::acceptor acceptor(io, tcp::v4());
acceptor.bind({{}, 3333});
acceptor.listen();
std::list<Session> sessions;
while (true) {
tcp::socket sock(io);
acceptor.accept(sock);
auto& sess = sessions.emplace_back(std::move(sock));
sess.start(); // does both request loop and the heartbeat
sessions.remove_if([](Session& s) { return !s.is_active(); });
}
io.join();
} catch (boost::system::system_error& e) {
std::cout << "Error occured! Error code = " << e.code() << ". Message: " << e.code().message() << "\n";
return e.code().value();
}
}
Обратите внимание, как мы тонко изменили наш контекст выполнения на пул одноэлементных потоков. Это означает, что мы по-прежнему запускаем все сеансы в одном потоке, но это другой поток, чем запуск main()
, что означает, что мы можем продолжать принимать соединения.
Чтобы избежать постоянного увеличения списка sessions
, мы отсеяли неактивные, используя тривиально реализованное свойство is_active()
.
Обратите внимание, что мы можем ПОЧТИ принудительно завершить работу, выполнив
for (auto& sess: sessions)
sess.cancel();
Это ПОЧТИ, потому что это требует публикации операций отмены в потоке пула:
for (auto& sess: sessions)
post(io, [&sess] { sess.cancel(); });
Это сделано для того, чтобы избежать гонок с любыми задачами в пуле ввода-вывода
Поскольку sessions
касается только основного потока, блокировка не требуется.
Live On Coliru
Тестирование с
for a in 3 2 1; do (sleep $a; echo "$a request" | nc 127.0.0.1 3333)& done; time wait
Печать:
Case 1
Case 2
Case 3
HEARTBEAT
HEARTBEAT
...
Многопоточность для Win?
Теперь мы можем добавить многопоточность. Изменения незначительны:
- мы хотим связать сокет с цепью (см. Зачем мне нужна цепочка на соединение при использовании boost :: asio? )
- обратите внимание, что мы уже используем исполнителя
sock
для запуска таймера Мы должны принять дополнительные меры, чтобы сделать весь интерфейс publi c в Session
потокобезопасным:
- разместить действия из
start()
и cancel()
на цепочке - сделать флаг
active
atomic_bool
далее мы просто увеличиваем количество потоков в пуле с 1
до, скажем, 10
Обратите внимание, на практике редко имеет смысл использовать больше потоков, чем логических ядер. Кроме того, в этом простом примере все связано с вводом-выводом, поэтому, вероятно, уже обслуживается один поток. Это просто для демонстрации
Live On Coliru
boost::asio::thread_pool io(10);
try {
tcp::acceptor acceptor(io, tcp::v4());
acceptor.set_option(tcp::acceptor::reuse_address(true));
acceptor.bind({{}, 3333});
acceptor.listen();
std::list<Session> sessions;
while (true) {
tcp::socket sock(make_strand(io)); // NOTE STRAND!
// ...
// ...
io.join();
И изменения в Session
:
void start() {
active = true;
post(sock.get_executor(), [this]{
hb_wait();
req_loop();
});
}
void cancel() {
post(sock.get_executor(), [this]{
hbtimer.cancel();
sock.cancel(); // or shutdown() e.g.
active = false;
});
}
// ....
std::atomic_bool active {false};
}