У меня проблема с производительностью при чтении данных из HBase в потоковой передаче искры. Это займет более 5 минут только для чтения данных из HBase для 3 записей. Ниже приведена логика, которую я использовал в mapPartitions.
val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, topicSet)
messages.mapPartitions(iter => {
val context = TaskContext.get
logger.info((s"Process for partition: ${context.partitionId} "))
val hbaseConf = HBaseConfiguration.create()
//hbaseConf.addResource(new File("/etc/hbase/conf/hbase-site.xml").toURI.toURL)
//val connection: Connection = hbaseConnection.getOrCreateConnection(hbaseConf)
val connection = ConnectionFactory.createConnection(hbaseConf)
val hbaseTable = connection.getTable(TableName.valueOf("prod:CustomerData"))
.......
})
Я использовал BulkGet. Обработка 90K-сообщений занимает около 5 секунд (возможно, потому, что API использует HBaseContext, а нам не нужно создавать HBaseConnection). Но я не могу использовать это, поскольку вывод BulkGet - RDD, и я должен сделать leftouterjoin, чтобы присоединиться к RDD BulkGet с фактическим RDD от Kafka. Я предполагаю, что это не правильный подход, поскольку он включает в себя следующее. Более того, мне нужно обработать все 90K-сообщений за 1 секунду.
Извлечь отдельный идентификатор Cusotmer из RDD, считанного с Kafka, перед передачей его в BulkGet
Кроме того, это включает в себя тасование, так как я должен сделать leftOuterJoin в основном СДР (от Kafka) с СДР BulkGet (я вижу только возможность соединения, поскольку выход BulkGet является СДР)
Может кто-нибудь помочь мне, в чем проблема с производительностью, когда я пытаюсь создать HBaseConnection в mapPartitions. Я также попытался установить путь к классу драйвера.
Спасибо