Как передать сокет boost asio tcp в поток для отправки пульса клиенту или серверу - PullRequest
1 голос
/ 29 мая 2020

Я пишу программу клиент / сервер в ускоренном TCP, в которой я хочу отправлять сообщение HEARTBEAT клиенту каждые 2 секунды, для чего я пытаюсь создать новый поток, с помощью которого я могу легко отправить его, но не могу решить Это. Я создаю поток, используя boost::thread t(hearbeatSender,sock); this. но дает много ошибок. Я также использую bind, чтобы связать имя функции с сокетом, но не разрешил ошибку.

void process(boost::asio::ip::tcp::socket & sock);
std::string read_data(boost::asio::ip::tcp::socket & sock);
void write_data(boost::asio::ip::tcp::socket & sock,std::string);
void hearbeatSender(boost::asio::ip::tcp::socket & sock);
int main()
{

    unsigned short port_num = 3333;
    boost::asio::ip::tcp::endpoint ep(boost::asio::ip::address_v4::any(), port_num);
    boost::asio::io_service io;
    try
    {
        boost::asio::ip::tcp::acceptor acceptor(io, ep.protocol());
        acceptor.bind(ep);
        acceptor.listen();
        boost::asio::ip::tcp::socket sock(io);
        acceptor.accept(sock);
        boost::thread t(hearbeatSender,sock); 
        process(sock);
        t.join();

    }
    catch (boost::system::system_error &e)
    {
        std::cout << "Error occured! Error code = " << e.code()
        << ". Message: " << e.what();

        return e.code().value();
    }
  return 0;

}
void process(boost::asio::ip::tcp::socket & sock)
{
    while(1){
    std::string data = read_data(sock);
    std::cout<<"Client's request is: "<<data<<std::endl;
    write_data(sock,data);
    }
}
std::string read_data(boost::asio::ip::tcp::socket & sock)
{
    boost::asio::streambuf buf;
    boost::asio::read_until(sock, buf, "\n");
    std::string data = boost::asio::buffer_cast<const char*>(buf.data());
    return data;
}
void write_data(boost::asio::ip::tcp::socket & sock,std::string data)
{
    boost::system::error_code error;
    std::string msg;
    int ch = data[0]-'0';
    switch(ch)
    {
        case 1: msg = "Case 1\n"; break;
        case 2: msg = "Case 2\n"; break;
        case 3: msg = "Case 3\n"; break;
        case 4: msg = "Case 4\n"; break;
        default: msg  = "Case default\n"; break;
    }
    boost::asio::write( sock, boost::asio::buffer(msg+ "\n"), error );
     if( !error ) {
        std::cout << "Server sent hello message!" << std::endl;
     }
     else {
        std::cout << "send failed: " << error.message() << std::endl;
     }
}
void hearbeatSender(boost::asio::ip::tcp::socket & sock)
{
    boost::system::error_code error;
    std::string msg = "HEARTBEAT";
    while(1)
    {
        sleep(2);
        std::cout<<msg<<std::endl;
        boost::asio::write( sock, boost::asio::buffer(msg+ "\n"), error );
        if( !error ) {
        std::cout << "Server sent HEARTBEAT message!" << std::endl;
        }
        else {
            std::cout << "send failed: " << error.message() << std::endl;
        }
    }
}

Это код на стороне сервера для ответа на сообщение клиента и отправки пульса клиенту. Это синхронный TCP-сервер.

Ответы [ 3 ]

1 голос
/ 29 мая 2020

Вместо этого:

    boost::asio::ip::tcp::socket sock(io);
    acceptor.accept(sock);
    boost::thread t(hearbeatSender,sock); 

this:

    auto sock = acceptor.accept();
    std::thread t([&sock]() {
        hearbeatSender(sock);
    });

И вместо sleep просто использовал std :: this_thread :: sleep для универсальной компиляции.

Вот полная программа, которая компилируется и запускает

#include <boost/asio.hpp>
#include <iostream>


