Spark не может сопоставить записи (структуры) с классами наблюдений в качестве входных данных для пользовательских функций. На самом деле ваша функция toScoreType
не будет преобразована в классы дел (проверьте схему данных!), Внутренне она снова просто структура (т. Е. Row
).
Вы должны переписать свой код, чтобы использовать один UDF:
val sortScoreList: UserDefinedFunction = udf((score_list: Seq[Row]) => {
score_list.map{case Row(id:String,num:Int) => Score(id,num)}.sorted
})
val temp = input_tbl
.groupBy('id1)
.agg((collect_set(struct('id2,'num))).as("score_list"))
temp.select('id1, sortScoreList('score_list).as("result")).show()
Но это не даст желаемого результата:
+---+--------------------+
|id1| result|
+---+--------------------+
| 1|[[2, 1], [3, -3],...|
+---+--------------------+
Если вы хотите одну запись, ваш UDF должен вернуть 1 класс падежа, такой как:
val sortScoreList: UserDefinedFunction = udf((score_list: Seq[Row]) => {
score_list.map{case Row(id:String,num:Int) => Score(id,num)}.sorted.head
})
А затем преобразовать вашу структуру a столбцы:
temp.select('id1, sortScoreList('score_list).as("result"))
.select($"id1",$"result.*")
.show()
РЕДАКТИРОВАТЬ:
Чтобы получить желаемый результат, я бы сделал это так:
case class Score(id: String, num: Int)
val sortScoreList: UserDefinedFunction = udf((score_list: Seq[Row]) => {
score_list.map{case Row(id:String,num:Int) => Score(id,num)}.minBy(_.num)
})
temp.select('id1, sortScoreList('score_list).as("result"))
.select($"id1",$"result.*")
.show()
+---+---+---+
|id1| id|num|
+---+---+---+
| 1| 3| -3|
+---+---+---+