InvalidJobConfException.Выходной каталог не установлен - PullRequest
1 голос
/ 29 апреля 2019

Я пытаюсь записать некоторые данные в bigtable, используя SparkSession

val spark = SparkSession
  .builder
  .config(conf)
  .appName("my-job")
  .getOrCreate()

val hadoopConf = spark.sparkContext.hadoopConfiguration

import spark.implicits._
case class BestSellerRecord(skuNbr: String, slsQty: String, slsDollar: String, dmaNbr: String, productId: String)

val seq: DataFrame = Seq(("foo", "1", "foo1"), ("bar", "2", "bar1")).toDF("key", "value1", "value2")

val bigtablePuts = seq.toDF.rdd.map((row: Row) => {
  val put = new Put(Bytes.toBytes(row.getString(0)))
  put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("nbr"), Bytes.toBytes(row.getString(0)))
  (new ImmutableBytesWritable(), put)
})

bigtablePuts.saveAsNewAPIHadoopDataset(hadoopConf)

Но это дает мне следующее исключение.

Exception in thread "main" org.apache.hadoop.mapred.InvalidJobConfException: Output directory not set.
at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:138)
at org.apache.spark.internal.io.HadoopMapReduceWriteConfigUtil.assertConf(SparkHadoopWriter.scala:391)
at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1083)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1081)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1081)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1081)

который исходит от

bigtablePuts.saveAsNewAPIHadoopDataset(hadoopConf)

эта строка. Также я попытался установить различные конфигурации, используя hadoopConf.set, например conf.set("spark.hadoop.validateOutputSpecs", "false"), но это дает мне NullPointerException.

Как я могу исправить эту проблему?

...