Если я правильно понимаю, вы хотите иметь ранг каждого столбца, внутри каждой строки.
Давайте сначала определим данные и столбцы для «ранга».
val df = Seq((11, 21, 35),(22, 12, 66),(44, 22 , 12))
.toDF("c_0", "c_1", "c_2")
val cols = df.columns
Затем мы определяем UDF, который находит индекс элемента в массиве.
val pos = udf((a : Seq[Int], elt : Int) => a.indexOf(elt)+1)
Наконец, мы создаем отсортированный массив (в порядке убывания) и используем UDF для определения ранга каждого столбца.
val ranks = cols.map(c => pos(col("array"), col(c)).as(c+"_rank"))
df.withColumn("array", sort_array(array(cols.map(col) : _*), false))
.select((cols.map(col)++ranks) :_*).show
+---+---+---+--------+--------+--------+
|c_0|c_1|c_2|c_0_rank|c_1_rank|c_2_rank|
+---+---+---+--------+--------+--------+
| 11| 12| 35| 3| 2| 1|
| 22| 12| 66| 2| 3| 1|
| 44| 22| 12| 1| 2| 3|
+---+---+---+--------+--------+--------+
EDIT:
Начиная с Spark 2.4, определенную мной pos
UDF можно заменить встроенной функцией array_position(column: Column, value: Any)
, которая работает точно так же (первый индекс равен 1). Это позволяет избежать использования пользовательских функций, которые могут быть немного менее эффективными.
EDIT2:
Приведенный выше код сгенерирует дублированные индексы, если у вас есть дублированные ключи. Если вы хотите избежать этого, вы можете создать массив, сжать его, чтобы запомнить какой столбец, отсортировать и снова сжать, чтобы получить окончательный ранг. Это будет выглядеть так:
val colMap = df.columns.zipWithIndex.map(_.swap).toMap
val zip = udf((s: Seq[Int]) => s
.zipWithIndex
.sortBy(-_._1)
.map(_._2)
.zipWithIndex
.toMap
.mapValues(_+1))
val ranks = (0 until cols.size)
.map(i => 'zip.getItem(i) as colMap(i) + "_rank")
val result = df
.withColumn("zip", zip(array(cols.map(col) : _*)))
.select(cols.map(col) ++ ranks :_*)