Spark-Streaming широковещательная переменная для пользовательского получателя - PullRequest
0 голосов
/ 15 мая 2019

Я создал приложение, которое использует Spark-Streaming с пользовательским приемником Google Pub / Sub.

Я достиг своего предела производительности и заинтересован в том, чтобы оставлять сообщения без обработки. У меня была идея store() суб чтения сообщений Я использовал apache / bahir приемник

                val pullResponse = client.projects().subscriptions().pull(subscriptionFullName, pullRequest).execute()
                val receivedMessages = pullResponse.getReceivedMessages.asScala.toList
                Utils.LOG.info(s"receivedMessages from PUB/SUB ${receivedMessages.size}")
                rateLimiter.acquire(receivedMessages.size)
                var factor: Int = 0
                if (dropFactorBroad != null) {
                    factor = dropFactorBroad.value
                } else {
                    Utils.LOG.info("dropFactorBroad is null")
                }
                val endIndex = if (factor > receivedMessages.length) receivedMessages.length else factor
                val messagesToStore = receivedMessages.slice(0, receivedMessages.length - endIndex)

                store(messagesToStore.map(x => {
                      val sm = new SparkPubsubMessage
                      sm.message = x.getMessage
                      sm
                  })
                  .iterator)

                val ackRequest = new AcknowledgeRequest()
                ackRequest.setAckIds(receivedMessages.map(x => x.getAckId).asJava)
                client.projects().subscriptions().acknowledge(subscriptionFullName, ackRequest).execute()

dropFactorBroad - переменная широковещания, которая обновляется каждые onBatchCompleted (без повторных проверок и создается снова)

Это не работает, я получаю

java.lang.NullPointerException
    at com.mag.ingester.ReceiverDropFactorBroadcaster.value(ReceiverDropFactorBroadcaster.scala:20)
    at com.mag.pubSubReceiver.PubsubReceiver.receive(PubsubInputDStream.scala:260)
    at com.mag.pubSubReceiver.PubsubReceiver$$anon$1.run(PubsubInputDStream.scala:244)

ReceiverDropFactorBroadcaster является dropFactorBroad

Как я могу управлять магазином приема? Должен ли я убить получателей, изменить переменную и запустить ее заново? (Как это можно сделать?)

Спасибо

...