Исключение IOException HDP Sandbox вызывает saveAsTable на DataFrame - PullRequest
0 голосов
/ 27 февраля 2019

Я пытаюсь запустить приведенный ниже пример, который пытается создать таблицу Hive из фрейма данных Spark.Код работает, когда я вызываю spark-submit с master = local, но выдает исключение, когда я вызываю его с master = yarn.Вот вызов: spark-submit --class test.sandbox.HDPRiskFactor --master yarn --name "Фактор риска" ./hdprisk-0.0.1-SNAPSHOT.jar Кроме того, я создал таблицу из консоли Hive с именем "default.geolocation ", но я не вижу его от искры, когда я вызываю show ().Я попытался установить количество исполнителей в режиме пряжи на 0, и это тоже не работает.1) Почему код работает с мастером локальным, но с пряжей 2) Почему я не вижу таблицу, созданную в кусте из моего кода искры.

 def main(args: Array[String]): Unit = {

val spark = SparkSession.builder().getOrCreate()
//    val spark = SparkSession.builder().master("local[*]").getOrCreate()
val sc = spark.sparkContext
val hadoopconf = new Configuration()
val hdfs = FileSystem.get(hadoopconf)
val csvDataDir = "/tmp/data"
//import spark.implicits._
val dataList = List(("geolocation", "csv"), ("trucks", "csv"))
listFiles(this.getClass.getClassLoader.getResource(".").getFile)
dataList.map(path => {
  val localFile = path._1 + "." + path._2
  val hdfsFile = csvDataDir + "/" + path._1 + "." + path._2
  if (!testDirExist(hdfs, hdfsFile)) copyStreamToHdfs(hdfs, "/root/", csvDataDir, localFile)
})
val geoLocationDF = spark.read.format("csv").option("header", "true").load("hdfs:///tmp/data/geolocation.csv")

// Now that we have the data loaded into a DataFrame, we can register a temporary view.
spark.sql("SHOW TABLES").show()
geoLocationDF.write.format("orc").saveAsTable("default.geolocation")
//      geoLocationDF.createOrReplaceTempView("geolocation")

spark.sql("select * from default.geolocation").show()

}

1 Ответ

0 голосов
/ 28 февраля 2019

Я не настроил контекст улья должным образом.Так было записывать файлы в корневой каталог.Решением было передать правильные параметры конфигурации:

val spark = SparkSession.builder()
  .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
  .config("spark.sql.sources.maxConcurrentWrites","1")
  .config("spark.sql.parquet.compression.codec", "snappy")
  .config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  .config("parquet.compression", "SNAPPY")
  .config("hive.exec.max.dynamic.partitions", "3000")
  .config("parquet.enable.dictionary", "false")
  .config("hive.support.concurrency", "true")
  .enableHiveSupport()
  .getOrCreate()
...