void process(boost::asio::ip::tcp::socket& sock);
std::string read_data(boost::asio::ip::tcp::socket& sock);
void write_data(boost::asio::ip::tcp::socket& sock, std::string);
void hearbeatSender(boost::asio::ip::tcp::socket& sock);
int main()
{

    unsigned short port_num = 3333;
    boost::asio::ip::tcp::endpoint ep(boost::asio::ip::address_v4::any(), port_num);
    boost::asio::io_service io;
    try
    {
        boost::asio::ip::tcp::acceptor acceptor(io, ep.protocol());
        acceptor.bind(ep);
        acceptor.listen();
        auto sock = acceptor.accept();
        std::thread t([&sock]() {
            hearbeatSender(sock);
        });
        process(sock);
        t.join();

    }
    catch (boost::system::system_error& e)
    {
        std::cout << "Error occured! Error code = " << e.code()
            << ". Message: " << e.what();

        return e.code().value();
    }
    return 0;

}
void process(boost::asio::ip::tcp::socket& sock)
{
    while (1) {
        std::string data = read_data(sock);
        std::cout << "Client's request is: " << data << std::endl;
        write_data(sock, data);
    }
}
std::string read_data(boost::asio::ip::tcp::socket& sock)
{
    boost::asio::streambuf buf;
    boost::asio::read_until(sock, buf, "\n");
    std::string data = boost::asio::buffer_cast<const char*>(buf.data());
    return data;
}
void write_data(boost::asio::ip::tcp::socket& sock, std::string data)
{
    boost::system::error_code error;
    std::string msg;
    int ch = data[0] - '0';
    switch (ch)
    {
    case 1: msg = "Case 1\n"; break;
    case 2: msg = "Case 2\n"; break;
    case 3: msg = "Case 3\n"; break;
    case 4: msg = "Case 4\n"; break;
    default: msg = "Case default\n"; break;
    }
    boost::asio::write(sock, boost::asio::buffer(msg + "\n"), error);
    if (!error) {
        std::cout << "Server sent hello message!" << std::endl;
    }
    else {
        std::cout << "send failed: " << error.message() << std::endl;
    }
}
void hearbeatSender(boost::asio::ip::tcp::socket& sock)
{
    boost::system::error_code error;
    std::string msg = "HEARTBEAT";
    while (1)
    {
        std::this_thread::sleep_for(std::chrono::seconds(2));
        std::cout << msg << std::endl;
        boost::asio::write(sock, boost::asio::buffer(msg + "\n"), error);
        if (!error) {
            std::cout << "Server sent HEARTBEAT message!" << std::endl;
        }
        else {
            std::cout << "send failed: " << error.message() << std::endl;
        }
    }
}
0 голосов
/ 02 июня 2020

Вместо этого:

try
    {
        boost::asio::ip::tcp::acceptor acceptor(io, ep.protocol());
        acceptor.bind(ep);
        acceptor.listen();
        auto sock = acceptor.accept();
        std::thread t([&sock]() {
            hearbeatSender(sock);
        });
        process(sock);
        t.join();

    }

Используйте его:

try{
        boost::asio::ip::tcp::acceptor acceptor(io, ep.protocol());
        acceptor.bind(ep);
        acceptor.listen();
        boost::asio::ip::tcp::socket sock(io);
        acceptor.accept(sock);

        std::thread t([&sock]() {
            hearbeatSender(sock);
        });
        process(sock);
        t.join();
}

, а также включите файлы заголовков:

#include <thread>
#include <chrono>

(Необязательно) вы также можете использовать this_thread::sleep_for вместо sleep() std::this_thread::sleep_for(std::chrono::seconds(10));

Проблема передачи сокета в поток решена.

Теперь, для разговора СЕРДЦЕ между клиентом и сервером. Полный код можно проверить отсюда:

Код клиента Передача HEARTBEAT каждые 5 секунд

Код сервера для отправки ответа клиенту

0 голосов
/ 29 мая 2020

