При определении UDT в SparkSQL я создаю UDT, подобный этому
class trajUDT extends UserDefinedType[traj] {
override def sqlType: DataType = StructType(Seq(
StructField("id", DataTypes.StringType),
StructField("loc", ArrayType(StructType(Seq(
StructField("x",DataTypes.DoubleType),
StructField("y",DataTypes.DoubleType)
))))
))
...
}
, где traj - это класс
class traj(val id:UTF8String,val loc:Array[Tuple2[Double,Double]] )
, и я хочу написать сериализованную функцию, подобную этой
override def serialize(p: traj): GenericInternalRow = {
new GenericInternalRow(Array[Any](p.id,p.loc.map(x=>Array(x._1,x._2)))
}
Но это не удалось, поскольку он сказал мне, что это не может быть преобразовано в ArrayData.
Я также пишу функцию десериализации следующим образом:
override def deserialize(datum: Any): traj = {
val arr=datum.asInstanceOf[InternalRow]
val id = arr.getUTF8String(0)
val xytype=StructType(Seq(
StructField("x",DataTypes.DoubleType),
StructField("y",DataTypes.DoubleType)
))
val xy = arr.getArray(1)
val xye =xy.toArray[Tuple2[Double,Double]](xytype)
new traj(id,xye)
}
И ядумаю, это также может не сработать ...
Так может ли кто-нибудь научить меня, как сделать эти два обращения?