Jeromq zmq.poll.Poller@17b0943e восстановление селектора - PullRequest
0 голосов
/ 17 апреля 2019

Я постоянно печатаю эти строки на стандартный вывод при использовании jeromq, это довольно раздражает. Кажется, он распечатывается с тем же интервалом, что и при вызове Zmsg.recvMsg(), что составляет каждую 1 миллисекунду в моей реализации ниже. Я использую jeromq 0.5.1

zmq.poll.Poller@17b0943e rebuilding selector
zmq.poll.Poller@e760759 rebuilding selector
zmq.poll.Poller@17b0943e rebuilding selector
zmq.poll.Poller@e760759 rebuilding selector
zmq.poll.Poller@17b0943e rebuilding selector
zmq.poll.Poller@e760759 rebuilding selector
zmq.poll.Poller@17b0943e rebuilding selector
zmq.poll.Poller@e760759 rebuilding selector
zmq.poll.Poller@17b0943e rebuilding selector
zmq.poll.Poller@e760759 rebuilding selector
zmq.poll.Poller@17b0943e rebuilding selector
zmq.poll.Poller@e760759 rebuilding selector
zmq.poll.Poller@17b0943e rebuilding selector

Вот реализация подписчика ZMQ, который я использую, который использует jeromq для поднятия тяжестей, интересные методы здесь start() и stop

class ZMQSubscriber(
    socket: InetSocketAddress,
    hashTxListener: Option[ByteVector => Unit],
    hashBlockListener: Option[ByteVector => Unit],
    rawTxListener: Option[ByteVector => Unit],
    rawBlockListener: Option[ByteVector => Unit]) {
  private val logger = BitcoinSLogger.logger

  private var running = true
  private val context = ZMQ.context(1)

  private val subscriber = context.socket(ZMQ.SUB)
  private val uri = socket.getHostString + ":" + socket.getPort

  private case object SubscriberRunnable extends Runnable {
    override def run(): Unit = {

      val isConnected = subscriber.connect(uri)

      if (isConnected) {
        hashTxListener.map { _ =>
          subscriber.subscribe(HashTx.topic.getBytes(ZMQ.CHARSET))
          logger.debug("subscribed to the transaction hashes from zmq")
        }

        rawTxListener.map { _ =>
          subscriber.subscribe(RawTx.topic.getBytes(ZMQ.CHARSET))
          logger.debug("subscribed to raw transactions from zmq")
        }

        hashBlockListener.map { _ =>
          subscriber.subscribe(HashBlock.topic.getBytes(ZMQ.CHARSET))
          logger.debug("subscribed to the hashblock stream from zmq")
        }

        rawBlockListener.map { _ =>
          subscriber.subscribe(RawBlock.topic.getBytes(ZMQ.CHARSET))
          logger.debug("subscribed to raw block stream from zmq")
        }

        while (running) {
          val zmsg = ZMsg.recvMsg(subscriber, ZMQ.NOBLOCK)
          if (zmsg != null) {
            val notificationTypeStr = zmsg.pop().getString(ZMQ.CHARSET)
            val body = zmsg.pop().getData
            processMsg(notificationTypeStr, body)
          } else {
            Thread.sleep(1)
          }
        }
      } else {
        logger.error(s"Failed to connect to zmq socket ${uri}")
        throw new RuntimeException(s"Failed to connect to zmq socket ${uri}")
      }

    }
  }

  private val subscriberThread = new Thread(SubscriberRunnable)
  subscriberThread.setName("ZMQSubscriber-thread")
  subscriberThread.setDaemon(true)

  def start(): Unit = {
    logger.info("starting zmq")
    subscriberThread.start()
  }

  /**
    * Stops running the zmq subscriber and cleans up after zmq
    * http://zguide.zeromq.org/java:psenvsub
    */
  def stop: Unit = {
    //i think this could technically not work, because currently we are blocking
    //on Zmsg.recvMsg in our while loop. If we don't get another message we won't
    //be able toe evaluate the while loop again. Moving forward with this for now.
    running = false
    subscriber.close()
    context.term()
  }

  /**
    * Processes a message that we received the from the cryptocurrency daemon and then
    * applies the appropriate listener to that message.
    */
  private def processMsg(topic: String, body: Array[Byte]): Unit = {
    val notification = ZMQNotification.fromString(topic)
    notification.foreach {
      case HashTx =>
        hashTxListener.foreach { f =>
          f(ByteVector(body))
        }
      case RawTx =>
        rawTxListener.foreach { f =>
          f(ByteVector(body))
        }
      case HashBlock =>
        hashBlockListener.foreach { f =>
          f(ByteVector(body))
        }
      case RawBlock =>
        rawBlockListener.foreach { f =>
          f(ByteVector(body))
        }
    }

  }
}

Как мне избавиться от надоедливых отпечатков на стандартный вывод для

zmq.poll.Poller@17b0943e rebuilding selector

Вот реальная реализация на github, если вам интересно: https://github.com/bitcoin-s/bitcoin-s-core/blob/master/zmq/src/main/scala/org/bitcoins/zmq/ZMQSubscriber.scala

...