Я создал модели ML и сохранил их в MariaDB как сериализованный объект с Kyro и Spark в App1 .А затем прочитайте его из другого App2 .Раньше все работало правильно, но потом мне пришлось изменить код и использовать rdd.zipWithIndex
в App1 .После этого kyro попросил зарегистрировать еще два класса, и я решил это, добавив следующие две строки
classOf[scala.reflect.ClassTag$$anon$1],
classOf[java.lang.Class[_]]
Хотя модели сохранены правильно (я вижу их в mariadb), но мой App2 не можетпрочитайте (десериализуйте) эти модели и дайте следующее исключение IOException.
Exception in thread "main" java.io.IOException: java.lang.NullPointerException
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1207)
at org.apache.spark.RangePartitioner.readObject(Partitioner.scala:227)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2176)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2176)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2176)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2176)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2176)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2176)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2176)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1973)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1565)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2176)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2176)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at se.uu.farmbio.vs.examples.StandalonePrediction$.loadModel(StandalonePrediction.scala:111)
at se.uu.farmbio.vs.examples.StandalonePrediction$.run(StandalonePrediction.scala:83)
at se.uu.farmbio.vs.examples.StandalonePrediction$$anonfun$main$1.apply(StandalonePrediction.scala:47)
at se.uu.farmbio.vs.examples.StandalonePrediction$$anonfun$main$1.apply(StandalonePrediction.scala:46)
at scala.Option.map(Option.scala:145)
at se.uu.farmbio.vs.examples.StandalonePrediction$.main(StandalonePrediction.scala:46)
at se.uu.farmbio.vs.examples.StandalonePrediction.main(StandalonePrediction.scala)
Caused by: java.lang.NullPointerException
at org.apache.spark.RangePartitioner$$anonfun$readObject$1.apply$mcV$sp(Partitioner.scala:228)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204)
... 183 more
Здесь я хочу упомянуть, что я не изменил модель сохранения и код загрузки модели, как указано ниже:
Сохранение модели
private def insertModels(receptorPath: String, rModel: InductiveClassifier[MLlibSVM, LabeledPoint], rPdbCode: String, jdbcHostname: String) {
//Getting filename from Path and trimming the extension
val rName = FilenameUtils.removeExtension(Paths.get(receptorPath).getFileName.toString())
println("JOB_INFO: The value of rName is " + rName)
Class.forName("org.mariadb.jdbc.Driver")
val jdbcUrl = s"jdbc:mysql://" + jdbcHostname + ":3306/db_profile?user=root&password=blahblah"
//Preparation object for writing
val baos = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(baos)
oos.writeObject(rModel)
val rModelAsBytes = baos.toByteArray()
val bais = new ByteArrayInputStream(rModelAsBytes)
val connection = DriverManager.getConnection(jdbcUrl)
if (!(connection.isClosed())) {
//Writing to Database
val sqlInsert: PreparedStatement = connection.prepareStatement("INSERT INTO MODELS(r_name, r_pdbCode, r_model) VALUES (?, ?, ?)")
println("JOB_INFO: Start Serializing")
// set input parameters
sqlInsert.setString(1, rName)
sqlInsert.setString(2, rPdbCode)
sqlInsert.setBinaryStream(3, bais, rModelAsBytes.length)
sqlInsert.executeUpdate()
sqlInsert.close()
println("JOB_INFO: Done Serializing")
} else {
println("MariaDb Connection is Close")
System.exit(1)
}
}
Загрузка модели в App2
def loadModel() = {
//Connection Initialization
Class.forName("org.mariadb.jdbc.Driver")
val jdbcUrl = s"jdbc:mysql://localhost:3306/db_profile?user=root&password=whatever"
val connection = DriverManager.getConnection(jdbcUrl)
//Reading Pre-trained model from Database
var model: InductiveClassifier[MLlibSVM, LabeledPoint] = null
if (!(connection.isClosed())) {
val sqlRead = connection.prepareStatement("SELECT r_model FROM MODELS")
val rs = sqlRead.executeQuery()
rs.next()
val modelStream = rs.getObject("r_model").asInstanceOf[Array[Byte]]
val modelBaip = new ByteArrayInputStream(modelStream)
val modelOis = new ObjectInputStream(modelBaip)
model = modelOis.readObject().asInstanceOf[InductiveClassifier[MLlibSVM, LabeledPoint]]
rs.close
sqlRead.close
connection.close()
} else {
println("MariaDb Connection is Close")
System.exit(1)
}
model
}
Ошибкавозникает из линии загрузки модели.Это не изменилось, когда все работало, и я считаю, что zipWithIndex
является причиной, но я не знаю, как ее решить.
modelOis.readObject().asInstanceOf[InductiveClassifier[MLlibSVM, LabeledPoint]]
Новый код в App1, который заставляет Kyro спрашиватьЕще два класса для регистрации
private def getLabeledTopAndBottom(poses: RDD[String], dsSize: Int, topPer: Float, bottomPer: Float): RDD[String] = {
//what is top % of dsSize
val topN = Math.round((topPer / 100) * dsSize)
//logInfo("JOB_INFO: topN is " + topN)
//what is bottom % of dsSize
val bottomN = Math.round((bottomPer / 100) * dsSize)
//logInfo("JOB_INFO: bottomN is " + bottomN)
//Get scores with mols
val molAndScore = poses.map {
case (mol) => (mol, PosePipeline.parseScore(mol))
}
//Sort whole DsInit by Score
val top = molAndScore.sortBy { case (mol, score) => score }
.zipWithIndex()
.filter { case ((mol, score), index) => index < topN }
.map { case ((mol, score), index) => mol }
val bottom = molAndScore.sortBy { case (mol, score) => -score }
.zipWithIndex()
.filter { x => x._2 < bottomN }
.map { case ((mol, score), index) => mol }
//Labeling the top molecules with 1.0
val labledTop = top.map { topMols =>
labelPose(topMols, 1.0)
}
//Labeling the bottom molecules with 0.0
val labledBottom = bottom.map { bottomMols =>
labelPose(bottomMols, 0.0)
}
val topAndBottom = labledTop.union(labledBottom)
topAndBottom
}