как сохранить фрейм данных Pyspark в HBase - PullRequest
0 голосов
/ 29 ноября 2018

У меня есть код, который преобразует потоковые данные Pyspark в dataframe.Мне нужно сохранить этот фрейм данных в Hbase.Помогите мне написать код дополнительно.

import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SparkSession

def getSparkSessionInstance(sparkConf):
if ('sparkSessionSingletonInstance' not in globals()):
    globals()['sparkSessionSingletonInstance'] = SparkSession\
        .builder\
        .config(conf=sparkConf)\
        .getOrCreate()
return globals()['sparkSessionSingletonInstance']


if __name__ == "__main__":
if len(sys.argv) != 3:
    print("Usage: sql_network_wordcount.py <hostname> <port> ", 
file=sys.stderr)
    exit(-1)
host, port = sys.argv[1:]
sc = SparkContext(appName="PythonSqlNetworkWordCount")
ssc = StreamingContext(sc, 1)
lines = ssc.socketTextStream(host, int(port))

def process(time, rdd):
    print("========= %s =========" % str(time))

    try:
        words = rdd.map(lambda line :line.split(" ")).collect()
        spark = getSparkSessionInstance(rdd.context.getConf())
        linesDataFrame = spark.createDataFrame(words,schema=["lat","lon"])

        linesDataFrame.show()
except :
pass

lines.foreachRDD(process)
ssc.start()
ssc.awaitTermination()

1 Ответ

0 голосов
/ 29 ноября 2018

Вы можете использовать соединитель Spark-Hbase для доступа к HBase из Spark. Он предоставляет API как на низкоуровневом RDD, так и Dataframes.

Для соединителя требуется определить Schema дляHBase стол.Ниже приведен пример схемы, определенной для таблицы HBase с именем table1, ключом строки в качестве ключа и количеством столбцов (col1-col8).Обратите внимание, что rowkey также должен быть детально определен как столбец (col0), который имеет определенный cf (rowkey).

def catalog = '{
        "table":{"namespace":"default", "name":"table1"},\
        "rowkey":"key",\
        "columns":{\
          "col0":{"cf":"rowkey", "col":"key", "type":"string"},\
          "col1":{"cf":"cf1", "col":"col1", "type":"boolean"},\
          "col2":{"cf":"cf1", "col":"col2", "type":"double"},\
          "col3":{"cf":"cf1", "col":"col3", "type":"float"},\
          "col4":{"cf":"cf1", "col":"col4", "type":"int"},\
          "col5":{"cf":"cf2", "col":"col5", "type":"bigint"},\
          "col6":{"cf":"cf2", "col":"col6", "type":"smallint"},\
          "col7":{"cf":"cf2", "col":"col7", "type":"string"},\
          "col8":{"cf":"cf2", "col":"col8", "type":"tinyint"}\
        }\
      }'

После того, как каталог определен в соответствии со схемой вашего фрейма данных, Вы можете записать dataFrame в HBase, используя:

df.write\
.options(catalog=catalog)\
.format("org.apache.spark.sql.execution.datasources.hbase")\
.save()

Чтобы прочитать данные из HBase:

df = spark.\
read.\
format("org.apache.spark.sql.execution.datasources.hbase").\
option(catalog=catalog).\
load()

Вам необходимо включить пакет соединителя Spark-HBase, как показано ниже, при отправкеприменение искры.

pyspark --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories http://repo.hortonworks.com/content/groups/public/
...