ZeroMQ заблокирован в context.close (). Как безопасно закрыть сокет и контекст в C ++? - PullRequest
3 голосов
/ 18 апреля 2019

Описание

Вещатель передает сообщения в разъеме PUB "tcp: // localhost: 5556" и сигнал управления STOP в другом разъеме PUB "tcp: // localhost: 5557". Слушатель получает сообщения. Он останавливается и выходит, как только слышит сигнал управления STOP.

Как документировано в 0MQ Termination white paper, чтобы остановить ожидание recv (), это стандартный метод завершения базового контекста, и recv () завершит работу, выдав исключение ETERM.

Хотя блок recv () освобожден, вместо него context.close () заблокирован. В результате программа по-прежнему не может быть безопасно завершена.

Кроме того, закрывает сокет перед контекстом и устанавливает значение задержки 0 абонентского сокета. Тем не менее, он заблокирован.

  • [система]: Ubuntu 18.04.1 (универсальная версия Linux 4.18.0-17)
  • [компилятор]: gcc-g ++ версия 7.3.0
  • [ZeroMQ]: libzmq 4.3.1 + cppzmq 4.3.0

Воспроизведение кода

Вот простой, но полный код для воспроизведения проблемы.

// Class Broadcast: A Broadcast object sends a message every 10ms,
//                  and finally sends a stop control signal
//     - start()  : start broadcasting.
class Broadcast {
public:
    Broadcast():
        context_(1),
        publisher_(context_, ZMQ_PUB),
        controller_(context_, ZMQ_PUB)
    {
        publisher_.bind("tcp://*:5556");
        controller_.bind("tcp://*:5557");
    }

    void start(){
        std::cout << "start Broadcast" << std::endl;
        // send data through publisher
        const int send_time = 5;
        const std::string foo_template("foo_");
        for(int i = 0; i < send_time; ++i) {
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
            std::string foo = foo_template + std::to_string(i);
            zmq::message_t msg(foo.size());
            std::memcpy(msg.data(), foo.c_str(), foo.size());
            std::cout << "Broadcast: " << foo << std::endl;
            publisher_.send(msg);
        }
        // send stop control signal throgh controller
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
        std::string stop("bar");
        zmq::message_t msg(stop.size());
        std::memcpy(msg.data(), stop.c_str(), stop.size());
        std::cout << "Broadcast Control Signal: " << stop << std::endl;
        controller_.send(msg);
        std::cout << "end Broadcast" << std::endl;
    }

private:
    zmq::context_t context_;
    zmq::socket_t  publisher_;
    zmq::socket_t  controller_;
}; // class Broadcast
// Class Listener : A Listener object receives messages from Broadcast
//                  until it receives a stop control signal.
//     - start()  : start receiving messages;
//     - control(): start receiving control signals;
//     - stop()   : set stop_flag and close zmq sockets and context

class Listener {
public:
    Listener():
        stop_(false),
        context_(1),
        subscriber_(context_, ZMQ_SUB),
        controller_(context_, ZMQ_SUB)
    {
        subscriber_.connect("tcp://localhost:5556");
        controller_.connect("tcp://localhost:5557");
        int linger = 0;
        subscriber_.setsockopt(ZMQ_SUBSCRIBE, nullptr, 0);
        subscriber_.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
        controller_.setsockopt(ZMQ_SUBSCRIBE, nullptr, 0);
        controller_.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
    }

    void start() {
        std::cout << "start Listener" << std::endl;
        stop_ = false;
        auto control_future = std::async([this]{ control(); });
        while(!stop_) {
            try {
                zmq::message_t msg;
                subscriber_.recv(&msg);
                std::string msg_str{static_cast<char*>(msg.data()), msg.size()};
                std::cout << "Received : " << msg_str << std::endl;
            } catch(const zmq::error_t& ex) {
                // recv() throws ETERM when the zmq context is destroyed,
                // as when AsyncZmqListener::Stop() is called
                if(ex.num() != ETERM)
                    throw;
                std::cerr << "subscriber stop with ETERM" << std::endl;
                break;
            }
        }
        std::cout << "wait control to join..." << std::endl;
        control_future.get();
        std::cout << "end Listener" << std::endl;

    }

    void control() {
        while(!stop_) {
            zmq::message_t ctrl;
            controller_.recv(&ctrl);
            std::string ctrl_str{static_cast<char*>(ctrl.data()), ctrl.size()};
            std::cout << "Received  Control Signal: " << ctrl_str << std::endl;
            if(ctrl_str == "bar") {
                stop();
            }
        }
    }

    void stop() {
        stop_ = true;
        std::cerr << "closing context..." << std::endl;
        subscriber_.close();
        controller_.close();
        context_.close();
        std::cerr << "context is closed." << std::endl;
    }

private:
    volatile bool  stop_;
    zmq::context_t context_;
    zmq::socket_t  subscriber_;
    zmq::socket_t  controller_;
}; // class Listener
// ## Problem
// Client cannot safely quit since context_.close() blocks the thread.

#include "zmq.hpp"

