Каков наилучший способ взаимодействия с Hbase с помощью Pyspark? - PullRequest
0 голосов
/ 22 февраля 2019

Я использую pyspark [spark2.3.1] и Hbase1.2.1, мне интересно, как лучше всего получить доступ к Hbase с помощью pyspark?

Я выполнил начальный уровень поиска и обнаружил, что тамЕсть несколько вариантов, таких как использование shc-core: 1.1.1-2.1-s_2.11.jar это может быть достигнуто, но везде, где я пытаюсь найти некоторые примеры, в большинстве мест код написан на Scala или примеры такжена основе скалы.Я попытался реализовать базовый код в pyspark:

from pyspark import SparkContext
from pyspark.sql import SQLContext

def main():
    sc = SparkContext()
    sqlc = SQLContext(sc)
    data_source_format = 'org.apache.spark.sql.execution.datasources.hbase'
    catalog = ''.join("""{
        "table":{"namespace":"default", "name":"firsttable"},
        "rowkey":"key",
        "columns":{
            "firstcol":{"cf":"rowkey", "col":"key", "type":"string"},
            "secondcol":{"cf":"d", "col":"colname", "type":"string"}
        }
    }""".split())
    df = sqlc.read.options(catalog=catalog).format(data_source_format).load()
    df.select("secondcol").show()

# entry point for PySpark application
if __name__ == '__main__':
    main()

и запустить его с помощью:

spark-submit  --master yarn-client --files /opt/hbase-1.1.2/conf/hbase-site.xml --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11  --jars /home/ubuntu/hbase-spark-2.0.0-alpha4.jar HbaseMain2.py

Это возвращает мне пустой вывод:

+---------+
|secondcol|
+---------+
+---------+

Я неуверен, что я делаю не так?Также не уверен, что будет лучшим подходом для этого ??

Любые ссылки будут оценены.

С уважением

1 Ответ

0 голосов
/ 25 февраля 2019

Наконец, используя SHC , я могу подключиться к HBase-1.2.1 с помощью Spark-2.3.1, используя код pyspark.Вот моя работа:

  • Все мои хэдупы [namenode, datanode, nodemanager, resourcemanager] & hbase [Hmaster, HRegionServer, HQuorumPeer] были запущены и запущены на моем экземпляре EC2.

  • Я поместил файл emp.csv в папку hdfs /test/emp.csv с данными:

    ключ, empId, empName, empWeight 1, "E007", "Bhupesh», 115,10 2,« E008 »,« Chauhan », 110,23 3,« E009 », Prithvi, 90,0 4,« E0010 »,« Raj », 80,0 5,« E0011 »,« Chauhan », 100,0

  • Я создал readwriteHBase.py файл со следующей строкой кода [для чтения файла emp.csv из HDFS, затем сначала создал tblEmployee в HBase, поместив данные в tblEmployee, а затем еще раз прочитавнекоторые данные из той же таблицы и их отображение на консоли]:

    from pyspark.sql import SparkSession
    
    def main():
        spark = SparkSession.builder.master("yarn-client").appName("HelloSpark").getOrCreate()
    
        dataSourceFormat = "org.apache.spark.sql.execution.datasources.hbase"
        writeCatalog = ''.join("""{
                    "table":{"namespace":"default", "name":"tblEmployee", "tableCoder":"PrimitiveType"},
                    "rowkey":"key",
                    "columns":{
                      "key":{"cf":"rowkey", "col":"key", "type":"int"},
                      "empId":{"cf":"personal","col":"empId","type":"string"},
                      "empName":{"cf":"personal", "col":"empName", "type":"string"},
                      "empWeight":{"cf":"personal", "col":"empWeight", "type":"double"}
                    }
                  }""".split())
    
        writeDF = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/test/emp.csv")
        print("csv file read", writeDF.show())
        writeDF.write.options(catalog=writeCatalog, newtable=5).format(dataSourceFormat).save()
        print("csv file written to HBase")
    
        readCatalog = ''.join("""{
                    "table":{"namespace":"default", "name":"tblEmployee"},
                    "rowkey":"key",
                    "columns":{
                      "key":{"cf":"rowkey", "col":"key", "type":"int"},
                      "empId":{"cf":"personal","col":"empId","type":"string"},
                      "empName":{"cf":"personal", "col":"empName", "type":"string"}
                    }
                  }""".split())
    
        print("going to read data from Hbase table")
        readDF = spark.read.options(catalog=readCatalog).format(dataSourceFormat).load()
        print("data read from HBase table")
        readDF.select("empId", "empName").show()
        readDF.show()
    
    # entry point for PySpark application
    if __name__ == '__main__':
        main()
    
  • Запустите этот скрипт на консоли виртуальной машины с помощью команды:

    spark-submit --master yarn-client --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories http://nexus-private.hortonworks.com/nexus/content/repositories/IN-QA/ readwriteHBase.py
    
  • Промежуточный результат: После чтения CSV-файла:

    +---+-----+-------+---------+
    |key|empId|empName|empWeight|
    +---+-----+-------+---------+
    |  1| E007|Bhupesh|    115.1|
    |  2| E008|Chauhan|   110.23|
    |  3| E009|Prithvi|     90.0|
    |  4|E0010|    Raj|     80.0|
    |  5|E0011|Chauhan|    100.0|
    +---+-----+-------+---------+
    
  • Окончательный вывод: после чтения данных из таблицы HBase:

    +-----+-------+
    |empId|empName|
    +-----+-------+
    | E007|Bhupesh|
    | E008|Chauhan|
    | E009|Prithvi|
    |E0010|    Raj|
    |E0011|Chauhan|
    +-----+-------+
    

Примечание : при создании таблицы Hbase и вставке данных в таблицу HBase ожидается, что NumberOfRegions должно быть больше 3, поэтому я добавил options(catalog=writeCatalog, newtable=5) при добавлении данных в HBase

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...