Я пишу UDT для LocaleDateTime, который спарк SQL не поддерживает изначально
class LocalDateTimeUDT extends UserDefinedType[LocalDateTime] {
override def sqlType: DataType = TimestampType
override def serialize(obj: LocalDateTime): Any = {
obj.atZone(ZoneId.systemDefault()).toInstant.toEpochMilli
}
override def deserialize(datum: Any): LocalDateTime = {
println("datum is ..." + datum)
LocalDateTime.now()
}
override def userClass: Class[LocalDateTime] = classOf[LocalDateTime]
}
Затем я пишу тестовый пример для проверки:
test("SparkSQLTest") {
val spark = SparkSession.builder().master("local").appName("SparkTest").getOrCreate()
import spark.implicits._
UDTRegistration.register(classOf[LocalDateTime].getName, classOf[LocalDateTimeUDT].getName)
val seq = Seq(LocalDateTime.now(), LocalDateTime.now())
val rdd = spark.sparkContext.parallelize(seq).map(d => Row.fromSeq(Seq(d)))
val schema = new StructType().add("udt", new LocalDateTimeUDT())
val df = spark.createDataFrame(rdd, schema)
df.printSchema()
df.show(truncate = false)
df.createOrReplaceTempView("t")
// cannot resolve '(t.`udt` > current_timestamp())' due to data type mismatch:
// differing types in '(t.`udt` > current_timestamp())' (timestamp and timestamp).; line 1 pos 22;
spark.sql("select * from t where udt > current_timestamp()").show(truncate = false)
}
Но выдает следующее исключение:
cannot resolve '(t.`udt` > current_timestamp())' due to data type mismatch: differing types in '(t.`udt` > current_timestamp())' (timestamp and timestamp).; line 1 pos 22;
Я хочу знать, чтобы мой sql-запрос (с фильтром) работал. Спасибо.