Spark Task не сериализуем / не определен класс для стороннего Jar - PullRequest
0 голосов
/ 25 ноября 2018

Я искал в Google или Stackoverflow в течение недели и до сих пор не могу найти хороший ответ на этот вопрос.

У меня есть набор данных по химическим соединениям, и мне нужно использовать сторонний Jar, чтобы прочитать эти соединения вSDF (JSON-подобный формат данных).И тогда я должен рассчитать сходство между различными соединениями.Чтение и расчет требуют очень сложных химических деталей, поэтому я не могу воспроизвести эту функцию самостоятельно.То есть мне нужно запустить функцию в функции отображения на Spark, используя этот сторонний Jar.Файл Jar называется JCompoundMapper.Он считывает атомную связь итеративно, используя алгоритм DFS, и это кажется очень сложным.В любом случае, эта тема не о чтении химического соединения.Но о том, как нанести на карту стороннюю банку на Spark.Когда я попытался сделать это, я столкнулся с проблемой не сериализуемой задачи:

import de.zbit.jcmapper.distance.DistanceTanimoto
import de.zbit.jcmapper.distance.IDistanceMeasure
import de.zbit.jcmapper.fingerprinters.EncodingFingerprint
import de.zbit.jcmapper.fingerprinters.features.FeatureMap
import de.zbit.jcmapper.fingerprinters.features.IFeature
import de.zbit.jcmapper.fingerprinters.topological.Encoding2DAllShortestPath
import de.zbit.jcmapper.fingerprinters.topological.Encoding2DCATS
import de.zbit.jcmapper.fingerprinters.topological.Encoding2DECFP
import de.zbit.jcmapper.io.reader.RandomAccessMDLReader
import de.zbit.jcmapper.io.writer.ExporterFullFingerprintCSV
import de.zbit.jcmapper.io.writer.ExporterFullFingerprintTABUnfolded
import de.zbit.jcmapper.io.writer.ExporterLinear
import de.zbit.jcmapper.io.writer.ExporterSDFProperty
import java.io.FileWriter
import java.util.List
import java.io.File

val similarity: IDistanceMeasure = new DistanceTanimoto()
val fingerprinter: Encoding2DAllShortestPath = new Encoding2DAllShortestPath()
val rawFeatures2: List[IFeature] = fingerprinter.getFingerprint(reader.getMol(0))
val rawFeatures: List[IFeature] = fingerprinter.getFingerprint(reader.getMol(1))
def getSimilarity( id1:Int, id2:Int ) : Double = {
    val featureMaps: List[FeatureMap] = new ArrayList[FeatureMap]()
    featureMaps.add(new FeatureMap(rawFeatures))
    featureMaps.add(new FeatureMap(rawFeatures2))
    val temp: Double = similarity.getSimilarity(featureMaps.get(0), featureMaps.get(1))
    return temp


val func = combinations.map(x => {
    getSimilarity(0, 1)
    }).take(5)

Name: org.apache.spark.SparkException
Message: Task not serializable
StackTrace:   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:345)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2292)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:371)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
  at org.apache.spark.rdd.RDD.map(RDD.scala:370)
  ... 48 elided
Caused by: java.io.NotSerializableException: de.zbit.jcmapper.io.reader.RandomAccessMDLReader`    

Я читаю другие потоки и понимаю, что мне нужно поместить переменные и функции в объект, чтобы сделать его сериализуемым.Однако, если я сделаю это, я столкнулся с ошибкой исключения нулевого указателя:

object Holder{
val reader:RandomAccessMDLReader = new RandomAccessMDLReader(new File("datasets/internal.sdf"))
val similarity: IDistanceMeasure = new DistanceTanimoto()
val fingerprinter: Encoding2DAllShortestPath = new Encoding2DAllShortestPath()
val rawFeatures2: List[IFeature] = fingerprinter.getFingerprint(reader.getMol(0))
val rawFeatures: List[IFeature] = fingerprinter.getFingerprint(reader.getMol(1))
def getSimilarity( id1:Int, id2:Int ) : Double = {
    val featureMaps: List[FeatureMap] = new ArrayList[FeatureMap]()
    featureMaps.add(new FeatureMap(rawFeatures))
    featureMaps.add(new FeatureMap(rawFeatures2))
    val temp: Double = similarity.getSimilarity(featureMaps.get(0), featureMaps.get(1))
    return temp
}


val func = combinations.map(x => {
Holder.getSimilarity(0, 1)
}).take(5)


Name: org.apache.spark.SparkException
Message: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, ip-10-245-2-223.ec2.internal, executor 1): java.lang.NullPointerException
    at de.zbit.jcmapper.io.reader.RandomAccessMDLReader.setRanges(Unknown Source)
    at de.zbit.jcmapper.io.reader.RandomAccessMDLReader.<init>(Unknown Source)
    at $line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Holder$.<init>(<console>:78)
    at $line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.Holder$lzycompute(<console>:77)
    at $line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.Holder(<console>:77)
    at $line57.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:85)
    at $line57.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:84)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at scala.collection.AbstractIterator.to(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)`

Для части чтения я могу использовать огромный связанный HashMap и хранить там все соединения.Тем не менее, я должен использовать функцию getSimilarity(), чтобы вычислить сходство, используя стороннюю банку.Поэтому, даже если я использую только функцию getSimilarity(), если я помещаю ее в объект, у меня есть исключение нулевого указателя.Если я помещу это вне объекта, у меня есть задача, не сериализуемая проблема.Поэтому у меня есть пара вопросов, на которые я не смог найти хорошего ответа:

  1. Поддерживает ли Spark сопоставление стороннего Jar-файла каждому исполнителю?Скажем, в файле считывателя Spark распределяет класс считывателя по каждому исполнителю и читает файл по отдельности или считывает файл целиком, а затем распределяет файл на более мелкие части по каждому исполнителю?
  2. Почему он показываетпроблема исключения нулевого указателя?Кажется, объект действительно решил проблему сериализации, но не исключение нулевого указателя.
  3. Я новый инженер данных, но еще не эксперт в Spark.Но я хочу узнать, что является лучшим методом, когда нам нужно сопоставить стороннюю банку с искрой и запустить функцию распределенным способом.

Большое спасибо за все ваши ответы!Я очень ценю вашу помощь!

Лучший, Майкл

1 Ответ

0 голосов
/ 25 ноября 2018

Я думаю, что проблема в этой строке:

val reader:RandomAccessMDLReader = new RandomAccessMDLReader(new File("datasets/internal.sdf"))

Поместив этот код в object каждой JVM, на которой выполняется ваше задание Spark, необходимо его инициализировать.По сути, этот код пытается прочитать файл datasets/internal.sdf из локальной файловой системы, где бы он ни находился в вашем кластере Spark.Этот файл доступен везде?

Если вы не готовы поместить файл повсюду, вы можете попробовать поместить его в путь к классам и прочитать как ресурс.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...