Более чем немного странно использовать сердцебиение ... поток «отправитель» с asyn c IO.

Более того, нет синхронизации на объекте сокета, так что это гонка данных , что составляет Неопределенное поведение .

Наконец, это небезопасно:

    std::string data = boost::asio::buffer_cast<const char*>(buf.data());

Предполагается, что data () будет завершаться NUL (что неверно).

Типичный однопоточный ASIO

Вы не будете порождать потоки для таймеров, а используйте, например, boost::asio::deadline_timer или boost::asio::highresolution_timer. Он может ждать асинхронно, поэтому вы можете выполнять другие задачи в службе ввода-вывода, пока она не истечет.

Точно так же вы можете выполнять чтение / запись запроса / ответа асинхронно. Единственным «усложняющим» фактором является то, что асинхронные вызовы не завершаются перед возвратом, поэтому вы должны убедиться, что буферы живут достаточно долго (они не должны быть локальной переменной).

Теперь у вас уже есть логическая «единица» времени жизни, которая практически ВЫрывается на вас из кода:

enter image description here

Это просто кричит, чтобы его переписали как

struct LifeTimeUnit {
    boost::asio::ip::tcp::socket sock;

    void process();
    std::string read_data();
    void write_data(std::string);
    void hearbeatSender(sock);
};

Конечно, LifeTimeUnit - забавное имя, поэтому давайте подумаем о лучшем: Session кажется значимым!


Теперь, когда у нас есть единица времени жизни, она может содержать другие такие вещи, как буферы и таймер:

struct Session {
    Session(tcp::socket&& s) : sock(std::move(s)) {}

    void start() {
        hb_wait();
        req_loop();
    }

    void cancel() {
        hbtimer.cancel();
        sock.cancel(); // or shutdown() e.g.
    }

  private:
    bool checked(error_code ec, std::string const& msg = "error") {
        if (ec) {
            std::clog << msg << ": " << ec.message() << "\n";
            cancel();
        }
        return !ec.failed();;
    }

    void req_loop(error_code ec = {}) {
        if (!checked(ec, "req_loop")) {
            async_read_until(sock, buf, "\n",
                    [this](error_code ec, size_t xfr) { on_request(ec, xfr); });
        }
    }

    void on_request(error_code ec, size_t n) {
        if (checked(ec, "on_request")) {
            request.resize(n);
            buf.sgetn(request.data(), n);

            response = "Case " + std::to_string(request.at(0) - '0') + "\n";
            async_write(sock, buffer(response), 
                    [this](error_code ec, size_t) { req_loop(ec); });
        }
    }

    void hb_wait(error_code ec = {}) {
        if (checked(ec, "hb_wait")) {
            hbtimer.expires_from_now(2s);
            hbtimer.async_wait([this](error_code ec) { hb_send(ec); });
        }
    }

    void hb_send(error_code ec) {
        if (checked(ec, "hb_send")) {
            async_write(sock, buffer(hbmsg), [this](error_code ec, size_t) { hb_wait(ec); });
        }
    }

    tcp::socket sock;
    boost::asio::high_resolution_timer hbtimer { sock.get_executor() };
    const std::string hbmsg = "HEARTBEAT\n";
    boost::asio::streambuf buf;
    std::string request, response;
};

Единственные publi c вещи - start() (на самом деле, cancel() нам пока не нужен, но вы знаете).

Основная программа может оставаться неизменной:

tcp::acceptor acceptor(io, tcp::v4());
acceptor.bind({{}, 3333});
acceptor.listen();

tcp::socket sock(io);
acceptor.accept(sock);

Session sess(std::move(sock));
sess.start(); // does both request loop and the heartbeat

io.run();

Больше никаких потоков, идеальная асинхронность! Использование bash и netcat для тестирования:

while sleep 4; do printf "%d request\n" {1..10}; done | netcat localhost 3333

Выводит:

