org.apache.spark.SparkException: Task failed while writing rows when writing data to HBase via Spark
0 votes
/ 06 June 2019

I am trying to write data to HBase from a Spark DataFrame:

import java.io.File

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object SparkHBaseWrite {

  def main(args: Array[String]): Unit = {

    val warehouseLocation = new File("warehouse").getAbsolutePath

    val spark = SparkSession.builder()
      .appName("Spark Hive")
      .config("spark.sql.warehouse.dir", warehouseLocation)
      .enableHiveSupport()
      .getOrCreate()

    // HBase client configuration: ZooKeeper quorum and client port
    val config = HBaseConfiguration.create()
    config.set("hbase.zookeeper.quorum", "zookeeperIP address")
    config.set("hbase.zookeeper.property.clientPort", "2181")
    config.set(TableInputFormat.INPUT_TABLE, "june_poc_hbase_table")

    // New-API Hadoop job configured to write to the HBase table via TableOutputFormat
    val newAPIJobConfiguration1 = Job.getInstance(config)
    newAPIJobConfiguration1.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "june_poc_hbase_table")
    newAPIJobConfiguration1.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    import spark.implicits._
    val df: DataFrame = Seq(("foo", "1", "foo1"), ("bar", "2", "bar1")).toDF("key", "value1", "value2")

    // Convert each row into an HBase Put: row key from "key", columns in families cf1 and cf2
    val hbasePuts = df.rdd.map((row: Row) => {
      val put = new Put(Bytes.toBytes(row.getString(0)))
      put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("value1"), Bytes.toBytes(row.getString(1)))
      put.addColumn(Bytes.toBytes("cf2"), Bytes.toBytes("value2"), Bytes.toBytes(row.getString(2)))
      (new ImmutableBytesWritable(), put)
    })

    // Write the (key, Put) pairs through the new Hadoop output format API
    hbasePuts.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration())
  }
}
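The Puts above target the column families cf1 and cf2, so the table must already exist with both families; a missing family is one common reason the buffered flush fails with RetriesExhaustedWithDetailsException (shown below), though the actual cause here is not confirmed. A minimal sketch of verifying/creating the table, assuming the HBase 1.x admin API that matches the client in the stack trace and reusing the table, family names, and ZooKeeper placeholder from the code above:

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

// Sketch: ensure the target table exists with the cf1 and cf2 column families
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "zookeeperIP address")
conf.set("hbase.zookeeper.property.clientPort", "2181")

val connection = ConnectionFactory.createConnection(conf)
val admin = connection.getAdmin
val tableName = TableName.valueOf("june_poc_hbase_table")

if (!admin.tableExists(tableName)) {
  val descriptor = new HTableDescriptor(tableName)
  descriptor.addFamily(new HColumnDescriptor("cf1"))
  descriptor.addFamily(new HColumnDescriptor("cf2"))
  admin.createTable(descriptor)
}

admin.close()
connection.close()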

Error:

19/06/06 09:09:37 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, or1hdp276.corp.adobe.com, executor 2): org.apache.spark.SparkException: Task failed while writing rows
        at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:155)
        at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:83)
        at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:78)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1 action: june_poc_hbase_table: 1 time,
        at org.apache.hadoop.hbase.client.AsyncProcess$BatchErrors.makeException(AsyncProcess.java:258)
        at org.apache.hadoop.hbase.client.AsyncProcess$BatchErrors.access$2000(AsyncProcess.java:238)
        at org.apache.hadoop.hbase.client.AsyncProcess.waitForAllPreviousOpsAndReset(AsyncProcess.java:1817)
        at org.apache.hadoop.hbase.client.BufferedMutatorImpl.backgroundFlushCommits(BufferedMutatorImpl.java:246)
        at org.apache.hadoop.hbase.client.BufferedMutatorImpl.close(BufferedMutatorImpl.java:169)
        at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.close(TableOutputFormat.java:120)
        at org.apache.spark.internal.io.HadoopMapReduceWriteConfigUtil.closeWriter(SparkHadoopWriter.scala:361)
        at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:137)
        at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:127)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1439)
        at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:139)
        ... 10 more
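The per-action causes are truncated in the batch message above ("Failed 1 action ... 1 time,"). As a hypothetical debugging step only (a sketch using the same HBase 1.x client API; the connection settings are placeholders and df is the DataFrame from the code above), writing the same Puts directly with Table.put inside foreachPartition exposes the underlying cause and region server for each failed action in the executor logs:

import scala.collection.JavaConverters._
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put, RetriesExhaustedWithDetailsException}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.Row

// Sketch: write each partition with Table.put and print the detailed
// per-action failure cause instead of the truncated batch message.
df.rdd.foreachPartition { rows: Iterator[Row] =>
  val conf = HBaseConfiguration.create()
  conf.set("hbase.zookeeper.quorum", "zookeeperIP address")
  conf.set("hbase.zookeeper.property.clientPort", "2181")
  val connection = ConnectionFactory.createConnection(conf)
  val table = connection.getTable(TableName.valueOf("june_poc_hbase_table"))

  val puts = rows.map { row =>
    val put = new Put(Bytes.toBytes(row.getString(0)))
    put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("value1"), Bytes.toBytes(row.getString(1)))
    put.addColumn(Bytes.toBytes("cf2"), Bytes.toBytes("value2"), Bytes.toBytes(row.getString(2)))
    put
  }.toList

  try {
    table.put(puts.asJava)
  } catch {
    case e: RetriesExhaustedWithDetailsException =>
      // Prints go to the executor log: which server rejected which action, and why
      (0 until e.getNumExceptions).foreach { i =>
        println(s"Failed on ${e.getHostnamePort(i)}: ${e.getCause(i)}")
      }
      throw e
  } finally {
    table.close()
    connection.close()
  }
}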