Как я могу отправить сообщение ZeroMQ из сокета ROUTER в указанный сокет DEALER c, используя cppzmq? - PullRequest
2 голосов
/ 24 февраля 2020

Я собрал этот минимальный пример, чтобы отправить сообщение из сокета Маршрутизатора на указанный сокер c ДИЛЕР (в котором установлен его идентификатор). При запуске этих двух программ кажется, что он зависает на ROUTER в ожидании ответа от DEALER , а DEALER зависает в ожидании запроса от роутер . Похоже, что сообщение, которое отправляет ROUTER , никогда не отправляется на ДИЛЕР .

Маршрутизатор. cpp

#include <iostream>
#include <zmq.hpp>
#include <string>
#include <thread>
#include <chrono>

int main() {
    zmq::context_t context;
    zmq::socket_t socket (context, zmq::socket_type::router);
    // Enforce sending routable messages only
    socket.setsockopt(ZMQ_ROUTER_MANDATORY, 1);
    socket.bind("tcp://*:5555");

    try {
        std::string jobRequest = "ExampleJobRequest";

        std::cout << "Router: Sending msg: " << jobRequest << std::endl;

        // Set the address, then the empty delimiter and then the request itself
        socket.send("PEER2", ZMQ_SNDMORE);
        //socket.send(zmq::message_t(), ZMQ_SNDMORE);
        socket.send(zmq::str_buffer("ExampleJobRequest")) ;

        // Set the address, then the empty delimiter and then the request itself
        socket.send("PEER2", ZMQ_SNDMORE);
        //socket.send(zmq::message_t(), ZMQ_SNDMORE);
        socket.send(zmq::str_buffer("ExampleJobRequest")) ;

        // Receive the reply from the camera
        std::cout << "Router: Waiting for reply from camera " << std::endl;
        zmq::message_t reply;
        socket.recv(&reply);

        std::cout << "Router: Received " <<  std::string(static_cast<char*>(reply.data()), reply.size()) << std::endl;
    } catch (std::exception e) {
        std::cout << "Router Error: " << e.what();
    }

    std::this_thread::sleep_for(std::chrono::seconds(1));
    socket.close();
    context.close();
}

Дилер. cpp

#include <zmq.hpp>
#include <string>
#include <iostream>
#include <thread>

int main (void)
{
    //  Prepare our context and socket
    zmq::context_t context;
    zmq::socket_t socket (context, zmq::socket_type::dealer);

    std::cout << "Dealer: Connecting to RunJob server… \n";
    socket.setsockopt(ZMQ_IDENTITY, "PEER2", 5);
    socket.connect ("tcp://localhost:5555");

    while(true) {
        try {
            // Wait for next request from client
            std::cout << "Dealer: Waiting for request" << std::endl;
            zmq::message_t request;
            zmq::message_t empty;

            // Receive request
            socket.recv(&request);

            std::string requestString = std::string(static_cast<char*>(request.data()), request.size());

            std::cout << "Dealer: Received request" << std::endl;
            std::cout << requestString << std::endl;

            // ZMQ_SNDMORE - "Specifies that the message being sent is a multi-part message, and that further message parts are to follow"
            socket.send(zmq::str_buffer("Job completed"), zmq::send_flags::dontwait);
        }catch (std::exception e) {
            std::cout << "Router Error: " << e.what();
        }
    }

    // Used to set various 0MQ Socket Settings
    // ZMQ_Linger - Set linger period for socket shutdown
    socket.setsockopt(ZMQ_LINGER, 0);
    socket.close();
    context.close();

    return 0;
}

Первоначально я полагал, что мне следует добавлять в сообщение пустой разделитель, socket.send(zmq::message_t(), ZMQ_SNDMORE);, но это вызвало ошибка. Также использование следующего также привело к ошибке в блоке try / catch. Ошибка просто печатает «Неизвестная ошибка»:

zmq::message_t delimiter(0);
socket.send(delimiter, ZMQ_SNDMORE);

Использование следующего для создания разделителя также вызывает ту же ошибку:

socket.send(zmq::message_t(), ZMQ_SNDMORE);

Насколько мне известно, при использовании cppzmq вы не ' Мне нужно добавить пустой разделитель (я могу ошибаться в этом, но после прочтения и просмотра примера других людей и проверки собственного кода, это то, что я определил).

Вот очень базовый c диаграмма с конечной целью этого проекта:

ROUTER to DEALER messaging design

В моем исследовании я не нашел хорошего примера этого кода. У Cppzmq github очень мало документации и мало примеров.

Вот некоторые другие источники, на которые я смотрел:

1 Ответ

2 голосов
/ 25 февраля 2020

Основная идея шаблона ROUTER / DEALER заключается в том, что это асинхронное обобщение REPLY / REQUEST. Тем не менее, вы пытаетесь изменить сокеты в вашем шаблоне, обнаруживая, что он не подходит, и искажая код, чтобы попытаться сделать его подходящим. Не делайте этого.

Что вам нужно сделать, это "go с потоком". В простом методе, для которого существуют примеры, ДИЛЕР должен отправить первое сообщение. Затем маршрутизатор отвечает на это.

Следующим уровнем является ДИЛЕР, чтобы идентифицировать себя в своем сообщении при запуске. Маршрутизатор может затем дать конкретный c ответ этому ДИЛЕРУ.

На следующем уровне вы можете go по-настоящему асинхронным. МАРШРУТИЗАТОР может взять копию идентификационного сообщения каждого ДИЛЕРА и использовать копии сообщения для отправки асинхронных сообщений любому ДИЛЕРУ в любое время. К одной копии идентификационного сообщения будет добавлен кадр «PEER2», который будет отправлен ДИЛЕРУ. Это работает, потому что копии сообщений включают фреймы маршрутизации. В идеале вы также должны удалить кадры «сообщения», чтобы в копии оставались только кадры маршрутизации.

Предостережение - я не использую cppzmq, я использую CZMQ. Я могу сказать, что с помощью CZMQ манипулирование кадрами такого рода очень простое.

...