Моя задача - прочитать таблицу из Hbase и сохранить ее в пути HDFS, используя scala spark.
Используя этот выходной путь, мы должны создать внешнюю таблицу улья.Когда я выполнял сканирование без каких-либо фильтров, моя таблица Hive выглядела следующим образом:
Columns | SampleRowKey | Count of columns
Но теперь мы должны создать таблицу с новым полем, добавленным в таблицу выше.В таблице Hbase есть поле квалификатора, которое называется «M_TAX_YEAR».Мы извлекли это поле, проверили, за все годы, в которые попадает каждый столбец, и количество за тот год, за который пришел конкретный столбец.Таким образом, окончательный результат должен быть таким, как показано ниже:
Columns | SampleRowKey | Count of columns | Year
c1 | 10383883 | 3 | 2017 (28), 2018(16).
Я вставил свою часть исполнения ниже.Мне нужна помощь о том, как подсчитать и агрегировать конечный результат, как показано выше.
Пожалуйста, кто-нибудь может помочь с этим?
def processResult1(immBytes: ImmutableBytesWritable, result: Result): (String, List[String]) = {
def getCellName(cell: Cell): String = Bytes.toString(CellUtil.cloneQualifier(cell))
def getCellValue(cell: Cell): String = Bytes.toString(CellUtil.cloneValue(cell))
val kvPairs = result.rawCells.map(x => (getCellName(x), getCellValue(x))).toMap
val taxyr=kvPairs.getOrElse("M_TAX_YEAR","NF")
(Bytes.toString(immBytes.copyBytes)+"^"+taxyr, result.rawCells.toList.map(getCellName))
}
hbaseRdd.map(x => processResult(x._1, x._2)).flatMapValues(x => x).map(x => (x._2, (x._1, 1))).reduceByKey((x, y) => (x._1, x._2 + y._2)).map(x => s"${x._1},${x._2._1},${x._2._2},${x._3}").saveAsTextFile("/user/nx163561/HbaseProject/output")