ZeroMQ IP C для нескольких экземпляров программы - PullRequest
5 голосов
/ 07 февраля 2020

У меня проблемы с межпроцессным взаимодействием в ZMQ между несколькими экземплярами программы.

  • Я использую Linux OS
  • Я использую заголовок zeromq / cppzmq, заголовок только привязка C ++ для libzmq

Если я запускаю два экземпляра этого приложения (скажем, на терминале), я предоставляю один с аргументом, чтобы быть слушателем, затем предоставляя другому аргумент, чтобы быть отправитель Слушатель никогда не получает сообщение. Я пробовал TCP и IP C безрезультатно.

#include <zmq.hpp>
#include <string>
#include <iostream>

int ListenMessage();
int SendMessage(std::string str);

zmq::context_t global_zmq_context(1);

int main(int argc, char* argv[] ) {
    std::string str = "Hello World";
    if (atoi(argv[1]) == 0) ListenMessage();
    else SendMessage(str);

    zmq_ctx_destroy(& global_zmq_context);
    return 0;
}


int SendMessage(std::string str) {
    assert(global_zmq_context);
    std::cout << "Sending \n";
    zmq::socket_t publisher(global_zmq_context, ZMQ_PUB);
    assert(publisher);

    int linger = 0;
    int rc = zmq_setsockopt(publisher, ZMQ_LINGER, &linger, sizeof(linger));
    assert(rc==0);

    rc = zmq_connect(publisher, "tcp://127.0.0.1:4506");
    if (rc == -1) {
        printf ("E: connect failed: %s\n", strerror (errno));
        return -1;
    }

    zmq::message_t message(static_cast<const void*> (str.data()), str.size());
    rc = publisher.send(message);
    if (rc == -1) {
        printf ("E: send failed: %s\n", strerror (errno));
        return -1;
    }
    return 0;
}

int ListenMessage() {
    assert(global_zmq_context);
    std::cout << "Listening \n";
    zmq::socket_t subscriber(global_zmq_context, ZMQ_SUB);
    assert(subscriber);

    int rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", 0);
    assert(rc==0);

    int linger = 0;
    rc = zmq_setsockopt(subscriber, ZMQ_LINGER, &linger, sizeof(linger));
    assert(rc==0);

    rc = zmq_bind(subscriber, "tcp://127.0.0.1:4506");
    if (rc == -1) {
        printf ("E: bind failed: %s\n", strerror (errno));
        return -1;
    }

    std::vector<zmq::pollitem_t> p = {{subscriber, 0, ZMQ_POLLIN, 0}};
    while (true) {
        zmq::message_t rx_msg;
        // when timeout (the third argument here) is -1,
        // then block until ready to receive
        std::cout << "Still Listening before poll \n";
        zmq::poll(p.data(), 1, -1);
        std::cout << "Found an item \n"; // not reaching
        if (p[0].revents & ZMQ_POLLIN) {
            // received something on the first (only) socket
            subscriber.recv(&rx_msg);
            std::string rx_str;
            rx_str.assign(static_cast<char *>(rx_msg.data()), rx_msg.size());
            std::cout << "Received: " << rx_str << std::endl;
        }
    }
    return 0;
}

Этот код будет работать, если я запускаю один экземпляр программы с двумя потоками

    std::thread t_sub(ListenMessage);
    sleep(1); // Slow joiner in ZMQ PUB/SUB pattern
    std::thread t_pub(SendMessage str);
    t_pub.join();
    t_sub.join();

Но мне интересно почему при запуске двух экземпляров программы приведенный выше код не будет работать?

Спасибо за помощь!

1 Ответ

3 голосов
/ 08 февраля 2020

В случае, если кто-то никогда не работал с ZeroMQ,
здесь можно сначала посмотреть "ZeroMQ Принципы менее чем за Пять секунд "
, прежде чем углубляться в детали


Q : интересно, почему при запуске двух экземпляров программы приведенный выше код не будет работать?

Этот код никогда не будет летать - и он не имеет ничего общего ни с thread на основе, ни обработка process [CONCURENT].

Это было вызвано неправильным дизайном I nter P rocess C ommunication.

ZeroMQ может предоставить для этого любой из поддерживаемых транспортных классов:
{ ipc:// | tipc:// | tcp:// | norm:// | pgm:// | epgm:// | vmci:// } плюс еще более умный для внутрипроцессных коммуникаций, inproc:// Транспортный класс, готовый к обмену между потоками, где не требующая стека связь может иметь самую низкую из всех задержек, будучи политикой отображения памяти.

Выбор базы L3 / L2 Сетевой стек d для I nter- P rocess- C возможна связь, но это своего рода самый «дорогой» вариант.


Ошибка ядра:

При таком выборе любые отдельные процессы (не говоря о паре процессов) столкнутся при попытке .bind() его AccessPoint на очень же TCP / IP- address:port#


Другой Ошибка:

Даже ради запуска одиночной программы оба порожденных потока пытаются .bind() его приватное AccessPoint , но никто не пытается .connect() соответствующая "противоположность" AccessPoint .

По крайней мере, один должен успешно .bind(), а
по крайней мере один должен успешно .connect(), чтобы получить «канал», здесь PUB/SUB Архетип.


ToDo:

  • решить о правильном, достаточно правильном Тр ansport-Class (лучше избегать излишнего использования полного стека L3 / L2 для локального / внутрипроцессного IP C)
  • рефакторинг управления Address:port# ( для процессов 2+, чтобы не завершиться с ошибкой .bind() - (s) к тому же (аппаратно) address:port#
  • всегда обнаруживать и обрабатывать соответствующим образом возвращенные {PASS|FAIL} -ы из вызовов API
  • всегда устанавливает LINGER в ноль явно (вы никогда не знаете)
...