У меня есть искровое приложение. Мой пример использования - позволить пользователям определять произвольную функцию, которая выглядит как Record => Record
как «правило», которое будет применяться к каждой записи набора данных RDD / *.
Следующий код:
//Sample rows with Id, Name, DOB and address
val row1 = "19283,Alan,1989-01-20,445 Mount Eden Road Mount Eden Auckland"
val row2 = "15689,Ben,1989-01-20,445 Mount Eden Road Mount Eden Auckland"
val record1 = new Record(
new RecordMetadata(),
row1,
true
)
val record2 = new Record(
new RecordMetadata(),
row2,
true
)
val inputRecsList = List(record1, record2)
val inputRecs = spark.sparkContext.parallelize(inputRecsList)
val rule = ScalaExpression(
//Sample rule. A lambda (Record => Record)
"""
| import model.Record
| { record: Record => record }
""".stripMargin
val outputRecs = inputRecs.map(rule.transformation)
Ниже приводится определение классов 'Record' и 'RecordMetadata' и 'ScalaExpression':
case class Record(
val metadata: RecordMetadata,
val row: String,
val isValidRecord: Boolean = true
) extends Serializable
case class RecordMetadata() extends Serializable
case class ScalaExpression(function: Function1[Record, Record]) extends Rule {
def transformation = function
}
object ScalaExpression{
/**
* @param Scala expression as a string
* @return Evaluated result of type Function (Record => Record)
*/
def apply(string: String) = {
val toolbox = currentMirror.mkToolBox()
val tree = toolbox.parse(string)
val fn = toolbox.eval(tree).asInstanceOf[(Record => Record)] //Or Function1(Record, Record)
new ScalaExpression(fn)
}
}
Приведенный выше код выдает загадочное исключение:
java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2287)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1417)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2293)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Код, однако, работает хорошо, если правило определено непосредственно в коде:
val rule = ScalaExpression( {record: Record => record} )
Код также хорошо работает, если карта (с правилом оценки времени выполнения) применяется к списку вместо RDD / Dataset.
Застряли на некоторое время, пытаясь заставить его работать. Любая помощь будет оценена.
РЕДАКТИРОВАТЬ : «Возможный дубликат», помеченный для этого вопроса, решает совершенно другую проблему. Мой вариант использования пытается извлечь правило (действительный оператор scala, который преобразует одну запись в другую) во время выполнения от пользователя, и это вызывает проблемы с сериализацией при попытке применить правило к каждой записи набора данных.
С наилучшими пожеланиями.