Я пытаюсь создать Kafka Data Ingestion в HBase через PySpark. В данный момент я пытаюсь вставить данные из Kafka в Hbase, используя HappyBase, но это очень медленно. Я думаю, что массовая загрузка, также использующая HappyBase, не улучшит производительность значительно. Ниже приведен текущий исходный код. Мне нужно добиться максимально возможной производительности. Есть ли у вас какие-либо идеи? Может быть saveAsNewAPIHadoopDataset
или другое решение?
def SaveToHBase(rdd):
print("=====Pull from Stream=====")
if not rdd.isEmpty():
print(len(rdd.collect()))
print(datetime.now())
for line in rdd.collect():
ctable.put((line.log_id), { \
b'log:content': (line.log)})
kds = KafkaUtils.createDirectStream(ssc, topic, k_params, fromOffsets=None)
parsed = kds.filter(lambda x: x != None and len(x) > 0 )
parsed = parsed.map(lambda x: x[1])
parsed = parsed.map(lambda rec: rec.split(","))
parsed = parsed.filter(lambda x: x != None and len(x) == 2 )
parsed = parsed.map(lambda data:Row(log_id=getValue(str,data[0]), \
log=getValue(str,data[1])))
parsed.foreachRDD(SaveToHBase)