KryoException: невозможно найти класс с искровым структурированным потоком - PullRequest
0 голосов
/ 30 сентября 2018

1-Проблема

У меня есть программа Spark , в которой используется Kryo , но не как часть Механика искры .В частности, я использую Spark Structured Streaming , подключенный к Kafka .

Я читаю двоичные значения, полученные от Kafka, и самостоятельно декодирую их.

При попытке десериализации данных с помощью Kryo я столкнулся с исключением.Однако эта проблема возникает, только когда я упаковываю свою программу и запускаю ее на Spark Standalone Cluster .То есть, это не происходит, когда я запускаю его, в пределах intellij, т.е. как в Spark Local Mode (dev mode) .

Исключение, которое я получаю, выглядит следующим образом:

Причина: com.esotericsoftware.kryo.KryoException: Невозможно найти класс: com.elsevier.entellect.commons.package $RawData

Обратите внимание, что RawData - это мой собственный класс case, расположенный в одном из подпроектов моей многопроектной сборки.

Чтобы понять контекст, пожалуйста, найдите более подробную информацию ниже:

2-build.sbt:

lazy val commonSettings = Seq(
  organization  := "com.elsevier.entellect",
  version       := "0.1.0-SNAPSHOT",
  scalaVersion  := "2.11.12",
  resolvers     += Resolver.mavenLocal,
  updateOptions := updateOptions.value.withLatestSnapshots(false)
)

lazy val entellectextractors = (project in file("."))
  .settings(commonSettings).aggregate(entellectextractorscommon, entellectextractorsfetchers, entellectextractorsmappers, entellectextractorsconsumers)

lazy val entellectextractorscommon = project
  .settings(
    commonSettings,
    libraryDependencies ++= Seq(
      "com.esotericsoftware" % "kryo" % "5.0.0-RC1",
      "com.github.romix.akka" %% "akka-kryo-serialization" % "0.5.0" excludeAll(excludeJpountz),
      "org.apache.kafka" % "kafka-clients" % "1.0.1",
      "com.typesafe.akka" %% "akka-stream" % "2.5.16",
      "com.typesafe.akka" %% "akka-http-spray-json" % "10.1.4",
      "com.typesafe.akka" % "akka-slf4j_2.11" % "2.5.16",
      "ch.qos.logback" % "logback-classic" % "1.2.3"
    )
  )

lazy val entellectextractorsfetchers = project
  .settings(
    commonSettings,
    libraryDependencies ++= Seq(
      "com.typesafe.akka" %% "akka-stream-kafka" % "0.22",
      "com.typesafe.slick" %% "slick" % "3.2.3",
      "com.typesafe.slick" %% "slick-hikaricp" % "3.2.3",
      "com.lightbend.akka" %% "akka-stream-alpakka-slick" % "0.20") 
  )
  .dependsOn(entellectextractorscommon)

lazy val entellectextractorsconsumers = project
  .settings(
    commonSettings,
    libraryDependencies ++= Seq(
      "com.typesafe.akka" %% "akka-stream-kafka" % "0.22")
  )
  .dependsOn(entellectextractorscommon)

lazy val entellectextractorsmappers = project
  .settings(
      commonSettings,
      mainClass in assembly := Some("entellect.extractors.mappers.NormalizedDataMapper"),
      assemblyMergeStrategy in assembly := {
        case PathList("META-INF", "services", "org.apache.spark.sql.sources.DataSourceRegister") => MergeStrategy.concat
        case PathList("META-INF", xs @ _*) => MergeStrategy.discard
        case x => MergeStrategy.first},
      dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-core" % "2.9.5",
      dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-databind" % "2.9.5",
      dependencyOverrides += "com.fasterxml.jackson.module" % "jackson-module-scala_2.11" % "2.9.5",
      dependencyOverrides += "org.apache.jena" % "apache-jena" % "3.8.0",
      libraryDependencies ++= Seq(
      "org.apache.jena" % "apache-jena" % "3.8.0",
      "edu.isi" % "karma-offline" % "0.0.1-SNAPSHOT",
      "org.apache.spark" % "spark-core_2.11" % "2.3.1" % "provided",
      "org.apache.spark" % "spark-sql_2.11" % "2.3.1" % "provided",
      "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.1"
      //"com.datastax.cassandra" % "cassandra-driver-core" % "3.5.1"
    ))
  .dependsOn(entellectextractorscommon)



lazy val excludeJpountz = ExclusionRule(organization = "net.jpountz.lz4", name = "lz4")

Подпроект, содержащий искрукод entellectextractorsmappers.Подпроект, который содержит класс дела RawData , который не может быть найден, является entellectextractorscommon.entellectextractorsmappers явно зависит от entellectextractorscommon.

3- Разница между тем, когда я отправляю в локальный автономный кластер, и когда я запускаю в режиме локальной разработки:

Когда я отправляю в кластер, моя искровая зависимость выглядит следующим образом:

  "org.apache.spark" % "spark-core_2.11" % "2.3.1" % "provided",
  "org.apache.spark" % "spark-sql_2.11" % "2.3.1" % "provided",

