Учитывайте предметы одинакового значения при определении ранга - PullRequest
0 голосов
/ 27 ноября 2018

В спарк я хотел бы посчитать, как значения меньше или равны другим значениям.Я пытался сделать это с помощью ранжирования, но в рангах получилось [1,2,2,2,3,4] -> [1,2,2,2,5,6], тогда как мне хотелось бы получить [1,2,2,2,3,4] -> [1,4,4,4,5,6]

в группе.Но это немного неуклюже и неэффективно.Есть ли лучший способ сделать это?

Редактировать: Добавлен минимальный пример того, что я пытаюсь сделать

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.rank
import org.apache.spark.sql.expressions.Window


object Question extends App {
  val spark = SparkSession.builder.appName("Question").master("local[*]").getOrCreate()

  import spark.implicits._

  val win = Window.orderBy($"nums".asc)

  Seq(1, 2, 2, 2, 3, 4)
    .toDF("nums")
    .select($"nums", rank.over(win).alias("rank"))
    .as[(Int, Int)]
    .groupByKey(_._2)
    .mapGroups((rank, nums) => (rank, nums.toList.map(_._1)))
    .map(x => (x._1 + x._2.length - 1, x._2))
    .flatMap(x => x._2.map(num => (num, x._1)))
    .toDF("nums", "rank")
    .show(false)
}

Вывод:

+----+----+
|nums|rank|
+----+----+
|1   |1   |
|2   |4   |
|2   |4   |
|2   |4   |
|3   |5   |
|4   |6   |
+----+----+

Ответы [ 2 ]

0 голосов
/ 28 ноября 2018

Итак, друг указал, что если я просто вычислю ранг в порядке убывания, а затем для каждого ранга сделаю (max_rank + 1) - current_rank.Это гораздо более эффективная реализация.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.rank
import org.apache.spark.sql.expressions.Window


object Question extends App {
  val spark = SparkSession.builder.appName("Question").master("local[*]").getOrCreate()

  import spark.implicits._


  val win = Window.orderBy($"nums".desc)
  val rankings = Seq(1, 2, 2, 2, 3, 4)
    .toDF("nums")
    .select($"nums", rank.over(win).alias("rank"))
    .as[(Int, Int)]

  val maxElement = rankings.select("rank").as[Int].reduce((a, b) => if (a > b) a else b)

  rankings
    .map(x => x.copy(_2 = maxElement - x._2 + 1))
    .toDF("nums", "rank")
    .orderBy("rank")
    .show(false)
}

Вывод

+----+----+
|nums|rank|
+----+----+
|1   |1   |
|2   |4   |
|2   |4   |
|2   |4   |
|3   |5   |
|4   |6   |
+----+----+
0 голосов
/ 27 ноября 2018

Использование оконных функций

scala> val df =  Seq(1, 2, 2, 2, 3, 4).toDF("nums")
df: org.apache.spark.sql.DataFrame = [nums: int]

scala> df.createOrReplaceTempView("tbl")

scala> spark.sql(" with tab1(select nums, rank() over(order by nums) rk, count(*) over(partition by nums) cn from tbl) select nums, rk+cn-1 as rk2 from tab1 ").show(false)
18/11/28 02:20:55 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+----+---+
|nums|rk2|
+----+---+
|1   |1  |
|2   |4  |
|2   |4  |
|2   |4  |
|3   |5  |
|4   |6  |
+----+---+


scala>

Обратите внимание, что df не разбивает ни на один столбец, поэтому spark жалуется на перемещение всех данных в один раздел.

EDIT1:

scala> spark.sql(" select nums, rank() over(order by nums) + count(*) over(partition by nums) -1 as rk2 from tbl ").show
18/11/28 23:20:09 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+----+---+
|nums|rk2|
+----+---+
|   1|  1|
|   2|  4|
|   2|  4|
|   2|  4|
|   3|  5|
|   4|  6|
+----+---+


scala>

РЕДАКТИРОВАТЬ2:

Эквивалентная версия df

scala> val df =  Seq(1, 2, 2, 2, 3, 4).toDF("nums")
df: org.apache.spark.sql.DataFrame = [nums: int]

scala> import org.apache.spark.sql.expressions._
import org.apache.spark.sql.expressions._

scala> df.withColumn("rk2", rank().over(Window orderBy 'nums)+ count(lit(1)).over(Window.partitionBy('nums)) - 1 ).show(false)
2018-12-01 11:10:26 WARN  WindowExec:66 - No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+----+---+
|nums|rk2|
+----+---+
|1   |1  |
|2   |4  |
|2   |4  |
|2   |4  |
|3   |5  |
|4   |6  |
+----+---+


scala>
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...