Как десериализовать анонимный класс в Spark Scala? - PullRequest
0 голосов
/ 12 сентября 2018

Я создал модели ML и сохранил их в MariaDB как сериализованные объекты с помощью Kryo и Spark в App1, а затем читаю их из другого приложения — App2. Раньше всё работало правильно, но потом мне пришлось изменить код и использовать rdd.zipWithIndex в App1. После этого Kryo потребовал зарегистрировать ещё два класса, и я решил это, добавив следующие две строки:

classOf[scala.reflect.ClassTag$$anon$1],
classOf[java.lang.Class[_]]

Хотя модели сохраняются правильно (я вижу их в MariaDB), мой App2 не может прочитать (десериализовать) эти модели и выдаёт следующее исключение IOException:

Exception in thread "main" java.io.IOException: java.lang.NullPointerException
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1207)
    at org.apache.spark.RangePartitioner.readObject(Partitioner.scala:227)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2176)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2176)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2176)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2176)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2176)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2176)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2176)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1973)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1565)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2176)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2176)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at se.uu.farmbio.vs.examples.StandalonePrediction$.loadModel(StandalonePrediction.scala:111)
    at se.uu.farmbio.vs.examples.StandalonePrediction$.run(StandalonePrediction.scala:83)
    at se.uu.farmbio.vs.examples.StandalonePrediction$$anonfun$main$1.apply(StandalonePrediction.scala:47)
    at se.uu.farmbio.vs.examples.StandalonePrediction$$anonfun$main$1.apply(StandalonePrediction.scala:46)
    at scala.Option.map(Option.scala:145)
    at se.uu.farmbio.vs.examples.StandalonePrediction$.main(StandalonePrediction.scala:46)
    at se.uu.farmbio.vs.examples.StandalonePrediction.main(StandalonePrediction.scala)
Caused by: java.lang.NullPointerException
    at org.apache.spark.RangePartitioner$$anonfun$readObject$1.apply$mcV$sp(Partitioner.scala:228)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204)
    ... 183 more

Хочу отметить, что я не менял код сохранения и загрузки модели; он приведён ниже:

Сохранение модели

/**
 * Serializes `rModel` with Java serialization and inserts it, together with its
 * receptor name and PDB code, into the `MODELS` table of the `db_profile`
 * database on `jdbcHostname:3306`.
 *
 * NOTE(review): Java serialization writes the entire object graph reachable from
 * `rModel`. If the model still references RDDs, their lineage (partitioners,
 * dependencies) is serialized too and may not be readable outside a running
 * Spark application — confirm the model holds only plain data.
 * NOTE(review): the database credentials are hard-coded in the JDBC URL.
 *
 * @param receptorPath path of the receptor file; its file name without extension is stored as `r_name`
 * @param rModel       trained model stored (serialized) in `r_model`
 * @param rPdbCode     value stored in `r_pdbCode`
 * @param jdbcHostname host name of the database server
 */
private def insertModels(receptorPath: String, rModel: InductiveClassifier[MLlibSVM, LabeledPoint], rPdbCode: String, jdbcHostname: String): Unit = {
  // Getting filename from path and trimming the extension
  val rName = FilenameUtils.removeExtension(Paths.get(receptorPath).getFileName.toString())
  println("JOB_INFO: The value of rName is " + rName)

  Class.forName("org.mariadb.jdbc.Driver")
  val jdbcUrl = s"jdbc:mysql://$jdbcHostname:3306/db_profile?user=root&password=blahblah"

  // Serialize the model into memory. Closing the stream flushes any buffered
  // bytes into `baos` before we read them back.
  val baos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(baos)
  try oos.writeObject(rModel) finally oos.close()
  val rModelAsBytes = baos.toByteArray()

  val connection = DriverManager.getConnection(jdbcUrl)
  try {
    if (!connection.isClosed()) {
      // Writing to database
      val sqlInsert: PreparedStatement = connection.prepareStatement(
        "INSERT INTO MODELS(r_name, r_pdbCode, r_model) VALUES (?, ?, ?)")
      try {
        println("JOB_INFO: Start Serializing")

        // Set input parameters
        sqlInsert.setString(1, rName)
        sqlInsert.setString(2, rPdbCode)
        sqlInsert.setBinaryStream(3, new ByteArrayInputStream(rModelAsBytes), rModelAsBytes.length)
        sqlInsert.executeUpdate()
      } finally {
        // Release the statement even when the insert fails
        sqlInsert.close()
      }
      println("JOB_INFO: Done Serializing")
    } else {
      println("MariaDb Connection is Close")
      System.exit(1)
    }
  } finally {
    // The original never closed the connection; always release it here
    connection.close()
  }
}

Загрузка модели в App2

