Spark не может найти сам класс приложения (ClassNotFoundException) в spark-submit с JT-сборкой SBT - PullRequest
0 голосов
/ 09 мая 2018

Фон

Я пытаюсь начать работу с Spark, используя Scala.

Первоначально я пытался написать потокового потребителя Kinesis, следуя наряду с этим официальным примером . Хотя на данный момент я уменьшил количество ошибок, чтобы удалить все, что связано с Kinesis, кроме зависимости от пакета, и ошибка остается прежней.

Я использовал SBT для генерации сборочного JAR-файла моего проекта. Затем я попытался запустить его локально, используя spark-submit. (Подробные шаги ниже.)

Это последовательно не работает с ClassNotFoundException, утверждая, что он не может найти основной класс моего приложения. (Подробный вывод ниже.)

Я должен подчеркнуть:

Насколько я понимаю, Я не верю, что это то же самое ClassNotFoundException, которое видели другие плакаты , и я не верю, что этот вопрос является дубликатом этих вопросов.

В частности, насколько я могу судить:

  • Данный класс является моим основным классом приложения.
  • Я ссылаюсь на правильное полное имя класса.
  • Я включил зависимости в выходной JAR, создав JAR сборки через SBT.

Репро шаги

  1. Инициализировать пустой проект Scala SBT. (Я использовал шаблон по умолчанию в IntelliJ, но я не думаю, что это имеет значение.)
  2. Добавить код проекта. (Включено ниже.)
  3. sbt assembly
  4. ./bin/spark-submit --class "sparkpoc.KinesisExample" --master local[4] ~/git/ming-spark-poc/target/scala-2.11/ming-spark-poc-assembly-0.1.jar (С соответствующими заменами.)

Сообщение об ошибке

$ ./bin/spark-submit --class "sparkpoc.KinesisExample" --master local[4] ~/git/ming-spark-poc/target/scala-2.11/ming-spark-poc-assembly-0.1.jar 
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.hadoop.security.authentication.util.KerberosUtil (file:/Users/ming/misc/spark-2.3.0-bin-hadoop2.7/jars/hadoop-auth-2.7.3.jar) to method sun.security.krb5.Config.getInstance()
WARNING: Please consider reporting this to the maintainers of org.apache.hadoop.security.authentication.util.KerberosUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
2018-05-09 15:39:01 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
java.lang.ClassNotFoundException: sparkpoc.KinesisExample
    at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:466)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:566)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:499)
    at java.base/java.lang.Class.forName0(Native Method)
    at java.base/java.lang.Class.forName(Class.java:374)
    at org.apache.spark.util.Utils$.classForName(Utils.scala:235)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:836)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
2018-05-09 15:39:01 INFO  ShutdownHookManager:54 - Shutdown hook called
2018-05-09 15:39:01 INFO  ShutdownHookManager:54 - Deleting directory /private/var/folders/py/jrf50pwj1xdd4grjvlg07g580000gp/T/spark-c5f3bade-fbfe-4516-900e-99fee1b47366

Мой код

build.sbt

name := "ming-spark-poc"

version := "0.1"

scalaVersion := "2.11.8"

val sparkVersion = "2.3.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming-kinesis-asl" % sparkVersion
)

assemblyOption in assembly := (assemblyOption in assembly).value
  .copy(includeScala = false)

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", "MANIFEST.MF") => MergeStrategy.discard
  case _ => MergeStrategy.first
}

Я знаю, что установка assemblyMergeStrategy без возврата по умолчанию является плохой практикой в ​​рабочем коде. Это был всего лишь быстрый взлом, чтобы получить проект для сборки, и, насколько я знаю, это не относится к моей текущей ошибке.

assembly.sbt

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.6")

KinesisExample.scala

Первоначально это был потребитель Kinesis. Это было уменьшено до приложения-заполнителя, которое ничего не делает. Ошибка не изменилась.

package sparkpoc

import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KinesisExample {

  def main(args: Array[String]): Unit = {
    val batchInterval = Seconds(5)

    val sparkConf = new SparkConf().setAppName("SparcPocKinesisExample")
    val streamingContext = new StreamingContext(sparkConf, batchInterval)

    streamingContext.start()
    streamingContext.awaitTermination()
  }

}

Что я пробовал до сих пор

Я могу запустить официальные примеры из предварительно упакованного JAR без каких-либо проблем.

$ ./bin/run-example SparkPi 10
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.hadoop.security.authentication.util.KerberosUtil (file:/Users/ming/misc/spark-2.3.0-bin-hadoop2.7/jars/hadoop-auth-2.7.3.jar) to method sun.security.krb5.Config.getInstance()
WARNING: Please consider reporting this to the maintainers of org.apache.hadoop.security.authentication.util.KerberosUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
2018-05-09 16:14:07 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2018-05-09 16:14:08 INFO  SparkContext:54 - Running Spark version 2.3.0
2018-05-09 16:14:08 INFO  SparkContext:54 - Submitted application: Spark Pi
<SNIPPED>

Насколько я знаю, созданный JAR-файл содержит ожидаемый файл класса. Я проверил это независимо двумя разными способами.