#include <iostream>
#include <chrono>
#include <future>

int main(int argc, char* argv[]) {

    Broadcast broadcast;
    Listener  listener;

    auto broadcast_future = std::async([&]{ broadcast.start(); });
    auto listener_future  = std::async([&]{ listener.start() ; });

    broadcast_future.get();
    listener_future.get();

    std::cout << "ALL COMPLETED" << std::endl;

    return 0;
}

Результат

Эта проблема появляется с возможностью , возможно, вам придется запускаться несколько раз, чтобы воспроизвести ее один раз. Возможны три результата: правильно завершенная версия, прерванная версия и заблокированная версия.

Правильная версия

, если программа корректно завершит работу, будет отображаться:

➜  zmq_safe_quit ./a.out
start Listener
start Broadcast
Broadcast: foo_0
Received : foo_0
Broadcast: foo_1
Received : foo_1
Broadcast: foo_2
Received : foo_2
Broadcast: foo_3
Received : foo_3
Broadcast: foo_4
Received : foo_4
Broadcast Control Signal: bar
end Broadcast
Received  Control Signal: bar
closing context...
subscriber stop with ETERM
wait control to join...
context is closed.
end Listener
ALL COMPLETED

Заблокированная версия

➜  zmq_safe_quit ./a.out
start Listener
start Broadcast
Broadcast: foo_0
Received : foo_0
Broadcast: foo_1
Received : foo_1
Broadcast: foo_2
Received : foo_2
Broadcast: foo_3
Received : foo_3
Broadcast: foo_4
Received : foo_4
Broadcast Control Signal: bar
end Broadcast
Received  Control Signal: bar
closing context...
context is closed.

«Конечный слушатель» отсутствует, а оболочка заблокирована.

Прервать версию

start Listener
start Broadcast
Broadcast: foo_0
Received : foo_0
Broadcast: foo_1
Received : foo_1
Broadcast: foo_2
Received : foo_2
Broadcast: foo_3
Received : foo_3
Broadcast: foo_4
Received : foo_4
Broadcast Control Signal: bar
end Broadcast
Received  Control Signal: bar
closing context...
Assertion failed: pfd.revents & POLLIN (/home/davidwu/src/libzmq/src/signaler.cpp:264)
[1]    16079 abort (core dumped)  ./a.out

и обратный след указан как:

#0  __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:51
#1  0x00007f90dd99a801 in __GI_abort () at abort.c:79
#2  0x00007f90de57a52e in zmq::zmq_abort(char const*) () from /usr/local/lib/libzmq.so.5
#3  0x00007f90de59ca67 in zmq::signaler_t::wait(int) () from /usr/local/lib/libzmq.so.5
#4  0x00007f90de57ea5c in zmq::mailbox_t::recv(zmq::command_t*, int) () from /usr/local/lib/libzmq.so.5
#5  0x00007f90de59e9c7 in zmq::socket_base_t::process_commands(int, bool) () from /usr/local/lib/libzmq.so.5
#6  0x00007f90de59f726 in zmq::socket_base_t::recv(zmq::msg_t*, int) () from /usr/local/lib/libzmq.so.5
#7  0x00007f90de5c4e8c in zmq_msg_recv () from /usr/local/lib/libzmq.so.5
#8  0x0000561da8eb18f3 in zmq::socket_t::recv(zmq::message_t*, int) ()
#9  0x0000561da8eb2b47 in Listener::start() ()

Вопрос

Как безопасно выйти из абонента по внешнему сигналу? Что не так с приведенным выше кодом? Или есть лучшая структура и дизайн, чтобы организовать и справиться с такой ситуацией?

Ответы [ 2 ]

1 голос
/ 23 апреля 2019

Я всегда держу контекст zmq под контролем основного потока.В этом случае я бы сделал что-то вроде

псевдокода:

main()
{
context(1) // only one context
job1(context) // pass ref to the one context
job2(context) // pass ref to the one context
job1.join()
job2.join()
context.close()
}

Если ваша структура означает, что вы не можете этого сделать, вам нужно больше думать о том, как вы справляетесь с выключением.

Ваш код вызывает управляющий код (на сокете) в одном потоке

subscriber_.close();
controller_.close();
context_.close();

И код обработки (на сокете) в другом

controller_.recv(&ctrl);

Две причины не делать этого

  • Стандартные сокеты zmq не являются поточно-ориентированными
  • у вас есть условие гонки, когда сокет и контекст могут быть уничтожены, пока поток recvблокируетКогда он разблокируется, он просто потерпит неудачу неопределенным образом, так как zmq фактически мертв (контекст закрыт) на этом этапе.

Вы должны открывать, использовать и закрывать сокет в том же потоке.В этом случае (объект subscriber_) вызовите close() для сокета в потоке start(), когда вы получите ETERM или stop_, равное

0 голосов
/ 23 апреля 2019

