I am trying to write a Spark DataFrame to HBase and have followed several other blogs, one of them being this one, but it does not work. However, I can successfully read data from HBase as a DataFrame. Also, some posts use the org.apache.hadoop.hbase.spark format while others use org.apache.spark.sql.execution.datasources.hbase, and I am not sure which one to use. My versions are Spark 2.2.2, HBase 1.4.7, Scala 2.11.12, and Hortonworks SHC 1.1.0-2.1-s_2.11 from here.
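For reference, a minimal build.sbt sketch for this version combination might look like the snippet below. The SHC coordinates and the Hortonworks repository URL are assumptions about how SHC is usually published, so adjust them to your environment:

// hypothetical build.sbt sketch; verify the resolver URL and artifact version
resolvers += "Hortonworks Repository" at "http://repo.hortonworks.com/content/groups/public/"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.2.2" % "provided",
  "com.hortonworks"  %  "shc-core"  % "1.1.0-2.1-s_2.11"
)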
The code is as follows:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

case class UserMessageRecord(
  rowkey: String,
  Name: String,
  Number: String,
  message: String,
  lastTS: String
) // this has been defined outside of the object scope

// msgTemplate, timeStamp, and catalog are defined elsewhere in the code
val example = List(UserMessageRecord("86325980047644033486", "enrique", "123455678", msgTemplate, timeStamp))
import spark.sqlContext.implicits._
val userDF = example.toDF()

// write to HBase
userDF.write
  .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .save() // exception here
// read from HBase and it's working fine
def withCatalog(cat: String): DataFrame = {
  spark.sqlContext
    .read
    .options(Map(HBaseTableCatalog.tableCatalog -> cat))
    .format("org.apache.spark.sql.execution.datasources.hbase")
    .load()
}

val df = withCatalog(catalog)
df.show()
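The catalog referenced by both the write and the read paths is defined elsewhere in the code. For context, an SHC catalog for the UserMessageRecord schema would look roughly like the sketch below; the namespace, table name, and column-family names are assumptions:

// hypothetical SHC catalog; namespace, table, and column families are placeholders
def catalog = s"""{
  "table":{"namespace":"default", "name":"user_messages"},
  "rowkey":"key",
  "columns":{
    "rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
    "Name":{"cf":"info", "col":"name", "type":"string"},
    "Number":{"cf":"info", "col":"number", "type":"string"},
    "message":{"cf":"info", "col":"message", "type":"string"},
    "lastTS":{"cf":"info", "col":"lastTS", "type":"string"}
  }
}"""

Note that when the target table does not yet exist, SHC writes typically also expect the HBaseTableCatalog.newTable option with the number of regions to create, e.g. Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5").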
Here is the exception:
Exception in thread "main" java.lang.NullPointerException
    at org.apache.hadoop.hbase.security.UserProvider.instantiate(UserProvider.java:122)
    at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:214)
    at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:119)
    at org.apache.hadoop.hbase.mapreduce.TableOutputFormat.checkOutputSpecs(TableOutputFormat.java:177)
    at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:76)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084)
    at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.insert(HBaseRelation.scala:218)
    at org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:61)
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:469)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:609)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
    at HbaseConnectionTest.HbaseLoadUsingSpark$.main(HbaseLoadUsingSpark.scala:85)
    at HbaseConnectionTest.HbaseLoadUsingSpark.main(HbaseLoadUsingSpark.scala)