Как читать таблицу из Hbase, используя scala spark - PullRequest
0 голосов
/ 07 февраля 2019

Моя задача - прочитать таблицу из Hbase и сохранить ее в пути HDFS, используя scala spark.

Используя этот выходной путь, мы должны создать внешнюю таблицу улья.Когда я выполнял сканирование без каких-либо фильтров, моя таблица Hive выглядела следующим образом:

Columns | SampleRowKey | Count of columns

Но теперь мы должны создать таблицу с новым полем, добавленным в таблицу выше.В таблице Hbase есть поле квалификатора, которое называется «M_TAX_YEAR».Мы извлекли это поле, проверили, за все годы, в которые попадает каждый столбец, и количество за тот год, за который пришел конкретный столбец.Таким образом, окончательный результат должен быть таким, как показано ниже:

Columns | SampleRowKey | Count of columns | Year
c1      | 10383883     | 3                | 2017 (28), 2018(16).

Я вставил свою часть исполнения ниже.Мне нужна помощь о том, как подсчитать и агрегировать конечный результат, как показано выше.

Пожалуйста, кто-нибудь может помочь с этим?

def processResult1(immBytes: ImmutableBytesWritable, result: Result): (String, List[String]) = {
     def getCellName(cell: Cell): String = Bytes.toString(CellUtil.cloneQualifier(cell))
     def getCellValue(cell: Cell): String = Bytes.toString(CellUtil.cloneValue(cell))
     val kvPairs = result.rawCells.map(x => (getCellName(x), getCellValue(x))).toMap
     val taxyr=kvPairs.getOrElse("M_TAX_YEAR","NF")
     (Bytes.toString(immBytes.copyBytes)+"^"+taxyr, result.rawCells.toList.map(getCellName))
   }

hbaseRdd.map(x => processResult(x._1, x._2)).flatMapValues(x => x).map(x => (x._2, (x._1, 1))).reduceByKey((x, y) => (x._1, x._2 + y._2)).map(x => s"${x._1},${x._2._1},${x._2._2},${x._3}").saveAsTextFile("/user/nx163561/HbaseProject/output")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...