PySpark HBase connection

Asked 16 June 2020

I want to load a DataFrame into Google BigTable using PySpark and the HBase connector (SHC). To do that, I configured the session as follows:

from pyspark.sql import SparkSession

ss = SparkSession.builder \
        .appName("loadListeSpark") \
        .master("local[1]") \
        .config('spark.jars.packages', 'com.hortonworks:shc-core:1.1.1-2.1-s_2.11') \
        .config('spark.jars.repositories', 'http://repo.hortonworks.com/content/repositories/releases/') \
        .getOrCreate()

The code snippet that should load the DataFrame into BigTable is:

catalog_string = ''.join("""{
                    "table":{"namespace":"bigt-project-name", "name":"table-name"},
                    "rowkey":"cod_cli",
                    "columns":{
                        "1":{"cf":"products", "col":"col1", "type":"string"},
                        "2":{"cf":"products", "col":"col2", "type":"string"},
                        "3":{"cf":"products", "col":"col3", "type":"string"}
                    }
                }""".split())

df.write \
        .option("catalog", catalog_string) \
        .format('org.apache.spark.sql.execution.datasources.hbase') \
        .save()
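For reference, the read path is symmetric and reuses the same catalog string; a minimal sketch, assuming the write succeeds:

# Read the table back through the same SHC catalog mapping
df_read = ss.read \
        .option("catalog", catalog_string) \
        .format('org.apache.spark.sql.execution.datasources.hbase') \
        .load()
df_read.show(truncate=False)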

The DataFrame looks like this:

+-------+---------------------------+---------------------------+---------------------------+
|cod_cli|col1                       |col2                       |col3                       |
+-------+---------------------------+---------------------------+---------------------------+
|123    |{cod_art : art_1, rank : 1}|{cod_art : art_2, rank : 2}|{cod_art : art_3, rank : 3}|
|456    |{cod_art : art_4, rank : 1}|{cod_art : art_5, rank : 2}|{cod_art : art_6, rank : 3}|
+-------+---------------------------+---------------------------+---------------------------+

When I try to run the job with:

spark-submit --driver-memory 8g --executor-memory 8g --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories http://repo.hortonworks.com/content/repositories/releases/ main_spark.py

I get the following exception:

py4j.protocol.Py4JJavaError: An error occurred while calling o118.save.
: java.lang.NoSuchMethodError: org.json4s.jackson.JsonMethods$.parse(Lorg/json4s/JsonInput;Z)Lorg/json4s/JsonAST$JValue;
    at org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$.apply(HBaseTableCatalog.scala:257)
    at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.<init>(HBaseRelation.scala:80)
    at org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:59)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

I have already tried swapping the json4s jars in the SPARK_HOME folder, removing the old files and adding:

json4s-ast_2.11-4.0.0-M1.jar      json4s-ext_2.13-3.7.0-M4.jar      json4s-native_2.10-3.6.9.jar
json4s-core_2.13-3.7.0-M4.jar     json4s-jackson_2.13-3.7.0-M4.jar  json4s-scalap_2.13-3.7.0-M4.jar

But I still get the same exception. Can anyone help me? Thanks.
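Note that the replacement jars listed above mix Scala suffixes (_2.10, _2.11, _2.13) and json4s version lines, while shc-core:1.1.1-2.1-s_2.11 is built for Scala 2.11 against the json4s 3.2.x line that older Spark 2.x distributions shipped; the NoSuchMethodError on JsonMethods.parse is the typical symptom of a json4s version mismatch. A minimal sketch of pinning a consistent json4s through spark.jars.packages instead of hand-swapping jars; the 3.2.11 version and the userClassPathFirst flags are assumptions to verify against your Spark build:

# Sketch: pull a Scala-2.11 json4s matching what SHC was compiled against,
# and ask Spark to prefer user-supplied jars on the classpath.
ss = SparkSession.builder \
        .appName("loadListeSpark") \
        .master("local[1]") \
        .config('spark.jars.packages',
                'com.hortonworks:shc-core:1.1.1-2.1-s_2.11,'
                'org.json4s:json4s-jackson_2.11:3.2.11') \
        .config('spark.jars.repositories', 'http://repo.hortonworks.com/content/repositories/releases/') \
        .config('spark.driver.userClassPathFirst', 'true') \
        .config('spark.executor.userClassPathFirst', 'true') \
        .getOrCreate()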
