Почему порядок zmq (inproc: //) -соединения имеет значение, в отличие от (tcp: //)? - PullRequest
0 голосов
/ 10 мая 2018

При запуске сервера и клиента zmq в произвольном порядке, связываясь по транспортному классу tcp://, они достаточно умны, чтобы подключаться / переподключаться независимо от порядка.

Однако при попытке запустить то же самое для транспортного класса inproc:// я вижу, что он работает только в том случае, если клиент запускается после сервера. Как мы можем избежать этого?


MCVE-код:

Вот несколько примеров MCVE-кода kotlin для воспроизведения заявки (это модифицированная версия хорошо известного примера погоды)

server.kt - запустите это, чтобы запустить автономный сервер

package sandbox.zmq

import org.zeromq.ZMQ
import org.zeromq.ZMQ.Context
import sandbox.util.Util.sout
import java.util.*

fun main(args: Array<String>) {
   server(
      context = ZMQ.context(1),
//      publishTo = "tcp://localhost:5556"
      publishTo = "tcp://localhost:5557"
   )
}

fun server(context: Context, publishTo: String) {
   val publisher = context.socket(ZMQ.PUB)
   publisher.bind(publishTo)

   //  Initialize random number generator
   val srandom = Random(System.currentTimeMillis())
   while (!Thread.currentThread().isInterrupted) {
      //  Get values that will fool the boss
      val zipcode: Int
      val temperature: Int
      val relhumidity: Int
      zipcode = 10000 + srandom.nextInt(10)
      temperature = srandom.nextInt(215) - 80 + 1
      relhumidity = srandom.nextInt(50) + 10 + 1

      //  Send message to all subscribers
      val update = String.format("%05d %d %d", zipcode, temperature, relhumidity)
      println("server >> $update")
      publisher.send(update, 0)
      Thread.sleep(500)
   }

   publisher.close()
   context.term()
}

client.kt - запустите это для автономного клиента

package sandbox.zmq

import org.zeromq.ZMQ
import org.zeromq.ZMQ.Context
import java.util.*

fun main(args: Array<String>) {
   client(
      context = ZMQ.context(1),
      readFrom = "tcp://localhost:5557"
   )
}

fun client(context: Context, readFrom: String) {
   //  Socket to talk to server
   println("Collecting updates from weather server")
   val subscriber = context.socket(ZMQ.SUB)
   //        subscriber.connect("tcp://localhost:");
   subscriber.connect(readFrom)

   //  Subscribe to zipcode, default is NYC, 10001
   subscriber.subscribe("".toByteArray())

   //  Process 100 updates
   var update_nbr: Int
   var total_temp: Long = 0
   update_nbr = 0
   while (update_nbr < 10000) {
      //  Use trim to remove the tailing '0' character
      val string = subscriber.recvStr(0).trim { it <= ' ' }
      println("client << $string")
      val sscanf = StringTokenizer(string, " ")
      val zipcode = Integer.valueOf(sscanf.nextToken())
      val temperature = Integer.valueOf(sscanf.nextToken())
      val relhumidity = Integer.valueOf(sscanf.nextToken())

      total_temp += temperature.toLong()
      update_nbr++

   }
   subscriber.close()
}

inproc.kt - запустите его и измените, какой образец вызывается для сценариев inproc://

package sandbox.zmq

import org.zeromq.ZMQ
import kotlin.concurrent.thread


fun main(args: Array<String>) {
//   clientFirst()
   clientLast()
}

fun println(string: String) {
   System.out.println("${Thread.currentThread().name} : $string")
}

fun clientFirst() {

   val context = ZMQ.context(1)

   val client = thread {
      client(
         context = context,
         readFrom = "inproc://backend"
      )
   }

   // use this to maintain order
   Thread.sleep(10)

   val server = thread {
      server(
         context = context,
         publishTo = "inproc://backend"
      )
   }

   readLine()
   client.interrupt()
   server.interrupt()
}

fun clientLast() {

   val context = ZMQ.context(1)

   val server = thread {
      server(
         context = context,
         publishTo = "inproc://backend"
      )
   }

   // use this to maintain order
   Thread.sleep(10)

   val client = thread {
      client(
         context = context,
         readFrom = "inproc://backend"
      )
   }

   readLine()
   client.interrupt()
   server.interrupt()
}

1 Ответ

0 голосов
/ 10 мая 2018

Почему zmq inproc:// порядок подключения имеет значение, в отличие от tcp://?

Ну, это не совсем правильное поведение

Учитывая, что нативный API ZeroMQ предупреждает о подобном поведении (с тех пор), проблема не в проблеме, а в предполагаемом свойстве.

Кроме того, должно быть выполнено еще одно дополнительное свойство:

Имя [означает an_endpoint_name в .connect("inproc://<_an_endpoint_name_>")]
должно быть предварительно создано, назначив его как минимум один сокет
в пределах того же ØMQ контекста, что и подключаемый сокет.

Более новые версии нативного API ZeroMQ (версия 4.0), , если они действительно развернуты под соответствующей языковой привязкой / оболочкой, может позволить освободить первое из следующих требований:

Начиная с версии 4.0, порядок zmq_bind() и zmq_connect() не имеет значения, как для типа транспорта tcp.


Как мы можем избежать этого?

Ну, намного сложнее ...

Если вы еще не получили легкий путь по сравнению с собственным API ZeroMQ v4.2 +, можно закатать рукава и перефакторировать оболочку / привязку языка pre-4.x, чтобы заставить движок попасть туда или, может быть, проверить, может ли второй милый ребенок Мартина СУСТРИКА, nanomsg соответствовать сцене для достижения этой цели.

...