Configuration problems between Hadoop and Spark
0 votes
/ May 3, 2019

I have a problem in my program that I do not have in spark-shell.

When I call:

FileSystem.get(spark.sparkContext.hadoopConfiguration)

In spark-shell everything works fine, but when I use the same call in my code, core-site.xml is not read. I can still make it work with:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val conf = new Configuration()
conf.addResource(new Path("path to conf/core-site.xml"))
FileSystem.get(conf)

This solution is not acceptable, because I need to use the Hadoop configuration without passing it explicitly.

In both cases (spark-shell and the program), the master is set to spark://x.x.x.x:7077.

How can I configure Spark to use the Hadoop configuration?

Code:

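import java.nio.charset.StandardCharsets

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

val HdfsPrefix: String = "hdfs://"
val path: String = "/tmp/"

def getHdfs(spark: SparkSession): FileSystem = {
  //val conf = new Configuration()
  //conf.addResource(new Path("/path to/core-site.xml"))
  //FileSystem.get(conf)
  FileSystem.get(spark.sparkContext.hadoopConfiguration)
}

val dfs = getHdfs(session)
data.select("name", "value").collect().foreach { x =>
  // One file per row: "name" is the file name, "value" is the content.
  val os = dfs.create(new Path(HdfsPrefix + path + x.getString(0)))
  try {
    val content: String = x.getString(1)
    os.write(content.getBytes(StandardCharsets.UTF_8))
    os.hsync()
  } finally {
    os.close() // release the stream even if the write fails
  }
}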

Error log:

Wrong FS: hdfs:/tmp, expected: file:///
java.lang.IllegalArgumentException: Wrong FS: hdfs:/tmp, expected: file:///
    at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:645)
    at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:80)
    at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:428)
    at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:690)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:446)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:433)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:908)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:889)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:775)
    at com.bbva.ebdm.ocelot.io.hdfs.HdfsIO$HdfsOutputFile$$anonfun$write$1.apply(HdfsIO.scala:116)
    at com.bbva.ebdm.ocelot.io.hdfs.HdfsIO$HdfsOutputFile$$anonfun$write$1.apply(HdfsIO.scala:115)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at com.bbva.ebdm.ocelot.io.hdfs.HdfsIO$HdfsOutputFile.write(HdfsIO.scala:115)
    at com.bbva.ebdm.ocelot.templates.spark_sql.SparkSqlBaseApp$$anonfun$exec$1.apply(SparkSqlBaseApp.scala:33)
    at com.bbva.ebdm.ocelot.templates.spark_sql.SparkSqlBaseApp$$anonfun$exec$1.apply(SparkSqlBaseApp.scala:31)
    at scala.collection.immutable.Map$Map3.foreach(Map.scala:161)
    at com.bbva.ebdm.ocelot.templates.spark_sql.SparkSqlBaseApp$class.exec(SparkSqlBaseApp.scala:31)
    at com.bbva.ebdm.ocelot.templates.spark_sql.SparkSqlBaseAppTest$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1$$anonfun$2$$anonfun$apply$2$$anon$1.exec(SparkSqlBaseAppTest.scala:47)
    at com.bbva.ebdm.ocelot.templates.spark_sql.SparkSqlBaseAppTest$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$3.apply(SparkSqlBaseAppTest.scala:49)
    at com.bbva.ebdm.ocelot.templates.spark_sql.SparkSqlBaseAppTest$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$3.apply(SparkSqlBaseAppTest.scala:47)
    at com.bbva.ebdm.ocelot.templates.spark_sql.SparkSqlBaseAppTest$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(SparkSqlBaseAppTest.scala:47)
    at com.bbva.ebdm.ocelot.templates.spark_sql.SparkSqlBaseAppTest$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(SparkSqlBaseAppTest.scala:47)
    at wvlet.airframe.Design.runWithSession(Design.scala:169)
    at wvlet.airframe.Design.withSession(Design.scala:182)
    at com.bbva.ebdm.ocelot.templates.spark_sql.SparkSqlBaseAppTest$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(SparkSqlBaseAppTest.scala:47)
    at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
    at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
    at org.scalatest.Transformer.apply(Transformer.scala:22)
    at org.scalatest.Transformer.apply(Transformer.scala:20)
    at org.scalatest.FunSpecLike$$anon$1.apply(FunSpecLike.scala:454)
    at org.scalatest.TestSuite$class.withFixture(TestSuite.scala:196)
    at org.scalatest.FunSpec.withFixture(FunSpec.scala:1630)
    at org.scalatest.FunSpecLike$class.invokeWithFixture$1(FunSpecLike.scala:451)
    at org.scalatest.FunSpecLike$$anonfun$runTest$1.apply(FunSpecLike.scala:464)
    at org.scalatest.FunSpecLike$$anonfun$runTest$1.apply(FunSpecLike.scala:464)
    at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
    at org.scalatest.FunSpecLike$class.runTest(FunSpecLike.scala:464)
    at org.scalatest.FunSpec.runTest(FunSpec.scala:1630)
    at org.scalatest.FunSpecLike$$anonfun$runTests$1.apply(FunSpecLike.scala:497)
    at org.scalatest.FunSpecLike$$anonfun$runTests$1.apply(FunSpecLike.scala:497)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
    at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:373)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:410)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
    at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
    at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
    at org.scalatest.FunSpecLike$class.runTests(FunSpecLike.scala:497)
    at org.scalatest.FunSpec.runTests(FunSpec.scala:1630)
    at org.scalatest.Suite$class.run(Suite.scala:1147)
    at org.scalatest.FunSpec.org$scalatest$FunSpecLike$$super$run(FunSpec.scala:1630)
    at org.scalatest.FunSpecLike$$anonfun$run$1.apply(FunSpecLike.scala:501)
    at org.scalatest.FunSpecLike$$anonfun$run$1.apply(FunSpecLike.scala:501)
    at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
    at org.scalatest.FunSpecLike$class.run(FunSpecLike.scala:501)
    at com.bbva.ebdm.ocelot.templates.spark_sql.SparkSqlBaseAppTest.org$scalatest$BeforeAndAfterAll$$super$run(SparkSqlBaseAppTest.scala:31)
    at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
    at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
    at com.bbva.ebdm.ocelot.templates.spark_sql.SparkSqlBaseAppTest.run(SparkSqlBaseAppTest.scala:31)
    at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
    at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1346)
    at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1340)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1340)
    at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1011)
    at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1010)
    at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1506)
    at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)
    at org.scalatest.tools.Runner$.run(Runner.scala:850)
    at org.scalatest.tools.Runner.run(Runner.scala)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:131)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)

Answers [ 2 ]

0 votes
/ July 16, 2019

The problem was with ScalaTest: it does not read core-site.xml when Maven builds the project, but spark-submit reads it correctly once the project is compiled.
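To see which situation a given run is in, a small check like the one below might help. It is only a sketch: the object name HadoopConfCheck and the helper coreSiteLocation are made up for illustration, and it assumes the Hadoop client libraries are on the classpath. It reports whether the class loader can see a core-site.xml and which default filesystem the configuration resolves to.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

// Hypothetical helper, not part of the original question or of any library.
object HadoopConfCheck {

  /** Location of core-site.xml as seen by the class loader, if there is one. */
  def coreSiteLocation(conf: Configuration): Option[String] =
    Option(conf.getResource("core-site.xml")).map(_.toString)

  def main(args: Array[String]): Unit = {
    // A default Configuration loads core-default.xml and then core-site.xml,
    // but only when that file is visible on the classpath.
    val conf = new Configuration()
    println(s"core-site.xml: ${coreSiteLocation(conf).getOrElse("not on classpath")}")
    println(s"default FS:    ${FileSystem.getDefaultUri(conf)}")
  }
}
```

When no core-site.xml is found, the default filesystem falls back to the file:/// value from core-default.xml, which would match the "expected: file:///" part of the error in the question. Inside a test, the same helper can be called with spark.sparkContext.hadoopConfiguration instead of a fresh Configuration. In a Maven project, one option is probably to put a copy of core-site.xml under src/test/resources so that it ends up on the test classpath.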

0 votes
/ May 3, 2019

You need to put hdfs-site.xml and core-site.xml on Spark's classpath, that is, on the classpath of your program when you run it.

https://spark.apache.org/docs/latest/configuration.html#custom-hadoophive-configuration

According to the documentation:

If you plan to read and write from HDFS using Spark, there are two Hadoop configuration files that should be included on Spark’s classpath:

hdfs-site.xml, which provides default behaviors for the HDFS client.
core-site.xml, which sets the default filesystem name.
The location of these configuration files varies across Hadoop versions, but a common location is inside of /etc/hadoop/conf. Some tools create configurations on-the-fly, but offer a mechanism to download copies of them.

To make these files visible to Spark, set HADOOP_CONF_DIR in $SPARK_HOME/conf/spark-env.sh to a location containing the configuration files.
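For runs that do not go through spark-submit (for example tests started from an IDE), the same documentation section also describes spark.hadoop.* properties, which Spark copies into its Hadoop configuration. The sketch below relies on that. The object name TestSession, the NameNode address and the local[*] master are all assumptions made for illustration; take the real address from your cluster's core-site.xml.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper for test runs; not taken from the original question.
object TestSession {
  // Assumed NameNode address, replace with the fs.defaultFS value of your cluster.
  val DefaultFs = "hdfs://namenode.example:8020"

  def build(): SparkSession =
    SparkSession.builder()
      .appName("hdfs-conf-sketch")
      .master("local[*]") // assumption: a local master is enough for the test
      // spark.hadoop.<key> is passed to the Hadoop configuration as <key>
      .config("spark.hadoop.fs.defaultFS", DefaultFs)
      .getOrCreate()
}
```

With a session built this way, FileSystem.get(spark.sparkContext.hadoopConfiguration) should resolve to the HDFS address above rather than to the local filesystem, without core-site.xml being on the classpath. This does hard-code one setting, so it is more of a test workaround than a replacement for HADOOP_CONF_DIR.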