/**
 * Reads a serialized model from the `MODELS` table of the local `db_profile`
 * database and deserializes it with Java serialization.
 *
 * NOTE(review): the query has no WHERE/ORDER BY, so when the table holds several
 * models it is unspecified which one is returned — consider selecting by `r_name`.
 *
 * NOTE(review): the reported `IOException` wraps an NPE thrown from
 * `RangePartitioner.readObject` during `readObject()` below. That means the stored
 * model's object graph contains a `RangePartitioner` (the kind `RDD.sortBy`
 * creates), i.e. RDD lineage. That method presumably reads `SparkEnv`, which is
 * null when no SparkContext is running in this JVM — confirm. Likely fixes:
 * start the SparkContext before calling this, or (better) ensure the model
 * written by App1 no longer references RDDs.
 *
 * @return the deserialized model
 * @throws java.sql.SQLException if `MODELS` has no rows or the stored blob is NULL
 */
def loadModel(): InductiveClassifier[MLlibSVM, LabeledPoint] = {
  // Connection initialization
  Class.forName("org.mariadb.jdbc.Driver")
  val jdbcUrl = s"jdbc:mysql://localhost:3306/db_profile?user=root&password=whatever"
  val connection = DriverManager.getConnection(jdbcUrl)

  try {
    if (!connection.isClosed()) {
      // Reading pre-trained model from database
      val sqlRead = connection.prepareStatement("SELECT r_model FROM MODELS")
      try {
        val rs = sqlRead.executeQuery()
        try {
          if (!rs.next()) throw new java.sql.SQLException("No model found in table MODELS")

          val modelBytes = rs.getBytes("r_model")
          if (modelBytes == null) throw new java.sql.SQLException("Column r_model is NULL")

          val modelOis = new ObjectInputStream(new ByteArrayInputStream(modelBytes))
          try modelOis.readObject().asInstanceOf[InductiveClassifier[MLlibSVM, LabeledPoint]]
          finally modelOis.close()
        } finally {
          rs.close()
        }
      } finally {
        sqlRead.close()
      }
    } else {
      println("MariaDb Connection is Close")
      sys.exit(1)
    }
  } finally {
    // Close the connection on every path, including failed deserialization
    connection.close()
  }
}

Ошибка возникает в строке загрузки модели (см. ниже). Эта строка не менялась с тех пор, когда всё работало, и я считаю, что причина в zipWithIndex, но не знаю, как это исправить.

modelOis.readObject().asInstanceOf[InductiveClassifier[MLlibSVM, LabeledPoint]]

Новый код в App1, из-за которого Kryo требует зарегистрировать ещё два класса:

/**
 * Selects both ends of the score ranking of `poses` and labels them.
 *
 * The `topPer` percent (of `dsSize`) of poses with the LOWEST parsed score are
 * labeled 1.0; the `bottomPer` percent (of `dsSize`) with the HIGHEST parsed
 * score are labeled 0.0. Both labeled sets are returned as a single RDD.
 *
 * NOTE(review): `sortBy` + `zipWithIndex` add a range-partitioned shuffle to
 * the lineage of the returned RDD. If anything derived from this RDD ends up
 * inside a Java-serialized model, that lineage is serialized with it.
 *
 * @param poses     raw pose strings; each one is scored with `PosePipeline.parseScore`
 * @param dsSize    size the percentages are applied to
 * @param topPer    percentage of `dsSize` to label 1.0
 * @param bottomPer percentage of `dsSize` to label 0.0
 * @return the labeled top poses followed by the labeled bottom poses
 */
private def getLabeledTopAndBottom(poses: RDD[String], dsSize: Int, topPer: Float, bottomPer: Float): RDD[String] = {
  // How many poses to take from each end: a rounded percentage of dsSize
  val topCount = Math.round((topPer / 100) * dsSize)
  val bottomCount = Math.round((bottomPer / 100) * dsSize)

  // Attach the parsed score to every pose
  val scored = poses.map(pose => (pose, PosePipeline.parseScore(pose)))

  // Ascending by score: the first `topCount` ranks are kept
  val lowestScored = scored
    .sortBy(pair => pair._2)
    .zipWithIndex()
    .filter(ranked => ranked._2 < topCount)
    .map(ranked => ranked._1._1)

  // Descending by score: the first `bottomCount` ranks are kept
  val highestScored = scored
    .sortBy(pair => -pair._2)
    .zipWithIndex()
    .filter(ranked => ranked._2 < bottomCount)
    .map(ranked => ranked._1._1)

  val labeledTop = lowestScored.map(pose => labelPose(pose, 1.0))
  val labeledBottom = highestScored.map(pose => labelPose(pose, 0.0))

  labeledTop.union(labeledBottom)
}

...