Я искал в Google или Stackoverflow в течение недели и до сих пор не могу найти хороший ответ на этот вопрос.
У меня есть набор данных по химическим соединениям, и мне нужно использовать сторонний Jar, чтобы прочитать эти соединения вSDF (JSON-подобный формат данных).И тогда я должен рассчитать сходство между различными соединениями.Чтение и расчет требуют очень сложных химических деталей, поэтому я не могу воспроизвести эту функцию самостоятельно.То есть мне нужно запустить функцию в функции отображения на Spark, используя этот сторонний Jar.Файл Jar называется JCompoundMapper.Он считывает атомную связь итеративно, используя алгоритм DFS, и это кажется очень сложным.В любом случае, эта тема не о чтении химического соединения.Но о том, как нанести на карту стороннюю банку на Spark.Когда я попытался сделать это, я столкнулся с проблемой не сериализуемой задачи:
import de.zbit.jcmapper.distance.DistanceTanimoto
import de.zbit.jcmapper.distance.IDistanceMeasure
import de.zbit.jcmapper.fingerprinters.EncodingFingerprint
import de.zbit.jcmapper.fingerprinters.features.FeatureMap
import de.zbit.jcmapper.fingerprinters.features.IFeature
import de.zbit.jcmapper.fingerprinters.topological.Encoding2DAllShortestPath
import de.zbit.jcmapper.fingerprinters.topological.Encoding2DCATS
import de.zbit.jcmapper.fingerprinters.topological.Encoding2DECFP
import de.zbit.jcmapper.io.reader.RandomAccessMDLReader
import de.zbit.jcmapper.io.writer.ExporterFullFingerprintCSV
import de.zbit.jcmapper.io.writer.ExporterFullFingerprintTABUnfolded
import de.zbit.jcmapper.io.writer.ExporterLinear
import de.zbit.jcmapper.io.writer.ExporterSDFProperty
import java.io.FileWriter
import java.util.List
import java.io.File
val similarity: IDistanceMeasure = new DistanceTanimoto()
val fingerprinter: Encoding2DAllShortestPath = new Encoding2DAllShortestPath()
val rawFeatures2: List[IFeature] = fingerprinter.getFingerprint(reader.getMol(0))
val rawFeatures: List[IFeature] = fingerprinter.getFingerprint(reader.getMol(1))
def getSimilarity( id1:Int, id2:Int ) : Double = {
val featureMaps: List[FeatureMap] = new ArrayList[FeatureMap]()
featureMaps.add(new FeatureMap(rawFeatures))
featureMaps.add(new FeatureMap(rawFeatures2))
val temp: Double = similarity.getSimilarity(featureMaps.get(0), featureMaps.get(1))
return temp
val func = combinations.map(x => {
getSimilarity(0, 1)
}).take(5)
Name: org.apache.spark.SparkException
Message: Task not serializable
StackTrace: at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:345)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2292)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:371)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.map(RDD.scala:370)
... 48 elided
Caused by: java.io.NotSerializableException: de.zbit.jcmapper.io.reader.RandomAccessMDLReader`
Я читаю другие потоки и понимаю, что мне нужно поместить переменные и функции в объект, чтобы сделать его сериализуемым.Однако, если я сделаю это, я столкнулся с ошибкой исключения нулевого указателя:
object Holder{
val reader:RandomAccessMDLReader = new RandomAccessMDLReader(new File("datasets/internal.sdf"))
val similarity: IDistanceMeasure = new DistanceTanimoto()
val fingerprinter: Encoding2DAllShortestPath = new Encoding2DAllShortestPath()
val rawFeatures2: List[IFeature] = fingerprinter.getFingerprint(reader.getMol(0))
val rawFeatures: List[IFeature] = fingerprinter.getFingerprint(reader.getMol(1))
def getSimilarity( id1:Int, id2:Int ) : Double = {
val featureMaps: List[FeatureMap] = new ArrayList[FeatureMap]()
featureMaps.add(new FeatureMap(rawFeatures))
featureMaps.add(new FeatureMap(rawFeatures2))
val temp: Double = similarity.getSimilarity(featureMaps.get(0), featureMaps.get(1))
return temp
}
val func = combinations.map(x => {
Holder.getSimilarity(0, 1)
}).take(5)
Name: org.apache.spark.SparkException
Message: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, ip-10-245-2-223.ec2.internal, executor 1): java.lang.NullPointerException
at de.zbit.jcmapper.io.reader.RandomAccessMDLReader.setRanges(Unknown Source)
at de.zbit.jcmapper.io.reader.RandomAccessMDLReader.<init>(Unknown Source)
at $line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Holder$.<init>(<console>:78)
at $line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.Holder$lzycompute(<console>:77)
at $line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.Holder(<console>:77)
at $line57.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:85)
at $line57.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:84)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)`
Для части чтения я могу использовать огромный связанный HashMap и хранить там все соединения.Тем не менее, я должен использовать функцию getSimilarity()
, чтобы вычислить сходство, используя стороннюю банку.Поэтому, даже если я использую только функцию getSimilarity()
, если я помещаю ее в объект, у меня есть исключение нулевого указателя.Если я помещу это вне объекта, у меня есть задача, не сериализуемая проблема.Поэтому у меня есть пара вопросов, на которые я не смог найти хорошего ответа:
- Поддерживает ли Spark сопоставление стороннего Jar-файла каждому исполнителю?Скажем, в файле считывателя Spark распределяет класс считывателя по каждому исполнителю и читает файл по отдельности или считывает файл целиком, а затем распределяет файл на более мелкие части по каждому исполнителю?
- Почему он показываетпроблема исключения нулевого указателя?Кажется, объект действительно решил проблему сериализации, но не исключение нулевого указателя.
- Я новый инженер данных, но еще не эксперт в Spark.Но я хочу узнать, что является лучшим методом, когда нам нужно сопоставить стороннюю банку с искрой и запустить функцию распределенным способом.
Большое спасибо за все ваши ответы!Я очень ценю вашу помощь!
Лучший, Майкл