Есть ли способ создать UDF, который принимает массив из двух строк и передает эти строки как два аргумента функции? - PullRequest
0 голосов
/ 14 января 2019

Я новичок в Скале, так что извините за мое плохое почерк. У меня есть функция func1, которая принимает две строки и возвращает строку. У меня также есть датафрейм df1, который имеет 2 столбца a1 и b1. Я пытаюсь создать новый фрейм данных df2 с обоими столбцами из df1 (a1 и b1) и новым столбцом c1, который является выходом функции func1. Я знаю, что мне нужно использовать UDF. Я не знаю, как создать UDF, который может принимать 2 столбца и передавать эти два параметра в func1 и возвращать строку вывода (столбец c1).

Вот некоторые из вещей, которые я пробовал -

def func1(str1:String, str2:String) : String = {   
        //code
        return str3;
}

val df1= spark.sql("select * from emp")
  .select("a1", "b1").cache()


val df2 = spark.sql("select * from df1")
  .withColumn("c1", func1("a1","b1"))
  .select("a1", "b1").cache()

Но я не получаю результаты. Пожалуйста, порекомендуйте. Заранее спасибо.

Ответы [ 2 ]

0 голосов
/ 14 января 2019

Вот как, вы бы сделали это

scala> val df = Seq(("John","26"),("Bob","31")).toDF("a1","b1")
df: org.apache.spark.sql.DataFrame = [a1: string, b1: string]

scala> df.createOrReplaceTempView("emp")

scala> :paste
// Entering paste mode (ctrl-D to finish)

def func1(str1:String, str2:String) : String = {
        val str3 = s" ${str1} is ${str2} years old"
        return str3;
}

// Exiting paste mode, now interpreting.

func1: (str1: String, str2: String)String

scala> val my_udf_func1 = udf( func1(_:String,_:String):String )
my_udf_func1: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,StringType,Some(List(StringType, StringType)))

scala> spark.sql("select * from emp").withColumn("c1", my_udf_func1($"a1",$"b1")).show(false)
2019-01-14 21:08:30 WARN  ObjectStore:568 - Failed to get database global_temp, returning NoSuchObjectException
+----+---+---------------------+
|a1  |b1 |c1                   |
+----+---+---------------------+
|John|26 | John is 26 years old|
|Bob |31 | Bob is 31 years old |
+----+---+---------------------+


scala>

Два места, где вам нужно исправить это ..

После определения обычной функции необходимо зарегистрировать ее в udf () как

val my_udf_func1 = udf( func1(_:String,_:String):String )

при вызове udf вы должны использовать синтаксис $"a1", а не просто "a1"

0 голосов
/ 14 января 2019

У вас в основном проблема с синтаксисом.

Помните, что когда вы делаете def func1(str1:String, str2:String) : String = ..., func1 ссылается на функциональный объект Scala, а не на выражение Spark.

С другой стороны, .withColumn ожидает выражение Spark в качестве второго аргумента.

Итак, ваш вызов .withColumn("c1", func1("a1","b1")) отправляет Spark объект Scala function, в то время как Spark API ожидает "выражение Spark" (например, столбец или операция над столбцами, например пользовательская функция ( UDF)).

К счастью, легко преобразовать функцию Scala в UDF Spark, вообще говоря, обернув ее вызовом udf метода spark.

Итак, рабочий пример может выглядеть так:

// A sample dataframe 
val dataframe = Seq(("a", "b"), ("c", "d")).toDF("columnA", "columnB")
// An example scala function that actually does something (string concat)
def concat(first: String, second: String) = first+second
// A conversion from scala function to spark UDF :
val concatUDF = udf((first: String, second: String) => concat(first, second))
// An sample execution of the UDF
// note the $ sign, which is short for indicating a column name
dataframe.withColumn("concat", concatUDF($"columnA", $"columnB")).show
+-------+-------+------+
|columnA|columnB|concat|
+-------+-------+------+
|      a|      b|    ab|
|      c|      d|    cd|
+-------+-------+------+

С этого момента должно быть легко адаптироваться к вашей точной функции и ее аргументам.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...