Я постоянно печатаю эти строки на стандартный вывод при использовании jeromq, это довольно раздражает. Кажется, он распечатывается с тем же интервалом, что и при вызове Zmsg.recvMsg()
, что составляет каждую 1 миллисекунду в моей реализации ниже. Я использую jeromq 0.5.1
zmq.poll.Poller@17b0943e rebuilding selector
zmq.poll.Poller@e760759 rebuilding selector
zmq.poll.Poller@17b0943e rebuilding selector
zmq.poll.Poller@e760759 rebuilding selector
zmq.poll.Poller@17b0943e rebuilding selector
zmq.poll.Poller@e760759 rebuilding selector
zmq.poll.Poller@17b0943e rebuilding selector
zmq.poll.Poller@e760759 rebuilding selector
zmq.poll.Poller@17b0943e rebuilding selector
zmq.poll.Poller@e760759 rebuilding selector
zmq.poll.Poller@17b0943e rebuilding selector
zmq.poll.Poller@e760759 rebuilding selector
zmq.poll.Poller@17b0943e rebuilding selector
Вот реализация подписчика ZMQ, который я использую, который использует jeromq для поднятия тяжестей, интересные методы здесь start()
и stop
class ZMQSubscriber(
socket: InetSocketAddress,
hashTxListener: Option[ByteVector => Unit],
hashBlockListener: Option[ByteVector => Unit],
rawTxListener: Option[ByteVector => Unit],
rawBlockListener: Option[ByteVector => Unit]) {
private val logger = BitcoinSLogger.logger
private var running = true
private val context = ZMQ.context(1)
private val subscriber = context.socket(ZMQ.SUB)
private val uri = socket.getHostString + ":" + socket.getPort
private case object SubscriberRunnable extends Runnable {
override def run(): Unit = {
val isConnected = subscriber.connect(uri)
if (isConnected) {
hashTxListener.map { _ =>
subscriber.subscribe(HashTx.topic.getBytes(ZMQ.CHARSET))
logger.debug("subscribed to the transaction hashes from zmq")
}
rawTxListener.map { _ =>
subscriber.subscribe(RawTx.topic.getBytes(ZMQ.CHARSET))
logger.debug("subscribed to raw transactions from zmq")
}
hashBlockListener.map { _ =>
subscriber.subscribe(HashBlock.topic.getBytes(ZMQ.CHARSET))
logger.debug("subscribed to the hashblock stream from zmq")
}
rawBlockListener.map { _ =>
subscriber.subscribe(RawBlock.topic.getBytes(ZMQ.CHARSET))
logger.debug("subscribed to raw block stream from zmq")
}
while (running) {
val zmsg = ZMsg.recvMsg(subscriber, ZMQ.NOBLOCK)
if (zmsg != null) {
val notificationTypeStr = zmsg.pop().getString(ZMQ.CHARSET)
val body = zmsg.pop().getData
processMsg(notificationTypeStr, body)
} else {
Thread.sleep(1)
}
}
} else {
logger.error(s"Failed to connect to zmq socket ${uri}")
throw new RuntimeException(s"Failed to connect to zmq socket ${uri}")
}
}
}
private val subscriberThread = new Thread(SubscriberRunnable)
subscriberThread.setName("ZMQSubscriber-thread")
subscriberThread.setDaemon(true)
def start(): Unit = {
logger.info("starting zmq")
subscriberThread.start()
}
/**
* Stops running the zmq subscriber and cleans up after zmq
* http://zguide.zeromq.org/java:psenvsub
*/
def stop: Unit = {
//i think this could technically not work, because currently we are blocking
//on Zmsg.recvMsg in our while loop. If we don't get another message we won't
//be able toe evaluate the while loop again. Moving forward with this for now.
running = false
subscriber.close()
context.term()
}
/**
* Processes a message that we received the from the cryptocurrency daemon and then
* applies the appropriate listener to that message.
*/
private def processMsg(topic: String, body: Array[Byte]): Unit = {
val notification = ZMQNotification.fromString(topic)
notification.foreach {
case HashTx =>
hashTxListener.foreach { f =>
f(ByteVector(body))
}
case RawTx =>
rawTxListener.foreach { f =>
f(ByteVector(body))
}
case HashBlock =>
hashBlockListener.foreach { f =>
f(ByteVector(body))
}
case RawBlock =>
rawBlockListener.foreach { f =>
f(ByteVector(body))
}
}
}
}
Как мне избавиться от надоедливых отпечатков на стандартный вывод для
zmq.poll.Poller@17b0943e rebuilding selector
Вот реальная реализация на github, если вам интересно: https://github.com/bitcoin-s/bitcoin-s-core/blob/master/zmq/src/main/scala/org/bitcoins/zmq/ZMQSubscriber.scala