Я проверил содержимое JAR с jar -tf. Насколько я могу судить, он содержит ожидаемый файл класса в ожидаемом месте.

$ jar -tf ./target/scala-2.11/ming-spark-poc-assembly-0.1.jar | grep KinesisExample
org/apache/spark/examples/streaming/KinesisExampleUtils$$anonfun$getRegionNameByEndpoint$1.class
org/apache/spark/examples/streaming/KinesisExampleUtils$$anonfun$getRegionNameByEndpoint$2.class
org/apache/spark/examples/streaming/KinesisExampleUtils$$anonfun$getRegionNameByEndpoint$3.class
org/apache/spark/examples/streaming/KinesisExampleUtils$.class
org/apache/spark/examples/streaming/KinesisExampleUtils.class
sparkpoc/KinesisExample$.class
sparkpoc/KinesisExample.class

Я извлек содержимое JAR с помощью unzip и проверил их вручную. Насколько я могу судить, он содержит ожидаемый файл класса в ожидаемом месте.

Хотя я не ожидаю, что в этом проекте что-либо зависит от текущего рабочего каталога, я повторил те же шаги, используя корневой каталог установки Spark в качестве текущего рабочего каталога, без изменений в результате.

Я попытался запустить сгенерированный JAR напрямую. Хотя я не ожидаю, что это будет работать правильно для запуска приложения Spark, я подумал, что оно может дать представление о том, что происходит с разрешением классов. Это терпит неудачу следующим образом.

$ java -jar ./target/scala-2.11/ming-spark-poc-assembly-0.1.jar "sparkpoc.KinesisExample"
Error: Could not find or load main class sparkpoc.KinesisExample
Caused by: java.lang.ClassNotFoundException: sparkpoc.KinesisExample

Я попытался переименовать пакет, в котором находится класс, в том числе в один момент поместить его в пакет верхнего уровня (без объявления package). Каждый раз, вызывая spark-submit с соответствующим полным именем класса, я все еще получал одну и ту же ошибку.

В случае, если хак assemblyMergeStrategy что-то косвенно сломал, я попытался заменить его явным перечислением следующим образом.

assemblyMergeStrategy in assembly := {
  case PathList("javax", "inject", _*) => MergeStrategy.last
  case PathList("org", "apache", _*) => MergeStrategy.last
  case PathList("org", "aopalliance", _*) => MergeStrategy.last
  case PathList("mime.types") => MergeStrategy.last
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}

Я все еще получил ту же ошибку. РЕДАКТИРОВАТЬ: Это на самом деле работает, как ожидалось. У меня была отдельная проблема с устаревшим артефактом сборки. Смотри ниже.

Вопросы

  1. Есть ли что-то неправильное или неправильное в моей текущей настройке SBT? (Помимо взлома в assemblyMergeStrategy.)
  2. Как еще можно проверить правильность созданного JAR?
  3. Предположим, что сгенерированный JAR-файл правильный, что еще мне не хватает, что может вызвать такую ​​проблему?

Заранее благодарим вас за любые идеи или советы, которые вы можете иметь.

РЕДАКТИРОВАТЬ: разрешение

Ответ Ахмада Рагаба ниже верен.

С моим assemblyMergeStrategy действительно что-то не так, что привело к неправильному выводу JAR. Насколько я понимаю, я полагаю, что я использовал некоторые важные метаданные из-за слишком агрессивного символа подстановки в assemblyMergeStrategy. Версия, которая работает следующим образом.

assemblyMergeStrategy in assembly := {
  case PathList("javax", "inject", _*) => MergeStrategy.last
  case PathList("org", "apache", _*) => MergeStrategy.last
  case PathList("org", "aopalliance", _*) => MergeStrategy.last
  case PathList("mime.types") => MergeStrategy.last
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}

Примечательно, что я пробовал это однажды, и это почему-то не сработало. Мое подозрение, хотя я не могу доказать это задним числом, заключается в том, что я случайно тестировал это изменение с помощью устаревшего артефакта сборки, и поэтому я случайно запустил старую версию без этих изменений. После очистки всего и восстановления с этими новыми изменениями все заработало как положено.

(Обратите внимание, что приложение все равно будет аварийно завершать работу при запуске из-за отсутствия определенных выходных данных, но, разумеется, этот сбой является ожидаемым, и мы удалили неожиданный.)

1 Ответ

0 голосов
/ 10 мая 2018

Что-то выглядит потенциально странно в этом assemblyMergeStrategy try:

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", _@_*) => MergeStrategy.discard
  case _ => MergeStrategy.first
}

Во-вторых, вам может понадобиться в настройках сборки явно указать основной класс, чтобы был создан правильный манифест, хотя, если честно, я не могу доказать это. В прошлом я с некоторым успехом делал следующее

mainClass in assembly := Some("sparkpoc.KinesisExample")

Наконец, один из способов подтвердить правильность создания банки: - 1008 *

java -classpath ming-spark-poc-assembly-0.1.jar "sparkpoc.KinesisExample"

Надеюсь, что-то из этого приведет вас в правильном направлении.

...