У меня проблема в программе, и у меня нет этой проблемы с spark-shell.
Когда я звоню:
FileSystem.get(spark.sparkContext.hadoopConfiguration)
В spark-shell все работает отлично, но когда я пытаюсь использовать его в коде, я не могу прочитать core-site.xml
. Я все еще заставляю его работать, когда использую:
val conf = new Configuration()
conf.addResource(new Path("path to conf/core-site.xml"))
FileSystem.get(conf)
Это решение неприемлемо, поскольку мне нужно использовать конфигурацию Hadoop, не передавая ее явно.
Как в (Spark-shell, так и в программе) мастер вызывается с параметрами spark: //x.x.x.x: 7077
Как я могу настроить spark для использования конфигурации hadoop?
Код:
val HdfsPrefix: String = "hdfs://"
val path: String = "/tmp/"
def getHdfs(spark: SparkSession): FileSystem = {
//val conf = new Configuration()
//conf.addResource(new Path("/path to/core-site.xml"))
//FileSystem.get(conf)
FileSystem.get(spark.sparkContext.hadoopConfiguration)
}
val dfs = getHdfs(session)
data.select("name", "value").collect().foreach{ x =>
val os = dfs.create(new Path(HdfsPrefix + path + x.getString(0)))
val content: String = x.getString(1)
os.write(content.getBytes)
os.hsync()
}
Журнал ошибок:
Wrong FS: hdfs:/tmp, expected: file:///
java.lang.IllegalArgumentException: Wrong FS: hdfs:/tmp, expected: file:///
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:645)
at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:80)
at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:428)
at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:690)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:446)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:433)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:908)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:889)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:775)
at com.bbva.ebdm.ocelot.io.hdfs.HdfsIO$HdfsOutputFile$$anonfun$write$1.apply(HdfsIO.scala:116)
at com.bbva.ebdm.ocelot.io.hdfs.HdfsIO$HdfsOutputFile$$anonfun$write$1.apply(HdfsIO.scala:115)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at com.bbva.ebdm.ocelot.io.hdfs.HdfsIO$HdfsOutputFile.write(HdfsIO.scala:115)
at com.bbva.ebdm.ocelot.templates.spark_sql.SparkSqlBaseApp$$anonfun$exec$1.apply(SparkSqlBaseApp.scala:33)
at com.bbva.ebdm.ocelot.templates.spark_sql.SparkSqlBaseApp$$anonfun$exec$1.apply(SparkSqlBaseApp.scala:31)
at scala.collection.immutable.Map$Map3.foreach(Map.scala:161)
at com.bbva.ebdm.ocelot.templates.spark_sql.SparkSqlBaseApp$class.exec(SparkSqlBaseApp.scala:31)
at com.bbva.ebdm.ocelot.templates.spark_sql.SparkSqlBaseAppTest$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1$$anonfun$2$$anonfun$apply$2$$anon$1.exec(SparkSqlBaseAppTest.scala:47)
at com.bbva.ebdm.ocelot.templates.spark_sql.SparkSqlBaseAppTest$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$3.apply(SparkSqlBaseAppTest.scala:49)
at com.bbva.ebdm.ocelot.templates.spark_sql.SparkSqlBaseAppTest$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$3.apply(SparkSqlBaseAppTest.scala:47)
at com.bbva.ebdm.ocelot.templates.spark_sql.SparkSqlBaseAppTest$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(SparkSqlBaseAppTest.scala:47)
at com.bbva.ebdm.ocelot.templates.spark_sql.SparkSqlBaseAppTest$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(SparkSqlBaseAppTest.scala:47)
at wvlet.airframe.Design.runWithSession(Design.scala:169)
at wvlet.airframe.Design.withSession(Design.scala:182)
at com.bbva.ebdm.ocelot.templates.spark_sql.SparkSqlBaseAppTest$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(SparkSqlBaseAppTest.scala:47)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.FunSpecLike$$anon$1.apply(FunSpecLike.scala:454)
at org.scalatest.TestSuite$class.withFixture(TestSuite.scala:196)
at org.scalatest.FunSpec.withFixture(FunSpec.scala:1630)
at org.scalatest.FunSpecLike$class.invokeWithFixture$1(FunSpecLike.scala:451)
at org.scalatest.FunSpecLike$$anonfun$runTest$1.apply(FunSpecLike.scala:464)
at org.scalatest.FunSpecLike$$anonfun$runTest$1.apply(FunSpecLike.scala:464)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
at org.scalatest.FunSpecLike$class.runTest(FunSpecLike.scala:464)
at org.scalatest.FunSpec.runTest(FunSpec.scala:1630)
at org.scalatest.FunSpecLike$$anonfun$runTests$1.apply(FunSpecLike.scala:497)
at org.scalatest.FunSpecLike$$anonfun$runTests$1.apply(FunSpecLike.scala:497)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:373)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:410)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
at org.scalatest.FunSpecLike$class.runTests(FunSpecLike.scala:497)
at org.scalatest.FunSpec.runTests(FunSpec.scala:1630)
at org.scalatest.Suite$class.run(Suite.scala:1147)
at org.scalatest.FunSpec.org$scalatest$FunSpecLike$$super$run(FunSpec.scala:1630)
at org.scalatest.FunSpecLike$$anonfun$run$1.apply(FunSpecLike.scala:501)
at org.scalatest.FunSpecLike$$anonfun$run$1.apply(FunSpecLike.scala:501)
at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
at org.scalatest.FunSpecLike$class.run(FunSpecLike.scala:501)
at com.bbva.ebdm.ocelot.templates.spark_sql.SparkSqlBaseAppTest.org$scalatest$BeforeAndAfterAll$$super$run(SparkSqlBaseAppTest.scala:31)
at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
at com.bbva.ebdm.ocelot.templates.spark_sql.SparkSqlBaseAppTest.run(SparkSqlBaseAppTest.scala:31)
at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1346)
at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1340)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1340)
at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1011)
at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1010)
at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1506)
at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)
at org.scalatest.tools.Runner$.run(Runner.scala:850)
at org.scalatest.tools.Runner.run(Runner.scala)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:131)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)