Создание и использование UDF Spark-Hive для Date - PullRequest
0 голосов
/ 07 мая 2020

Примечание: этот вопрос связан с этим вопросом: Создание функции UDF с Непримитивным типом данных и использование в Spark- sql Запрос: Scala

Я создал метод in scala:

    package test.udf.demo
    object UDF_Class {
    def transformDate( dateColumn: String, df: DataFrame) : DataFrame = {
    val sparksession = SparkSession.builder().appName("App").getOrCreate()
    val d=df.withColumn("calculatedCol", month(to_date(from_unixtime(unix_timestamp(col(dateColumn),  "dd-MM-yyyy")))))
    df.withColumn("date1",  when(col("calculatedCol") === "01",  concat(concat(year(to_date(from_unixtime(unix_timestamp(col("calculatedCol"), "dd-MM- yyyy"))))-1,  lit('-')),substring(year(to_date(from_unixtime(unix_timestamp(col("calculatedCol")), "dd-MM- yyyy"))),3,4))
    .when(col("calculatedCol") ===  "02",concat(concat(year(to_date(from_unixtime(unix_timestamp(col("calculatedCol"), "dd-MM- yyyy"))))-1,  lit('-')),substring(year(to_date(from_unixtime(unix_timestamp(col("calculatedCol")), "dd-MM- yyyy"))),3,4)))
    .when(col("calculatedCol") ===  "03",concat(concat(year(to_date(from_unixtime(unix_timestamp(col("calculatedCol"), "dd-MM- yyyy"))))-1,  lit('-')),substring(year(to_date(from_unixtime(unix_timestamp(col("calculatedCol")), "dd-MM-yyyy"))),3,4)))
    .otherwise(concat(concat(year(to_date(from_unixtime(unix_timestamp(col("calculatedCol"), "dd-MM-  yyyy")))), lit('-')), substring(year(to_date(from_unixtime(unix_timestamp(col("calculatedCol"), "dd-MM-yyyy")))) + 1, 3, 4))))) 
    val d1=sparksession.udf.register("transform",transformDate _)
    d
    }
    }

Я хочу использовать этот метод transformDate в моем запросе spark sql, который является отдельным scala кодом в том же пакете.

    package test.udf.demo
    import test.udf.demo.transformDate
    //sparksession
    sparksession.sql("select id,name,salary,transform(dob) from dbname.tablename")

но я получить ошибку

не временная или постоянно зарегистрированная функция в базе данных по умолчанию

Кто-нибудь может помочь мне?

Ответы [ 2 ]

0 голосов
/ 07 мая 2020

Прежде всего Spark SQL UDF - это функция на основе строк. Не метод на основе Dataframe. Aggregate UDF также занимает серию строк. Итак, определение UDF неверно. Если я правильно понял ваше требование, вы хотите создать настраиваемое выражение операторов Case. Этого легко добиться с помощью expr ()

import spark.implicits._
val exprStr = "case when calculatedCol='01' then <here goes your code statements> as FP"
val modifiedDf = sql("""select id,name,salary,$exprStr  from dbname.tablename""")

Он будет работать

0 голосов
/ 07 мая 2020

Определенные пользователем udf-файлы AFAIK Spark не могут принимать или возвращать DataFrame. Это мешает вашей регистрации udf

...