host 3333
HEARTBEAT
Case 1
Case 2
Case 3
Case 4
Case 5
Case 6
Case 7
Case 8
Case 9
Case 1
HEARTBEAT
HEARTBEAT
HEARTBEAT
Case 1
Case 2
Case 3
Case 4
Case 5
Case 6
Case 7
Case 8
Case 9
Case 1
^C

После остановки клиента сервер выходит с

on_request: End of file
hb_send: Operation canceled

Single -Thread / Multi-Session

Большим преимуществом является то, что теперь вы можете принимать несколько клиентов в одном потоке сервера. Фактически, тысячи из них одновременно без проблем.

int main() {
    boost::asio::thread_pool io(1);
    try {
        tcp::acceptor acceptor(io, tcp::v4());
        acceptor.bind({{}, 3333});
        acceptor.listen();

        std::list<Session> sessions;

        while (true) {
            tcp::socket sock(io);
            acceptor.accept(sock);

            auto& sess = sessions.emplace_back(std::move(sock));
            sess.start(); // does both request loop and the heartbeat

            sessions.remove_if([](Session& s) { return !s.is_active(); });
        }

        io.join();
    } catch (boost::system::system_error& e) {
        std::cout << "Error occured! Error code = " << e.code() << ". Message: " << e.code().message() << "\n";
        return e.code().value();
    }
}

Обратите внимание, как мы тонко изменили наш контекст выполнения на пул одноэлементных потоков. Это означает, что мы по-прежнему запускаем все сеансы в одном потоке, но это другой поток, чем запуск main(), что означает, что мы можем продолжать принимать соединения.

Чтобы избежать постоянного увеличения списка sessions, мы отсеяли неактивные, используя тривиально реализованное свойство is_active().

Обратите внимание, что мы можем ПОЧТИ принудительно завершить работу, выполнив

for (auto& sess: sessions)
    sess.cancel();

Это ПОЧТИ, потому что это требует публикации операций отмены в потоке пула:

for (auto& sess: sessions)
    post(io, [&sess] { sess.cancel(); });

Это сделано для того, чтобы избежать гонок с любыми задачами в пуле ввода-вывода

Поскольку sessions касается только основного потока, блокировка не требуется.

Live On Coliru

Тестирование с

for a in 3 2 1; do (sleep $a; echo "$a request" | nc 127.0.0.1 3333)& done; time wait

Печать:

Case 1
Case 2
Case 3
HEARTBEAT
HEARTBEAT
...

Многопоточность для Win?

Теперь мы можем добавить многопоточность. Изменения незначительны:

  • мы хотим связать сокет с цепью (см. Зачем мне нужна цепочка на соединение при использовании boost :: asio? )
  • обратите внимание, что мы уже используем исполнителя sock для запуска таймера
  • Мы должны принять дополнительные меры, чтобы сделать весь интерфейс publi c в Session потокобезопасным:

    • разместить действия из start() и cancel() на цепочке
    • сделать флаг active atomic_bool
  • далее мы просто увеличиваем количество потоков в пуле с 1 до, скажем, 10

Обратите внимание, на практике редко имеет смысл использовать больше потоков, чем логических ядер. Кроме того, в этом простом примере все связано с вводом-выводом, поэтому, вероятно, уже обслуживается один поток. Это просто для демонстрации

Live On Coliru

boost::asio::thread_pool io(10);
try {
    tcp::acceptor acceptor(io, tcp::v4());
    acceptor.set_option(tcp::acceptor::reuse_address(true));
    acceptor.bind({{}, 3333});
    acceptor.listen();

    std::list<Session> sessions;

    while (true) {

        tcp::socket sock(make_strand(io)); // NOTE STRAND!
// ...
// ...

    io.join();

И изменения в Session:

   void start() {
        active = true;
        post(sock.get_executor(), [this]{
            hb_wait();
            req_loop();
        });
    }

    void cancel() {
        post(sock.get_executor(), [this]{
            hbtimer.cancel();
            sock.cancel(); // or shutdown() e.g.
            active = false;
        });
    }

// ....

    std::atomic_bool active {false};
}

...