Ранг на строку по нескольким столбцам в кадре данных Spark - PullRequest
2 голосов
/ 29 марта 2019

Я использую spark с Scala для преобразования Dataframe, где я хотел бы вычислить новую переменную, которая вычисляет ранг одной переменной на строку во многих переменных.

Пример -

Input DF-

+---+---+---+
|c_0|c_1|c_2|
+---+---+---+
| 11| 11| 35|
| 22| 12| 66|
| 44| 22| 12|
+---+---+---+

Expected DF-

+---+---+---+--------+--------+--------+
|c_0|c_1|c_2|c_0_rank|c_1_rank|c_2_rank|
+---+---+---+--------+--------+--------+
| 11| 11| 35|        2|        3|        1|
| 22| 12| 66|       2|       3|       1|
| 44| 22| 12|       1|       2|       3|
+---+---+---+--------+--------+--------+



На этот вопрос уже был дан ответ с помощью R - Ранг на строку по нескольким столбцам в R ,

, но мне нужно сделать то же самое в spark-sql с использованием scala.Спасибо за помощь!

Редактировать - 4/1.Обнаружен один сценарий, где, если значения одинаковы, ранги должны быть разными.Редактирование первой строки для воспроизведения ситуации.

Ответы [ 3 ]

1 голос
/ 29 марта 2019

Если я правильно понимаю, вы хотите иметь ранг каждого столбца, внутри каждой строки.

Давайте сначала определим данные и столбцы для «ранга».

val df = Seq((11,  21,  35),(22,  12, 66),(44, 22 , 12))
    .toDF("c_0", "c_1", "c_2")
val cols = df.columns

Затем мы определяем UDF, который находит индекс элемента в массиве.

val pos = udf((a : Seq[Int], elt : Int) => a.indexOf(elt)+1)

Наконец, мы создаем отсортированный массив (в порядке убывания) и используем UDF для определения ранга каждого столбца.

val ranks = cols.map(c => pos(col("array"), col(c)).as(c+"_rank"))
df.withColumn("array", sort_array(array(cols.map(col) : _*), false))
  .select((cols.map(col)++ranks) :_*).show 
+---+---+---+--------+--------+--------+
|c_0|c_1|c_2|c_0_rank|c_1_rank|c_2_rank|
+---+---+---+--------+--------+--------+
| 11| 12| 35|       3|       2|       1|
| 22| 12| 66|       2|       3|       1|
| 44| 22| 12|       1|       2|       3|
+---+---+---+--------+--------+--------+

EDIT: Начиная с Spark 2.4, определенную мной pos UDF можно заменить встроенной функцией array_position(column: Column, value: Any), которая работает точно так же (первый индекс равен 1). Это позволяет избежать использования пользовательских функций, которые могут быть немного менее эффективными.

EDIT2: Приведенный выше код сгенерирует дублированные индексы, если у вас есть дублированные ключи. Если вы хотите избежать этого, вы можете создать массив, сжать его, чтобы запомнить какой столбец, отсортировать и снова сжать, чтобы получить окончательный ранг. Это будет выглядеть так:

val colMap = df.columns.zipWithIndex.map(_.swap).toMap
val zip = udf((s: Seq[Int]) => s
    .zipWithIndex
    .sortBy(-_._1)
    .map(_._2)
    .zipWithIndex
    .toMap
    .mapValues(_+1))
val ranks = (0 until cols.size)
    .map(i => 'zip.getItem(i) as colMap(i) + "_rank")
val result = df
    .withColumn("zip", zip(array(cols.map(col) : _*)))
    .select(cols.map(col) ++ ranks :_*)
0 голосов
/ 29 марта 2019

Возможно, вы могли бы создать оконную функцию.Обратите внимание, что это восприимчиво к OOM, если у вас слишком много данных.Но я просто хотел представить здесь понятие оконных функций.

inputDF.createOrReplaceTempView("my_df")
val expectedDF =  spark.sql("""
    select 
        c_0
        , c_1
        , c_2
        , rank(c_0) over (order by c_0 desc) c_0_rank
        , rank(c_1) over (order by c_1 desc) c_1_rank
        , rank(c_2) over (order by c_2 desc) c_2_rank 
    from my_df""")
expectedDF.show()

+---+---+---+--------+--------+--------+
|c_0|c_1|c_2|c_0_rank|c_1_rank|c_2_rank|
+---+---+---+--------+--------+--------+
| 44| 22| 12|       3|       3|       1|
| 11| 21| 35|       1|       2|       2|
| 22| 12| 66|       2|       1|       3|
+---+---+---+--------+--------+--------+
0 голосов
/ 29 марта 2019

Один из способов сделать это - использовать Windows.

val df = Seq((11,  21,  35),(22,  12, 66),(44, 22 , 12))
    .toDF("c_0", "c_1", "c_2")
(0 to 2)
    .map("c_"+_)
    .foldLeft(df)((d, column) => 
          d.withColumn(column+"_rank", rank() over Window.orderBy(desc(column))))
    .show
+---+---+---+--------+--------+--------+                                        
|c_0|c_1|c_2|c_0_rank|c_1_rank|c_2_rank|
+---+---+---+--------+--------+--------+
| 22| 12| 66|       2|       3|       1|
| 11| 21| 35|       3|       2|       2|
| 44| 22| 12|       1|       1|       3|
+---+---+---+--------+--------+--------+

Но это не очень хорошая идея. Все данные окажутся в одном разделе, что приведет к ошибке OOM, если все данные не помещаются в одном исполнителе.

Другой способ потребовал бы сортировки кадра данных три раза, но по крайней мере это масштабировалось бы до любого размера данных.

Давайте определим функцию, которая упаковывает данные в кадр с последовательными индексами (он существует для RDD, но не для данных)

def zipWithIndex(df : DataFrame, name : String) : DataFrame = {
    val rdd = df.rdd.zipWithIndex
      .map{ case (row, i) => Row.fromSeq(row.toSeq :+ (i+1)) }
    val newSchema = df.schema.add(StructField(name, LongType, false))
    df.sparkSession.createDataFrame(rdd, newSchema)
}

И давайте использовать его на том же кадре данных df:

(0 to 2)
    .map("c_"+_)
    .foldLeft(df)((d, column) => 
        zipWithIndex(d.orderBy(desc(column)), column+"_rank"))
    .show

, который дает тот же результат, что и выше.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...