Я пытаюсь выполнить массовую загрузку в HBase из Pyspark, используя Hfiles, как в этом сообщении: https://stackoverflow.com/a/35077987/10585126
Мой код:
conf = {"hbase.zookeeper.qourum": host,\
"zookeeper.znode.parent": "/hbase", \
"hbase.mapred.outputtable": table,\
"mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2",\
"mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",\
"mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
def csv_to_key_value(row):
puids = row.split(",")
result = []
for (num, puid) in list(enumerate(puids))[1:]:
if puid:
val_tup = (puids[0], [puids[0], "sg", 'seg'+str(num)+'value', str(puid)])
result.append(val_tup)
ids_tup = (puids[0], [puids[0], "sg", 'seg'+str(num)+'id', str(num)])
result.append(ids_tup)
return result
data = sc.textFile(path_to_hdfs)
load_rdd = data.flatMap(lambda line: line.split("\n")).flatMap(csv_to_key_value).sortByKey(True)
load_rdd.saveAsNewAPIHadoopFile(path + str(sc.startTime),
"org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2",
conf=conf,
keyConverter=keyConv,
valueConverter=valueConv)
Но не могу преодолеть исключениеиз java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put cannot be cast to org.apache.hadoop.hbase.Cell
Кто-нибудь сталкивался с этой проблемой?Я использую pyspark 1.6.0 (CDH 5.9.0) с hbase-examples-1.2.0-cdh5.9.0.jar и spark-examples-1.6.0-cdh5.9.0-hadoop2.6.0-cdh5.9.0.jar
ps загрузка с Puts работает хорошо!