Я хочу сохранить датафрейм в hbase.Мой код ниже, но я получаю ошибку.Не могли бы вы помочь мне исправить этот код?Мне нужно написать код Pyspark.
lines = ssc.socketTextStream(host, int(port)),\
def process(time, rdd):
print("========= %s =========" % str(time)),\
try:
words = rdd.map(lambda line :line.split(" ")).collect(),\
spark = getSparkSessionInstance(rdd.context.getConf()),\
linesDataFrame = spark.createDataFrame(words,schema=["lat","lon"]),\
linesDataFrame.show(),\
except :
pass
def catalog = {,\
"table":{"namespace":"default", "name":"locdata"},\
"rowkey":"key",\
"columns":{,\
"col0":{"cf":"rowkey", "col":"key", "type":"string"},\
"lat":{"cf":"data", "col":"lat", "type":"int"},\
"lon":{"cf":"data", "col":"lon", "type":"int"},\
}
}
linesDataFrame.write
.options(catalog=linesDataFrame),\
.format("org.apache.spark.sql.execution.datasources.hbase"),\
.save(),\
lines.foreachRDD(process),\
ssc.start()
ssc.awaitTermination()