Спасибо за ответ @James, я решил эту проблему сам и обновил код, внеся следующие изменения:

  1. управляйте контекстом в main() и передавайте его с помощью std::shared_ptr;
  2. закрывать сокеты в потоке, создающем его;
  3. передавать дополнительное сообщение для очистки очереди zmq;
  4. использовать std::atomic<bool> вместо volatile для ввода флага stop_.

В результате нам не нужно жестоко уничтожать context и ловить ETERM, что является неестественным способом.И все сокеты во всех потоках могут безопасно завершиться.

В конце я выложу здесь исходный код.Надеюсь, что это может помочь другим, столкнувшимся с той же проблемой.

class Broadcast {
public:
    Broadcast(std::shared_ptr<zmq::context_t> context):
        context_(context),
        publisher_(*context_, ZMQ_PUB),
        controller_(*context_, ZMQ_PUB)
    {
        publisher_.bind("tcp://*:5556");
        controller_.bind("tcp://*:5557");
    }

    ~Broadcast() {
        publisher_.close();
        controller_.close();
    }

    void start(){
        std::cout << "start Broadcast" << std::endl;
        // send data through publisher
        const int send_time = 5;
        const std::string foo_template("foo_");
        for(int i = 0; i < send_time; ++i) {
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
            std::string foo = foo_template + std::to_string(i);
            zmq::message_t msg(foo.size());
            std::memcpy(msg.data(), foo.c_str(), foo.size());
            std::cout << "Broadcast: " << foo << std::endl;
            publisher_.send(msg);
        }
        // send stop control signal through controller
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
        std::string stop("bar");
        zmq::message_t msg(stop.size());
        std::memcpy(msg.data(), stop.c_str(), stop.size());
        std::cout << "Broadcast Control Signal: " << stop << std::endl;
        controller_.send(msg);
        std::cout << "end Broadcast" << std::endl;

        // FIX: post extra message to flush zmq queue
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        std::string foo = foo_template + "end";
        zmq::message_t msg_end(foo.size());
        std::memcpy(msg_end.data(), foo.c_str(), foo.size());
        std::cout << "Broadcast: " << foo << std::endl;
        publisher_.send(msg_end);
    }
private:
    std::shared_ptr<zmq::context_t> context_;
    zmq::socket_t  publisher_;
    zmq::socket_t  controller_;
}; // class Broadcast
class Client {
public:
    Client(std::shared_ptr<zmq::context_t> context):
        stop_(false),
        context_(context),
        subscriber_(*context_, ZMQ_SUB),
        controller_(*context_, ZMQ_SUB)
    {
        int linger = 0;
        subscriber_.connect("tcp://localhost:5556");
        controller_.connect("tcp://localhost:5557");
        subscriber_.setsockopt(ZMQ_SUBSCRIBE, nullptr, 0);
        controller_.setsockopt(ZMQ_SUBSCRIBE, nullptr, 0);
        subscriber_.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
        controller_.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
    }

    ~Client() {
        subscriber_.close();
        controller_.close();
    }

    void start() {
        stop_ = false;

        std::cout << "start Client" << std::endl;
        auto control_future = std::async(std::launch::async, [this]{ control(); });
        while(!stop_) {
            try {
                zmq::message_t msg;
                subscriber_.recv(&msg);
                std::string msg_str{static_cast<char*>(msg.data()), msg.size()};
                std::cout << "Received : " << msg_str << std::endl;
            } catch(const zmq::error_t& ex) {
                if(ex.num() != ETERM)
                    throw;
                break; // exit while loop
            }
        }
        std::cout << "wait control to join..." << std::endl;
        control_future.get();
        std::cout << "end Client" << std::endl;
    }

    void control() {
        while(!stop_) {
            zmq::message_t ctrl;
            controller_.recv(&ctrl);
            std::string ctrl_str{static_cast<char*>(ctrl.data()), ctrl.size()};
            std::cout << "Received  Control Signal: " << ctrl_str << std::endl;
            if(ctrl_str == "bar") {
                stop_ = true;
            }
        }
    }

private:
    std::atomic<bool> stop_;
    std::shared_ptr<zmq::context_t> context_;
    zmq::socket_t  subscriber_;
    zmq::socket_t  controller_;
}; // class Client
int main(int argc, char* argv[]) {

    auto gContext = std::make_shared<zmq::context_t>(1);

    Broadcast broadcast(gContext);
    Client    client(gContext);

    auto broadcast_future = std::async([&]{ broadcast.start(); });
    auto client_future    = std::async([&]{ client.start()   ; });

    broadcast_future.get();
    client_future.get();

    std::cout << "ALL COMPLETED" << std::endl;

    return 0;
}

Скомпилируйте и запустите, можно получить правильный результат:

➜  zmq_safe_quit ./a.out
start Client
start Broadcast
Broadcast: foo_0
Received : foo_0
Broadcast: foo_1
Received : foo_1
Broadcast: foo_2
Received : foo_2
Broadcast: foo_3
Received : foo_3
Broadcast: foo_4
Received : foo_4
Broadcast Control Signal: bar
end Broadcast
Received  Control Signal: bar
Broadcast: foo_end
Received : foo_end
wait control to join...
end Client
ALL COMPLETED
...