Создание функции UDF с Непримитивным типом данных и использование в Spark- sql Query: Scala - PullRequest
0 голосов
/ 07 мая 2020

Я создаю одну функцию в scala, которую я хочу использовать в моем запросе spark- sql. Мой запрос отлично работает в улье или если я задаю тот же запрос в искре sql, но тот же запрос Я использую в нескольких местах, поэтому я хочу создать его как многоразовую функцию / метод, поэтому всякий раз, когда это требуется, я могу просто вызвать его. Я создал функцию ниже в своем классе scala.

def date_part(date_column:Column) = {
    val m1: Column = month(to_date(from_unixtime(unix_timestamp(date_column, "dd-MM-yyyy")))) //give  value as 01,02...etc

    m1 match {
        case 01 => concat(concat(year(to_date(from_unixtime(unix_timestamp(date_column, "dd-MM- yyyy"))))-1,'-'),substr(year(to_date(from_unixtime(unix_timestamp(date_column, "dd-MM-yyyy")))),3,4))
        //etc..
        case _ => "some other logic"
    }
}

, но показывает множественную ошибку.

  1. Для 01:

◾Десятичные целочисленные литералы могут не иметь нуля в начале. (Восьмеричный синтаксис устарел.)

◾type mismatch; найдено: Int (0) требуется: org. apache .spark. sql .Column.

Для '-':

несоответствие типов; найдено: Char ('-') Требуется: org. apache .spark. sql .Column.

Для 'substr':

не найдено: значение substr.

также, что если я создаю любую простую функцию с типом как столбец Я не могу зарегистрировать его, так как я получаю ошибку, невозможную в столбцовом формате. И для всех примитивных типов данных (String, Long, Int) он работает нормально. Но в моем случае тип столбца, поэтому я не могу для этого. Может кто-нибудь, пожалуйста, посоветуйте мне, как мне это сделать. На данный момент я обнаружил при переполнении стека, что мне нужно использовать эту функцию с df, а затем нужно преобразовать этот df как временную таблицу. может кто-нибудь, пожалуйста, посоветуйте мне любой другой альтернативный способ, поэтому без особых изменений в моем существующем коде я могу использовать эту функцию.

Ответы [ 2 ]

0 голосов
/ 07 мая 2020

Попробуйте код ниже.

scala> import org.joda.time.format._
import org.joda.time.format._

scala> spark.udf.register("datePart",(date:String) => DateTimeFormat.forPattern("MM-dd-yyyy").parseDateTime(date).toString(DateTimeFormat.forPattern("MMyyyy")))
res102: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))

scala> spark.sql("""select datePart("03-01-2019") as datepart""").show
+--------+
|datepart|
+--------+
|  032019|
+--------+
0 голосов
/ 07 мая 2020

Во-первых, Spark нужно будет прочитать файл, в котором хранятся данные, я предполагаю, что этот файл является CSV, но вы можете использовать метод json insted из csv.

Затем вы можете добавить новые столбцы с вычисленным значение, как показано ниже:

     import org.apache.spark.sql.functions._

      val df = spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .csv("/path/mydata.csv")

      def transformDate( dateColumn: String, df: DataFrame) : DataFrame = {
         df.withColumn("calculatedCol", month(to_date(from_unixtime(unix_timestamp(col(dateColumn), "dd-MM-yyyy")))))

         df.withColumn("newColumnWithDate",  when(col("calculatedCol") === "01", concat(concat(year(to_date(from_unixtime(unix_timestamp(col("calculatedCol"), "dd-MM- yyyy"))))-1, lit('-')),substring(year(to_date(from_unixtime(unix_timestamp(col("calculatedCol")), "dd-MM-yyyy"))),4,2))
          .when(col("calculatedCol") === "02","some other logic")
          .otherwise("nothing match")))
      }

     // calling your function for the Dataframe you want transform date column:
     transformDate("date_column", df)

Обратите внимание, что некоторым функциям требуется столбец в качестве аргумента, а не строковое значение, поэтому используйте lit () для указания этих значений.

UDF не требуется (и с точки зрения производительности не рекомендуется), но вы можете использовать его следующим образом:

val upper: String => String = _.toUpperCase
import org.apache.spark.sql.functions.udf
val upperUDF = udf(upper)
df.withColumn("upper", upperUDF('text)).show

Где 'верхняя' функция будет методом, который вы должны включить logi c для преобразования столбца даты.

...