Я создал приложение, которое использует Spark-Streaming с пользовательским приемником Google Pub / Sub.
Я достиг своего предела производительности и заинтересован в том, чтобы оставлять сообщения без обработки. У меня была идея store()
суб чтения сообщений
Я использовал apache / bahir приемник
val pullResponse = client.projects().subscriptions().pull(subscriptionFullName, pullRequest).execute()
val receivedMessages = pullResponse.getReceivedMessages.asScala.toList
Utils.LOG.info(s"receivedMessages from PUB/SUB ${receivedMessages.size}")
rateLimiter.acquire(receivedMessages.size)
var factor: Int = 0
if (dropFactorBroad != null) {
factor = dropFactorBroad.value
} else {
Utils.LOG.info("dropFactorBroad is null")
}
val endIndex = if (factor > receivedMessages.length) receivedMessages.length else factor
val messagesToStore = receivedMessages.slice(0, receivedMessages.length - endIndex)
store(messagesToStore.map(x => {
val sm = new SparkPubsubMessage
sm.message = x.getMessage
sm
})
.iterator)
val ackRequest = new AcknowledgeRequest()
ackRequest.setAckIds(receivedMessages.map(x => x.getAckId).asJava)
client.projects().subscriptions().acknowledge(subscriptionFullName, ackRequest).execute()
dropFactorBroad
- переменная широковещания, которая обновляется каждые onBatchCompleted
(без повторных проверок и создается снова)
Это не работает, я получаю
java.lang.NullPointerException
at com.mag.ingester.ReceiverDropFactorBroadcaster.value(ReceiverDropFactorBroadcaster.scala:20)
at com.mag.pubSubReceiver.PubsubReceiver.receive(PubsubInputDStream.scala:260)
at com.mag.pubSubReceiver.PubsubReceiver$$anon$1.run(PubsubInputDStream.scala:244)
ReceiverDropFactorBroadcaster
является dropFactorBroad
Как я могу управлять магазином приема?
Должен ли я убить получателей, изменить переменную и запустить ее заново? (Как это можно сделать?)
Спасибо