Проблема при подключении к HBase из Spark Streaming - PullRequest
0 голосов
/ 14 ноября 2018

При подключении к HBase из mapPartitions в потоковой передаче с плавающей точкой возникает следующая проблема. Но нет никаких проблем, когда я использовал HBaseContext. Может кто-нибудь, пожалуйста, помогите мне решить эту проблему.

Caused by: java.io.IOException: hconnection-0x3e1ab781 closed
    at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.getKeepAliveZooKeeperWatcher(ConnectionManager.java:1806)
    at org.apache.hadoop.hbase.client.ZooKeeperRegistry.isTableOnlineState(ZooKeeperRegistry.java:122)
    at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.isTableDisabled(ConnectionManager.java:993)
    at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.relocateRegion(ConnectionManager.java:1162)
    at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.relocateRegion(ConnectionManager.java:1150)
    at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.getRegionLocation(ConnectionManager.java:971)
    at org.apache.hadoop.hbase.client.HRegionLocator.getRegionLocation(HRegionLocator.java:83)
    at org.apache.hadoop.hbase.client.RegionServerCallable.prepare(RegionServerCallable.java:79)
    at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:124)
    ... 17 more
18/11/14 11:59:40 INFO client.RpcRetryingCaller: Call exception, tries=34, retries=35, started=592797 ms ago, cancelled=false, msg=row on table 'prod:CustomerData' at null
18/11/14 11:59:40 ERROR executor.Executor: Exception in task 6.1 in stage 1.0 (TID 30)
org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after attempts=35, exceptions:

Ниже приведен код, который я использовал

val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, topicSet)
messages.mapPartitions(iter => {

   val context = TaskContext.get
   logger.info((s"Process for partition: ${context.partitionId} "))

   val hbaseConf = HBaseConfiguration.create()
    //hbaseConf.addResource(new File("/etc/hbase/conf/hbase-site.xml").toURI.toURL)    
   //val connection: Connection = hbaseConnection.getOrCreateConnection(hbaseConf)
   val connection =  ConnectionFactory.createConnection(hbaseConf)
   val hbaseTable = connection.getTable(TableName.valueOf("prod:CustomerData"))

   .......
   })
...