Включение коммиттера каталогов в Spark - PullRequest
0 голосов
/ 24 декабря 2018

Я пытаюсь использовать секционированный S3A (или каталог, поскольку мне просто нужно подтвердить, работает ли коммиттер должным образом) с Spark.Я перехожу по этой ссылке , на основе которой все должно быть довольно просто, однако я сталкиваюсь с новыми проблемами при решении предыдущей

Код, используемый для тестирования: (внутри spark-shell):

val sourceDF = spark.range(0, 10000)
val datasets = "s3a://bucket-name/test"
sourceDF.write.format("orc").save(datasets + "orc")

spark-defaults.conf - это:

spark.hadoop.fs.s3a.committer.name directory

spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol

spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter


Error 1:
scala> sourceDF.write.format("orc").save(datasets + "orc")
java.lang.NoClassDefFoundError: 
org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:230)
at org.apache.spark.internal.io.FileCommitProtocol$.instantiate(FileCommitProtocol.scala:144)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:98)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.execution.datasources.DataSource.writeInFileFormat(DataSource.scala:435)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:471)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:50)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org .apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:609)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:217)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 81 more

Затем я скопировал spark-hadoop-cloud_2.11-2.3.1.3.0.2.0-50.jar из этой ссылки в папку spark / jars

Это решило предыдущую ошибку `` NoClassDefFoundError` ', но породило новую ошибку определения класса:

Ошибка 2:

java.lang.NoClassDefFoundError: 
org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:230)
at org.apache.spark.internal.io.FileCommitProtocol$.instantiate(FileCommitProtocol.scala:144)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:98)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
....

Полная трассировка стекапри необходимости может быть вставлен

После этого я скопировал файл hadoop-mapreduce-client-core-3.1.1.jar в папку spark / jars и снова запустил тестовый код в spark-shell.На этот раз я получил ошибку ниже:

После этого я застрял.

Ошибка 3 (и последняя ошибка, где я застрял):

scala> sourceDF.write.format("orc").save(datasets + "orc")
java.lang.NoSuchMethodError: 
org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.<init>(Ljava/lang/String;Ljava/lang/String;Z)V
at org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.<init>(PathOutputCommitProtocol.scala:60)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.spark.internal.io.FileCommitProtocol$.instantiate(FileCommitProtocol.scala:150)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:98)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.execution.datasources.DataSource.writeInFileFormat(DataSource.scala:435)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:471)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:50)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:609)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:217)
... 48 elided

Это похоже на некорректную проблему с флягой, но я не могу найти правильную.Этот вопрос похож на предыдущий вопрос , но не может найти соответствующий ответ, поэтому публикуем снова.

1 Ответ

0 голосов
/ 27 декабря 2018

Я смог сделать эту работу.Проблема была с моей искрой версии.Я использовал версию spark 2.2.1, тогда как по крайней мере необходима версия 2.3.1.Последняя ошибка, упомянутая в вопросе, указывала на неправильный конструктор.Немного покопавшись, я обнаружил, что spark-core_2.11-2.2.1.jar использует конструктор с двумя параметрами, где ожидалось, что spark-hadoop-cloud_2.11-2.3.1.3.0.2.0-50.jar 3конструктор параметров, который появился только в версии spark-core_2.11-2.3.1.jar.После повышения версии и с несколькими изменениями я смог проверить это.

Запустите эту команду, чтобы увидеть проблему:

javap -classpath spark-core_2.11-2.2.1.jar org/ apache / spark / внутренний / io / HadoopMapReduceCommitProtocol

javap -classpath spark-core_2.11-2.3.1.jar org / apache / spark / internal / io / HadoopMapReduceCommitProtocol

Просто чтобы позволитьвы знаете, я скачал spark 2.3.1 без предварительно собранной версии hadoop, затем настроил ее на jar hadoop 3.1.0 и смог заставить ее работать.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...