У вас в основном проблема с синтаксисом.
Помните, что когда вы делаете def func1(str1:String, str2:String) : String = ...
, func1 ссылается на функциональный объект Scala, а не на выражение Spark.
С другой стороны, .withColumn
ожидает выражение Spark в качестве второго аргумента.
Итак, ваш вызов .withColumn("c1", func1("a1","b1"))
отправляет Spark объект Scala function
, в то время как Spark API ожидает "выражение Spark" (например, столбец или операция над столбцами, например пользовательская функция ( UDF)).
К счастью, легко преобразовать функцию Scala в UDF Spark, вообще говоря, обернув ее вызовом udf
метода spark.
Итак, рабочий пример может выглядеть так:
// A sample dataframe
val dataframe = Seq(("a", "b"), ("c", "d")).toDF("columnA", "columnB")
// An example scala function that actually does something (string concat)
def concat(first: String, second: String) = first+second
// A conversion from scala function to spark UDF :
val concatUDF = udf((first: String, second: String) => concat(first, second))
// An sample execution of the UDF
// note the $ sign, which is short for indicating a column name
dataframe.withColumn("concat", concatUDF($"columnA", $"columnB")).show
+-------+-------+------+
|columnA|columnB|concat|
+-------+-------+------+
| a| b| ab|
| c| d| cd|
+-------+-------+------+
С этого момента должно быть легко адаптироваться к вашей точной функции и ее аргументам.