Spark UDF для пользовательской сортировки массива структур - PullRequest
1 голос
/ 24 января 2020

Я пытаюсь использовать UDF для сортировки массива структур на основе пользовательского порядка, который я определил.

Вот пример типа результата, который я хочу получить:

input_tbl
+-------+-------+------+
| id1   | id2   | num  |
+-------+-------+------+
|   1   |   2   |  1   |
|   1   |   3   | -3   |
|   1   |   4   |  2   |
+-------+-------+------+

output_tbl
+-------+-------+------+
| id1   | id2   | num  |
+-------+-------+------+
|   1   |   3   | -3   |
+-------+-------+------+

Некоторые примеры кода для класса дела и UDF показаны ниже.

case class Score(id: String, num: Int) extends Ordered[Score] {

  def compare(that: Score): Int = {
    abs(this.num-that.num)
  }
}

val toScoreType : UserDefinedFunction = udf((id: String, num: Int) => {
    Score(id, num)
})

val sortScoreList: UserDefinedFunction = udf((score_list: Array[Score]) => {
    score_list.sorted
})

И я вызываю UDF sortScore следующим образом:

val temp = input_tbl
    .select('id1, toScoreType('id2, 'num).as("score"))
    .groupBy('id1)
    .agg((collect_set('score)).as("score_list"))


temp.select('id1, sortScoreList('score_list).as("result"))

Однако, Я получаю сообщение об ошибке "java .lang.ClassCastException: scala .collection.mutable.WrappedArray $ ofRef".

Есть ли у кого-нибудь мысли о том, что может вызывать проблему?

1 Ответ

2 голосов
/ 24 января 2020

Spark не может сопоставить записи (структуры) с классами наблюдений в качестве входных данных для пользовательских функций. На самом деле ваша функция toScoreType не будет преобразована в классы дел (проверьте схему данных!), Внутренне она снова просто структура (т. Е. Row).

Вы должны переписать свой код, чтобы использовать один UDF:

val sortScoreList: UserDefinedFunction = udf((score_list: Seq[Row]) => {
  score_list.map{case Row(id:String,num:Int) => Score(id,num)}.sorted
})


val temp = input_tbl
  .groupBy('id1)
  .agg((collect_set(struct('id2,'num))).as("score_list"))

temp.select('id1, sortScoreList('score_list).as("result")).show()

Но это не даст желаемого результата:

+---+--------------------+
|id1|              result|
+---+--------------------+
|  1|[[2, 1], [3, -3],...|
+---+--------------------+

Если вы хотите одну запись, ваш UDF должен вернуть 1 класс падежа, такой как:

val sortScoreList: UserDefinedFunction = udf((score_list: Seq[Row]) => {
  score_list.map{case Row(id:String,num:Int) => Score(id,num)}.sorted.head
})

А затем преобразовать вашу структуру a столбцы:

temp.select('id1, sortScoreList('score_list).as("result"))
  .select($"id1",$"result.*")
  .show()

РЕДАКТИРОВАТЬ:

Чтобы получить желаемый результат, я бы сделал это так:

case class Score(id: String, num: Int)

val sortScoreList: UserDefinedFunction = udf((score_list: Seq[Row]) => {
      score_list.map{case Row(id:String,num:Int) => Score(id,num)}.minBy(_.num)
 })


temp.select('id1, sortScoreList('score_list).as("result"))
  .select($"id1",$"result.*")
  .show()

+---+---+---+
|id1| id|num|
+---+---+---+
|  1|  3| -3|
+---+---+---+
...