Топология ZeroMQ PUB / SUB на одном компьютере - PullRequest
2 голосов
/ 06 февраля 2020

Может ли издатель опубликовать sh на нескольких клиентах на одном компьютере с помощью ZeroMQ? Мне нужен набор клиентов, каждый из которых может выполнять стандартные вызовы Запрос / Ответ с использованием SocketType. REQ и SocketType. REP , но которые также могут получать уведомления с помощью SocketType. SUB и SocketType. PUB .

Я пытался реализовать эту топологию, взятую из здесь , хотя в моей версии только один издатель.

enter image description here

Вот мой издатель:

public class ZMQServerSmall 
{   
    public static void main(String[] args)
    {
        try (ZContext context = new ZContext()) 
        {           
            ZMQ.Socket rep = context.createSocket(SocketType.REP);
            rep.bind("tcp://*:5555");

            ZMQ.Socket pub = context.createSocket(SocketType.PUB);
            pub.bind("tcp://*:7777");           

            while (!Thread.currentThread().isInterrupted()) 
            {                   
                String req = rep.recvStr(0);
                rep.send(req + " response");

                pub.sendMore("Message header");
                pub.send("Message body");;          
            }
        }
    }
}

Вот мой прокси (я включил Слушатель, чтобы попытаться увидеть, что происходит on):

public class ZMQForwarderSmall 
{
    public static void main(String[] args) 
    {       
        try 
        (
            ZContext context = new ZContext();   
        )
        {
            ZMQ.Socket frontend = context.createSocket(SocketType.XSUB);            
            frontend.connect("tcp://*:7777");

            ZMQ.Socket backend = context.createSocket(SocketType.XPUB);
            backend.bind("tcp://*:6666");

            IAttachedRunnable runnable = new Listener();
            Socket listener = ZThread.fork(context, runnable);

            ZMQ.proxy(frontend, backend, listener);
        }
        catch (Exception e) 
        {
            System.err.println(e.getMessage());
        } 
    }

    private static class Listener implements IAttachedRunnable 
    {    
        @Override
        public void run(Object[] args, ZContext ctx, Socket pipe) 
        {
            while (true) 
            {
                ZFrame frame = ZFrame.recvFrame(pipe);
                if (frame == null)
                    break; // Interrupted
                System.out.println(frame.toString());
                frame.destroy();
            }
        }
    }
}

Вот мой подписчик:

public class ZMQClientSmall
{   
    public static void main(String[] args) throws IOException
    {
        String input;

        try 
        (
            ZContext context = new ZContext();
            BufferedReader stdIn = new BufferedReader(new InputStreamReader(System.in))
        ) 
        { 
            ZMQ.Socket reqSocket = context.createSocket(SocketType.REQ);
            reqSocket.connect("tcp://localhost:5555");

            ZMQ.Socket subSocket = context.createSocket(SocketType.SUB);
            subSocket.connect("tcp://localhost:6666");

            subSocket.subscribe("".getBytes(ZMQ.CHARSET));

            while ((input = stdIn.readLine()) != null)
            {
                reqSocket.send(input.getBytes(ZMQ.CHARSET), 0);
                String response = reqSocket.recvStr(0);

                String address = subSocket.recvStr(ZMQ.DONTWAIT);
                String contents = subSocket.recvStr(ZMQ.DONTWAIT);
                System.out.println("Notification received: " + address + " : " + contents);
            }
        }   
    }   
}

Вот тест. Я открываю четыре терминала; 1 издатель, 1 прокси и 2 клиента. Когда я делаю запрос в любом из двух клиентских терминалов, я ожидаю увидеть уведомление в обоих, но вместо этого я вижу только уведомление в терминале, который сделал запрос. Я знаю, что оба клиента используют один и тот же адрес (localhost: 6666), но я надеялся, что прокси решит эту проблему.

Кто-нибудь может увидеть здесь что-то явно не так?

1 Ответ

1 голос
/ 06 февраля 2020

Q : Возможно ли, чтобы издатель опубликовал sh для нескольких клиентов на одном компьютере с помощью ZeroMQ?

Ох конечно , это так. В этом нет сомнений.


Проверьте код. Ответственность за выполнение заказа есть. В это всегда так.

Как только [Client]-No1 экземпляр получает правдоподобный .readLine() -ed input это будет прыгать:

        while ((input = stdIn.readLine()) != null)
        {
            reqSocket.send(input.getBytes(ZMQ.CHARSET), 0);
            String response = reqSocket.recvStr(0);

            String address = subSocket.recvStr(ZMQ.DONTWAIT);
            String contents = subSocket.recvStr(ZMQ.DONTWAIT);
            System.out.println(           "Notification received: "
                              + address + " : "
                              + contents
                                );
        }

Далее это .send() -s над REQ и блоками (в ожидании ответа REP)

Учитывая [Client]-No2 экземпляр также получает правдоподобное руководство .readLine() -ed input оно будет вставлять то же самое while(){...}, но не будет продвигаться дальше, чем к повторной блокировке ожидания REP -ответа , Он не получит .recv() в любое время, но после того, как -No1 будет получен со стороны REP, так что в то время как -No1 мог выйти из блокировки- .recv() , не так -No2 (который все еще будет зависать внутри своего блокирующего- .recv() для любого следующего ответа REP (который может прийти, но не обязательно), в то время как No1 уже перешел к PUB/SUB-.recv(), который он получит (но никогда No2), следующий в следующий раз блокировка - input - подача от .readLine() Et Cetera, Et Cetera, Et Cetera, ..., Ad Infinitum

Итак, эти SEQ -of-In-L oop (REQ) - части, за которыми следует (SUB) - части в любом количестве N > 1 из [Client] - экземпляры, эффективно сгенерировавшие ИСКЛЮЧИТЕЛЬНЫЙ Tick-Tock-Tick-Tock-механизм , взаимно блокирующий эксклюзивную доставку PUB -ed в N -перемеженном порядке (не говоря о руководстве, .readLine() -привод, шаг блокировки)

ZMQServerSmall не знает ничего плохого, так как он .send() -с для любого .recvStr() -ed контрагент по REQ/REP и PUB -s всем контрагентам (которые не читают автономно, но только после того, как были вручную .readLine() разблокированы и только затем (после REQ/REP эпизодов c (потенциально бесконечно) заблокировано) шаги) может .recv() его следующий (пока что не прочитанная часть сообщения (пока, я не вижу кода, который работает с явной обработкой наличия / отсутствия флагов multipart на стороне SUB *) 1108 * операций)

        while (!Thread.currentThread().isInterrupted()) 
        {                   
            String req = rep.recvStr(0);
            rep.send(req + " response");

            pub.sendMore("Message header");
            pub.send("Message body");;          
        }

Тем временем ZMQServerSmall отправляет (N - 1) в разы больше сообщений по PUB -передаче, поэтому Tick-Tock -Tick-Tock MUTEX REQ/SUB -l oop -блокирующий "маятник" не является 2-мя состояниями, но N -State на принимающих сторонах (все получают одинаковый поток PUB сообщения,
пока с перемежением N-шагами REQ/REP MUTEX- степпинга * 11 27 *)

...