Можно ли использовать транспорт ZeroMQ (NetMQ) TCP между издателем и подписчиком в одном и том же процессе? - PullRequest
0 голосов
/ 23 мая 2018

Я думаю, что вопрос говорит сам за себя.

Для некоторой предыстории у меня есть подписчик, для которого я пытаюсь написать несколько тестов.Чтобы сделать это, я раскручиваю издателя из теста, указывая tcp://localhost:[port] в качестве адреса.Когда сообщение отправлено, подписчик не получает его.Вот пример кода для демонстрации:

   string address   =    "tcp://localhost:1026";
// string address   = "inproc://localhost1026";

   var    pubSocket =  new PublisherSocket();
   pubSocket.Bind(address);

   var subSocket = new SubscriberSocket();
   subSocket.Connect(address);
   subSocket.SubscribeToAnyTopic();

   pubSocket.SendFrame("Hello world!", false);

   Console.WriteLine(subSocket.ReceiveFrameString()); /* <-- tcp transport 
                                                         waits here forever
                                                         */
   subSocket.Dispose();
   pubSocket.Dispose();

Если я изменю протокол на inproc://, тогда все хорошо.Однако я не хочу делать это в своих тестах, потому что я также хочу протестировать сокет монитора, и это не вызывает события для соединений inproc:// (насколько я вижу).

Обратите внимание, что я использую NetMQ из кода C # (работает под .NET Framework 4.6.2).

Ответы [ 2 ]

0 голосов
/ 23 мая 2018

O / P заслуживает +1 для публикации номеров версий используемого .NET.Отлично, сэр.

Можно ли использовать транспорт TCP ZeroMQ (NetMQ) между издателем и подписчиком в одном процессе?

Абсолютно,
пока нет никаких ограничений, мешающих этому.,.

Ваше наблюдение связано с задержкой скрытой обработки, которая происходит внутри основного механизма ... внутри экземпляра ZeroMQ Context().

Вещи не происходят в нулевое время , извините, нет (даже (почти) -кулярных магов, таких как Большой Взрыв, действительно потребовалось некоторое время для создания экземпляранаша подлинная Вселенная ... лучше всего прочитать полностью детективную книгу об этом, опубликованную Стивеном Вайнбергом: ПЕРВЫЕ ТРИ МИНУТЫ, Современный взгляд на происхождение Вселенной - обязательно прочитайте, чтобы упасть вlove with.

Что ж, можно отложить это сразу после Pieter HINTJENS, " Code Connected, Volume 1 " на ZeroMQ Zen-of-Zero. И то, и другое имеет смысл, большой смысл, и не только здесь).

Итак, в то время как inproc:// транспортный класс имеет (почти) -zero-resources, являясь чистой приватной абстракцией, отображаемой в памяти (в процессе) (конечно, за исключением нескольких sub- [ns] - «устройства», такие как семафор / блокировка), инфраструктура ZeroMQ запускается и работает «немедленным» образом, tcp:// не имеетэто удобство, так как он должен сначала генерировать все контракты, специфичные для транспортного класса, с O / S, с драйвером (ами) устройства, создавать специфичные для транспортного класса специфические политики обработки ISO / OSI- {L0, L1, L3, L3 +}создать соответствующий код перекачки данных в RTO-состоянии Context(), выделить и отобразить буфер (ы) памяти для этих целей, так что довольно много работы предстоит сделать перед PUB сторона получает в состояние RTO, где она (в более новых версиях ~ API 4. +) также обязана как , так и обрабатывать услугу телеметрии подписки, поскольку она несет сосредоточенную ответственностьдля обработки списка фильтров TOPIC для клиента SUB.

Это , почему это привело к зависанию .recv( ..., ZMQ_BLOCK ), похороненного внутриNetMQ завернутая абстракция subSocket.ReceiveFrameString().

Для тестированияпросто сделайте модифицированный тест:

  // --------------------------------------------------- // DEMO PSEUDO-CODE
     string rF = "";

     while True:

           pubSocket.SendFrame( "Hello world!", false ); // keep sending ...
                                                         // also may count++
                                                         // so as to "show" how
                                                         // many loops it took
           rF = subSocket.ReceiveFrameString(   false ); // non-blocking mode ~
                                                         // .recv(  ZMQ_NOBLOCK )
                                                         // or may use Poll()
                                                         // to just sniff for
                                                         // a message presence
           if  rF == "":
               continue; // ---^ LOOP NEXT, AS DID .recv() NOTHING YET
           break; // ----------v BREAK      AS DID .recv() MESSAGE
// ----------------------------------------------------------------------
   Console.WriteLine( rF );

Здесь можно приложить больше усилий и поэкспериментировать, чтобы увидеть ключевые роли как накладных расходов, связанных с .connect(), но и необходимостьне пропустить сигнальную телеметрию на стороне SUB при настройке подписки (подписок) + необходимость ее получения + повторную обработку на стороне PUB перед любым сообщениемпервый в истории отправляется к намеченному, иначе просто « навсегда » - ожидание, SUB


«толстый» - достаточно .sleep( someGuestimateTIME ), предложенный уже @ HesamFaridmehr , после того, как сторона SUB имеет оба .connect() -ed плюс плюс .setsockopt( ZMQ_SUBSCRIBE ) (который должен получитьсначала доставлены, а также должным образом обработаны на стороне PUB (для правильной настройки процессора процессора списка TOPIC-фильтров)) все это достаточно хорошо перед первым PUB.send() будет маскировать основную причину , делая ее «косвенно» заблокированной, и выполнение кода прекращается, вместо того, чтобы сделать решение достаточно умным - используя неблокирующую форму, например, Poll() - для профессиональной передовой опыт проектирования,где можно действительно повиноваться сборке хакеров любимой первой строчке настоящего макроса #ASSUME NOTHING;.

0 голосов
/ 23 мая 2018

Потому что pub/sub шаблон похож на радио.Издатель не будет ждать, пока подписчик подключится, он будет игнорировать отправку, если подписчик отсутствует.Вы можете проверить это, просто добавив Thread.Sleep(1000); после subSocket.SubscribeToAnyTopic(); строки и увидите, что получите сообщение.

В inproc время для привязки меньше tcp, поэтому вы получаетесообщение

А в inproc, издатель должен быть включен до того, как подписчик подключится

...