Как получить определенный столбец HBase, используя API HBaseContext.BulkGet в Spark Streaming - PullRequest
0 голосов
/ 14 января 2019

Я использую HBaseContext.BulkGet API в потоковой передаче. Ниже мой код.

    def enrichMessage(rdd: RDD[(String, SubscriberDetails)]): RDD[(String, SubscriberDetails)]  = {

       def processResult(key: String, message: SubscriberDetails, result : Result): (String, SubscriberDetails) ={

          var manufacturer: String = "Y"
          var model: String = "Y"
          val resultRow: Array[Byte] = result.getRow

          if (resultRow != null) {
            val cells = result.rawCells()

            //Get the values from HBase result
            for(cell <- cells){
              if(Bytes.toString(CellUtil.cloneQualifier(cell)).equalsIgnoreCase("manufacturer")) {
                manufacturer = Bytes.toString(CellUtil.cloneValue(cell))
              }
              if(Bytes.toString(CellUtil.cloneQualifier(cell)).equalsIgnoreCase("model")) {
                model = Bytes.toString(CellUtil.cloneValue(cell))
              }
            }
            //Enrich message with the fetched values
            message.manufacturer = manufacturer
            message.model = model
          }
          (key, message)
        }

    var mssg : SubscriberDetails = null
    var key: String = null

    val enrichedRDD: RDD[(String, SubscriberDetails)] = hbaseContext.bulkGet[(String, SubscriberDetails), (String, SubscriberDetails)](
      TableName.valueOf("prod:customer"),
      1000,
      rdd,      //input RDD
      (record => {
        key = record._1
        mssg = record._2
        new Get(Bytes.toBytes(mssg.imsi))
      }),
      (result: Result) => processResult(key, mssg, result)
    )

    enrichedRDD
  }

Вышеуказанный подход имеет огромную проблему с производительностью. Извлечение записей 70 КБ занимает около 30 секунд (эти записи 70 КБ будут получены в течение 1 секунды как часть потоковой передачи). У меня есть 2 вопроса.

  1. Не могли бы вы дать мне знать, как улучшить производительность.
  2. Есть ли способ извлечь определенные столбцы из HBase. Я попробовал ниже вариант. Но это не сработало.
hbaseConfig.set(TableInputFormat.INPUT_TABLE, schemdto.tableName);
hbaseConfig.set(TableInputFormat.SCAN_COLUMNS, "fname:column1 fname:column2");

Спасибо.

...