Невозможно зарегистрировать UDF в Spark sql - PullRequest
0 голосов
/ 06 мая 2020

Я пытаюсь зарегистрировать свою функцию UDF и хочу использовать ее в моем запросе sql, но не могу зарегистрировать свой udf. Я получаю ошибку ниже.

    val squared = (s: Column) => { 
    concat(substring(s,4,2),year(to_date(from_unixtime(unix_timestamp(s,"dd-MM-yyyy")))))
    }
    squared: org.apache.spark.sql.Column => org.apache.spark.sql.Column = <function1>

    scala> sqlContext.udf.register("dc",squared)
    java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.Column is not   supported
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:733)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:671)
    at org.apache.spark.sql.UDFRegistration.register(UDFRegistration.scala:143)
    ... 48 elided

Я попытался изменить столбец на строку но получается ошибка ниже.

    val squared = (s: String) => { 
    | concat(substring(s,4,2),year(to_date(from_unixtime(unix_timestamp(s,"dd-MM-yyyy")))))
    | }
    <console>:28: error: type mismatch;
    found   : String
    required: org.apache.spark.sql.Column
   concat(substring(s,4,2),year(to_date(from_unixtime(unix_timestamp(s,"dd-MM-yyyy")))))


   can someone please guide me how should i implement this.

1 Ответ

1 голос
/ 06 мая 2020

Все функции Spark из этого пакета org. apache .spark. sql .functions._ не смогут получить доступ внутри UDF.

Вместо встроенных функций Spark .. вы можете использовать простой scala код для получения того же результата.

val df = spark.sql("select * from your_table")

def date_concat(date:Column): Column = { 
    concat(substring(date,4,2),year(to_date(from_unixtime(unix_timestamp(date,"dd-MM-yyyy")))))
}

df.withColumn("date_column_name",date_concat($"date_column_name")) // with function.
df.withColumn("date_column_name",concat(substring($"date_column_name",4,2),year(to_date(from_unixtime(unix_timestamp($"date_column_name","dd-MM-yyyy")))))) // without function, direct method.
df.createOrReplaceTempView("table_name")
spark.sql("[...]") // Write your furthur logic in sql if you want.

...