Spark не может прочитать данные всей строки Hbase, только значение последнего атрибута - PullRequest
0 голосов
/ 02 мая 2018

Почему я не могу получить полные данные Hbase в моем терминале

host = 'localhost'
table = 'student'
conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
hbase_rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat","org.apache.hadoop.hbase.io.ImmutableBytesWritable","org.apache.hadoop.hbase.client.Result",keyConverter=keyConv,valueConverter=valueConv,conf=conf)
hbase_rdd.collect()
[('1', '23'), ('2', '24'), ('3', '10')]

Но исходные данные в Hbase примерно такие:

ROW                   COLUMN+CELL                                               
1                    column=info:age, timestamp=1525153512915, value=23        
1                    column=info:gender, timestamp=1525153501730, value=F      
1                    column=info:name, timestamp=1525153481472, value=lihuan   
2                    column=info:age, timestamp=1525153553378, value=24        
2                    column=info:gender, timestamp=1525153542869, value=F      
2                    column=info:name, timestamp=1525153531737, value=sunzhesi 
3                    column=info:age, timestamp=1525157971696, value=10        
3                    column=info:gender, timestamp=1525157958967, value=M      
3                    column=info:name, timestamp=1525157941132, value=axin

Системная среда: Ubuntu16.04; Python3.5.2; Spark 2.3.0; Hadoop2.9.0; Hbase1.4.2

1 Ответ

0 голосов
/ 02 мая 2018

На самом деле я не уверен, что должно произойти, когда вы используете newAPIHadoopRDD, как вы, но когда я пытаюсь сканировать данные из Hbase, я добавляю "hbase.mapreduce.scan" в conf. Поэтому, возможно, попробуйте добавить что-то вроде этого:

from py4j.java_gateway import java_import
from binascii import b2a_base64
jvm = sc._gateway.jvm

java_import(jvm, "org.apache.hadoop.hbase.client.Scan")
java_import(jvm, "org.apache.hadoop.hbase.util.Bytes")
java_import(jvm, "org.apache.hadoop.hbase.protobuf.ProtobufUtil")

to_bytes = lambda x: jvm.Bytes.toBytesBinary(x)

scan = jvm.Scan()
scan.setStartRow(to_bytes(YOUR_START_ROW))
scan.setStopRow(to_bytes(YOUR_STOP_ROW))
scan.addFamily(to_bytes(YOUR_COLUMN_FAMILY_KEY))

scan_proto_bytes = jvm.ProtobufUtil.toScan(scan).toByteArray()
scan_str = b2a_base64(str(scan_proto_bytes))

        conf = {"hbase.mapreduce.inputtable" : table,
            "hbase.mapreduce.scan" : scan_str,
            "hbase.zookeeper.quorum" : host}
...