хранить данные в hbase с помощью pyspark - PullRequest
0 голосов
/ 04 декабря 2018

Я хочу сохранить датафрейм в hbase.Мой код ниже, но я получаю ошибку.Не могли бы вы помочь мне исправить этот код?Мне нужно написать код Pyspark.

lines = ssc.socketTextStream(host, int(port)),\

def process(time, rdd):
    print("========= %s =========" % str(time)),\
try:
    words = rdd.map(lambda line :line.split(" ")).collect(),\
    spark = getSparkSessionInstance(rdd.context.getConf()),\
    linesDataFrame = spark.createDataFrame(words,schema=["lat","lon"]),\
    linesDataFrame.show(),\
except :
    pass

def catalog = {,\
    "table":{"namespace":"default", "name":"locdata"},\
    "rowkey":"key",\
    "columns":{,\
    "col0":{"cf":"rowkey", "col":"key", "type":"string"},\
      "lat":{"cf":"data", "col":"lat", "type":"int"},\
      "lon":{"cf":"data", "col":"lon", "type":"int"},\
      }
     }

linesDataFrame.write
     .options(catalog=linesDataFrame),\
     .format("org.apache.spark.sql.execution.datasources.hbase"),\
     .save(),\

lines.foreachRDD(process),\
      ssc.start()
      ssc.awaitTermination()
...