Как сделать запрос с Spark SQL UDT - PullRequest
0 голосов
/ 13 ноября 2018

Я пишу UDT для LocaleDateTime, который спарк SQL не поддерживает изначально

class LocalDateTimeUDT extends UserDefinedType[LocalDateTime] {

  override def sqlType: DataType = TimestampType

  override def serialize(obj: LocalDateTime): Any = {
    obj.atZone(ZoneId.systemDefault()).toInstant.toEpochMilli
  }

  override def deserialize(datum: Any): LocalDateTime = {
    println("datum is ..." + datum)
    LocalDateTime.now()
  }

  override def userClass: Class[LocalDateTime] = classOf[LocalDateTime]
}

Затем я пишу тестовый пример для проверки:

 test("SparkSQLTest") {
    val spark = SparkSession.builder().master("local").appName("SparkTest").getOrCreate()
    import spark.implicits._
    UDTRegistration.register(classOf[LocalDateTime].getName, classOf[LocalDateTimeUDT].getName)
    val seq = Seq(LocalDateTime.now(), LocalDateTime.now())
    val rdd = spark.sparkContext.parallelize(seq).map(d => Row.fromSeq(Seq(d)))
    val schema = new StructType().add("udt", new LocalDateTimeUDT())
    val df = spark.createDataFrame(rdd, schema)
    df.printSchema()
    df.show(truncate = false)

    df.createOrReplaceTempView("t")
    // cannot resolve '(t.`udt` > current_timestamp())' due to data type mismatch:
    // differing types in '(t.`udt` > current_timestamp())' (timestamp and timestamp).; line 1 pos 22;
    spark.sql("select * from t where udt > current_timestamp()").show(truncate = false)

}

Но выдает следующее исключение:

cannot resolve '(t.`udt` > current_timestamp())' due to data type mismatch: differing types in '(t.`udt` > current_timestamp())' (timestamp and timestamp).; line 1 pos 22;

Я хочу знать, чтобы мой sql-запрос (с фильтром) работал. Спасибо.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...