HBASE SPARK Запрос с фильтром без загрузки всех hbase - PullRequest
0 голосов
/ 29 октября 2018

Мне нужно запросить HBASE, а затем работать с данными с помощью spark и scala. Моя проблема в том, что с моим решением я беру ВСЕ данные из моей таблицы HBASE, а затем фильтрую, это неэффективный способ, потому что он занимает слишком много памяти. Так что я хотел бы сделать фильтр напрямую, как я могу это сделать?

def HbaseSparkQuery(table: String, gatewayINPUT: String, sparkContext: SparkContext): DataFrame = {

    val sqlContext = new SQLContext(sparkContext)

    import sqlContext.implicits._

    val conf = HBaseConfiguration.create()

    val tableName = table

    conf.set("hbase.zookeeper.quorum", "localhost")
    conf.set("hbase.master", "localhost:60000")
    conf.set(TableInputFormat.INPUT_TABLE, tableName)

    val hBaseRDD = sparkContext.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])


    val DATAFRAME = hBaseRDD.map(x => {
      (Bytes.toString(x._2.getValue(Bytes.toBytes("header"), Bytes.toBytes("gatewayIMEA"))),
        Bytes.toString(x._2.getValue(Bytes.toBytes("header"), Bytes.toBytes("eventTime"))),
        Bytes.toString(x._2.getValue(Bytes.toBytes("node"), Bytes.toBytes("imei"))),
        Bytes.toString(x._2.getValue(Bytes.toBytes("measure"), Bytes.toBytes("rssi"))))

    }).toDF()
      .withColumnRenamed("_1", "GatewayIMEA")
      .withColumnRenamed("_2", "EventTime")
      .withColumnRenamed("_3", "ap")
      .withColumnRenamed("_4", "RSSI")
      .filter($"GatewayIMEA" === gatewayINPUT)

    DATAFRAME
  }

Как вы можете видеть в моем коде, я делаю фильтр после создания кадра данных, после загрузки данных Hbase.

Заранее благодарю за ответы

Ответы [ 2 ]

0 голосов
/ 30 октября 2018

Вот решение, которое я нашел

import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.filter._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil

object HbaseConnector {

  def main(args: Array[String]): Unit = {

//    System.setProperty("hadoop.home.dir", "/usr/local/hadoop")
    val sparkConf = new SparkConf().setAppName("CoverageAlgPipeline").setMaster("local[*]")
    val sparkContext = new SparkContext(sparkConf)

    val sqlContext = new SQLContext(sparkContext)

    import sqlContext.implicits._

    val spark = org.apache.spark.sql.SparkSession.builder
      .master("local")
      .appName("Coverage Algorithm")
      .getOrCreate

    val GatewayIMEA = "123"

    val TABLE_NAME = "TABLE"

    val conf = HBaseConfiguration.create()

    conf.set("hbase.zookeeper.quorum", "localhost")
    conf.set("hbase.master", "localhost:60000")
    conf.set(TableInputFormat.INPUT_TABLE, TABLE_NAME)

    val connection = ConnectionFactory.createConnection(conf)
    val table = connection.getTable(TableName.valueOf(TABLE_NAME))
    val scan = new Scan

    val GatewayIDFilter = new SingleColumnValueFilter(Bytes.toBytes("header"), Bytes.toBytes("gatewayIMEA"), CompareFilter.CompareOp.EQUAL, Bytes.toBytes(String.valueOf(GatewayIMEA)))
    scan.setFilter(GatewayIDFilter)

    conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan))

    val hBaseRDD = sparkContext.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])


    val DATAFRAME = hBaseRDD.map(x => {
      (Bytes.toString(x._2.getValue(Bytes.toBytes("header"), Bytes.toBytes("gatewayIMEA"))),
        Bytes.toString(x._2.getValue(Bytes.toBytes("header"), Bytes.toBytes("eventTime"))),
        Bytes.toString(x._2.getValue(Bytes.toBytes("node"), Bytes.toBytes("imei"))),
        Bytes.toString(x._2.getValue(Bytes.toBytes("measure"), Bytes.toBytes("Measure"))))

    }).toDF()
      .withColumnRenamed("_1", "GatewayIMEA")
      .withColumnRenamed("_2", "EventTime")
      .withColumnRenamed("_3", "ap")
      .withColumnRenamed("_4", "measure")


    DATAFRAME.show()

  }

}

Что нужно сделать, это установить таблицу ввода, установить фильтр, выполнить сканирование с использованием фильтра и получить отсканированное изображение в СДР, а затем преобразовать СДР в кадр данных (необязательно)

Чтобы сделать несколько фильтров:

val timestampFilter = new SingleColumnValueFilter(Bytes.toBytes("header"), Bytes.toBytes("eventTime"), CompareFilter.CompareOp.GREATER, Bytes.toBytes(String.valueOf(dateOfDayTimestamp)))
val GatewayIDFilter = new SingleColumnValueFilter(Bytes.toBytes("header"), Bytes.toBytes("gatewayIMEA"), CompareFilter.CompareOp.EQUAL, Bytes.toBytes(String.valueOf(GatewayIMEA)))

val filters = new FilterList(GatewayIDFilter, timestampFilter)
scan.setFilter(filters)
0 голосов
/ 29 октября 2018

Вы можете использовать разъем spark-hbase с предикатом pushdown. например https://spark -packages.org / пакет / Huawei-Спарк / Спарк-SQL-на-HBase

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...