Когда я работаю в локальном режиме разработки (без сценария отправки), они превращаются как таковые

  "org.apache.spark" % "spark-core_2.11" % "2.3.1",
  "org.apache.spark" % "spark-sql_2.11" % "2.3.1",

То естьв локальном dev мне нужно иметь зависимости, а при отправке в кластер в автономном режиме они уже находятся в кластере, поэтому я поставил их как есть.

4-Как отправить :

spark-submit --class entellect.extractors.mappers.DeNormalizedDataMapper --name DeNormalizedDataMapper --master spark://MaatPro.local:7077  --deploy-mode cluster --executor-memory 14G --num-executors 1 --conf spark.sql.shuffle.partitions=7 "/Users/maatari/IdeaProjects/EntellectExtractors/entellectextractorsmappers/target/scala-2.11/entellectextractorsmappers-assembly-0.1.0-SNAPSHOT.jar"

5-Как использовать Kryo :

5.1-Декларация и регистрация

В проекте entellectextractorscommon у меня есть объект пакета со следующим:

package object commons {

  case class RawData(modelName: String,
                     modelFile: String,
                     sourceType: String,
                     deNormalizedVal: String,
                     normalVal: Map[String, String])

  object KryoContext {
    lazy val kryoPool = new Pool[Kryo](true, false, 16) {
      protected def create(): Kryo = {
        val kryo = new Kryo()
        kryo.setRegistrationRequired(false)
        kryo.addDefaultSerializer(classOf[scala.collection.Map[_,_]], classOf[ScalaImmutableAbstractMapSerializer])
        kryo.addDefaultSerializer(classOf[scala.collection.generic.MapFactory[scala.collection.Map]], classOf[ScalaImmutableAbstractMapSerializer])
        kryo.addDefaultSerializer(classOf[RawData], classOf[ScalaProductSerializer])
        kryo
      }
    }

    lazy val outputPool = new Pool[Output](true, false, 16) {
      protected def create: Output = new Output(4096)
    }

    lazy val inputPool = new Pool[Input](true, false, 16) {
      protected def create: Input = new Input(4096)
    }
  }

  object ExecutionContext {

    implicit lazy val system  = ActorSystem()
    implicit lazy val mat     = ActorMaterializer()
    implicit lazy val ec      = system.dispatcher

  }

}

5.2-Использование

В entellectextractorsmappers (где программа spark) я работаю с mapMartition .В нем у меня есть метод для декодирования Данных, поступающих из kafka, который использует Kryo как таковой:

def decodeData(rowOfBinaryList: List[Row], kryoPool: Pool[Kryo], inputPool: Pool[Input]): List[RawData] = {

    val kryo = kryoPool.obtain()
    val input = inputPool.obtain()
    val data = rowOfBinaryList.map(r => r.getAs[Array[Byte]]("message")).map{ binaryMsg =>
      input.setInputStream(new ByteArrayInputStream(binaryMsg))
      val value = kryo.readClassAndObject(input).asInstanceOf[RawData]
      input.close()
      value
    }
    kryoPool.free(kryo)
    inputPool.free(input)
    data
  }

Примечание: объект KryoContext + Lazy val гарантирует, что kryoPoolсоздается один раз за JVM.Однако я не думаю, что проблема заключается в этом.

Я заметил в каком-то другом месте подсказку о проблемах с classLoaders, используемыми spark vs Kryo?Но не уверен, что действительно понял, что происходит.

Если бы кто-то мог дать мне несколько советов, это помогло бы, потому что я понятия не имею, с чего начать.Почему он работает в локальном режиме, а не в кластерном режиме, не нарушает ли предоставленная зависимость зависимость и создает ли проблему с Kryo?Это - Стратегия слияния Ассамблеи SBT, которая портит?

Возможно много указателей, если кто-нибудь поможет мне сузить это, это было бы здорово!

1 Ответ

0 голосов
/ 04 октября 2018

Пока что

Я решил эту проблему, подняв загрузчик класса «включения», который, как я полагаю, принадлежит Spark.Это после прочтения нескольких комментариев здесь и там о проблеме с загрузчиком классов между Kryo и Spark:

lazy val kryoPool = new Pool[Kryo](true, false, 16) {
      protected def create(): Kryo = {
        val cl = Thread.currentThread().getContextClassLoader()
        val kryo = new Kryo()
        kryo.setClassLoader(cl)
        kryo.setRegistrationRequired(false)
        kryo.addDefaultSerializer(classOf[scala.collection.Map[_,_]], classOf[ScalaImmutableAbstractMapSerializer])
        kryo.addDefaultSerializer(classOf[scala.collection.generic.MapFactory[scala.collection.Map]], classOf[ScalaImmutableAbstractMapSerializer])
        kryo.addDefaultSerializer(classOf[RawData], classOf[ScalaProductSerializer])
        kryo
      }
    }
...