Hbase фильтр с использованием Pyspark - PullRequest
0 голосов
/ 24 января 2019

Я пытаюсь читать и фильтровать данные из hbase через pyspark. Пока что умею делать сканирование, сканировать по дальности (используя старт и стоп). Тем не менее, не знают, как использовать фильтры. например, valuefilter, ColumnPrefixFilter и т. д.

скажем, используя класс hbase.filter.FilterBase.filterRowKey

Пока гуглил, нашел что-то похожее, спросили ранее, но без ответа. Spark: как использовать фильтр HBase, например, QualiferFilter от python-api

Примечание: использование дистрибутива cloudera и необходимость сделать это через pyspark (слышал, что это легко сделать через scala / java)

Ниже приведен код: -

host = "host@xyz.com"
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"

valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"

conf_read = {"hbase.master.kerberos.principal": "hbase/_HOST@xyz.com", \
        "hbase.rpc.protection":"privacy", \
        "hadoop.security.authentication": "kerberos", \
        "hadoop.rpc.protection": "privacy", \
        "hbase.regionserver.kerberos.principal": "hbase/_HOST@xyz.com" , \
        "hadoop.security.authentication": "kerberos", \
        "hbase.security.authentication": "kerberos", \
        "hbase.zookeeper.property.clientPort": "2181", \
        "zookeeper.znode.parent": "/hbase", \
        "hbase.zookeeper.quorum": host, \
        "hbase.mapreduce.inputtable": "namespace:tablename", \
        #"hbase.mapreduce.scan.row.start": "row2", \ --this works
        #"hbase.mapreduce.scan.row.stop": "row4", \ --this works 
        "hbase.filter.FilterBase.filterRowKey":"row3"} --this does not

testdata_rdd = spark.sparkContext.newAPIHadoopRDD(
        "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
        "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
        "org.apache.hadoop.hbase.client.Result",
        keyConverter=keyConv,
        valueConverter=valueConv,
        conf=conf_read)

output = testdata_rdd.collect()

print output
...