Я использую HBaseContext.BulkGet API в потоковой передаче. Ниже мой код.
def enrichMessage(rdd: RDD[(String, SubscriberDetails)]): RDD[(String, SubscriberDetails)] = {
def processResult(key: String, message: SubscriberDetails, result : Result): (String, SubscriberDetails) ={
var manufacturer: String = "Y"
var model: String = "Y"
val resultRow: Array[Byte] = result.getRow
if (resultRow != null) {
val cells = result.rawCells()
//Get the values from HBase result
for(cell <- cells){
if(Bytes.toString(CellUtil.cloneQualifier(cell)).equalsIgnoreCase("manufacturer")) {
manufacturer = Bytes.toString(CellUtil.cloneValue(cell))
}
if(Bytes.toString(CellUtil.cloneQualifier(cell)).equalsIgnoreCase("model")) {
model = Bytes.toString(CellUtil.cloneValue(cell))
}
}
//Enrich message with the fetched values
message.manufacturer = manufacturer
message.model = model
}
(key, message)
}
var mssg : SubscriberDetails = null
var key: String = null
val enrichedRDD: RDD[(String, SubscriberDetails)] = hbaseContext.bulkGet[(String, SubscriberDetails), (String, SubscriberDetails)](
TableName.valueOf("prod:customer"),
1000,
rdd, //input RDD
(record => {
key = record._1
mssg = record._2
new Get(Bytes.toBytes(mssg.imsi))
}),
(result: Result) => processResult(key, mssg, result)
)
enrichedRDD
}
Вышеуказанный подход имеет огромную проблему с производительностью. Извлечение записей 70 КБ занимает около 30 секунд (эти записи 70 КБ будут получены в течение 1 секунды как часть потоковой передачи). У меня есть 2 вопроса.
- Не могли бы вы дать мне знать, как улучшить производительность.
- Есть ли способ извлечь определенные столбцы из HBase. Я попробовал ниже вариант. Но это не сработало.
hbaseConfig.set(TableInputFormat.INPUT_TABLE, schemdto.tableName);
hbaseConfig.set(TableInputFormat.SCAN_COLUMNS, "fname:column1 fname:column2");
Спасибо.