При подключении к HBase из mapPartitions в потоковой передаче с плавающей точкой возникает следующая проблема. Но нет никаких проблем, когда я использовал HBaseContext. Может кто-нибудь, пожалуйста, помогите мне решить эту проблему.
Caused by: java.io.IOException: hconnection-0x3e1ab781 closed
at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.getKeepAliveZooKeeperWatcher(ConnectionManager.java:1806)
at org.apache.hadoop.hbase.client.ZooKeeperRegistry.isTableOnlineState(ZooKeeperRegistry.java:122)
at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.isTableDisabled(ConnectionManager.java:993)
at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.relocateRegion(ConnectionManager.java:1162)
at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.relocateRegion(ConnectionManager.java:1150)
at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.getRegionLocation(ConnectionManager.java:971)
at org.apache.hadoop.hbase.client.HRegionLocator.getRegionLocation(HRegionLocator.java:83)
at org.apache.hadoop.hbase.client.RegionServerCallable.prepare(RegionServerCallable.java:79)
at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:124)
... 17 more
18/11/14 11:59:40 INFO client.RpcRetryingCaller: Call exception, tries=34, retries=35, started=592797 ms ago, cancelled=false, msg=row on table 'prod:CustomerData' at null
18/11/14 11:59:40 ERROR executor.Executor: Exception in task 6.1 in stage 1.0 (TID 30)
org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after attempts=35, exceptions:
Ниже приведен код, который я использовал
val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, topicSet)
messages.mapPartitions(iter => {
val context = TaskContext.get
logger.info((s"Process for partition: ${context.partitionId} "))
val hbaseConf = HBaseConfiguration.create()
//hbaseConf.addResource(new File("/etc/hbase/conf/hbase-site.xml").toURI.toURL)
//val connection: Connection = hbaseConnection.getOrCreateConnection(hbaseConf)
val connection = ConnectionFactory.createConnection(hbaseConf)
val hbaseTable = connection.getTable(TableName.valueOf("prod:CustomerData"))
.......
})