Точка в искре scala - PullRequest
       4

Точка в искре scala

2 голосов
/ 03 марта 2020

У меня есть df, подобный этому.

+---+-----+-----+----+
|  M|M_Max|Sales|Rank|
+---+-----+-----+----+
| M1|  100|  200|   1|
| M1|  100|  175|   2|
| M1|  101|  150|   3|
| M1|  100|  125|   4|
| M1|  100|   90|   5|
| M1|  100|   85|   6|
| M2|  200| 1001|   1|
| M2|  200|  500|   2|
| M2|  201|  456|   3|
| M2|  200|  345|   4|
| M2|  200|  231|   5|
| M2|  200|  123|   6|
+---+-----+-----+----+

Я делаю операцию поворота поверх этого df, вот так.

df.groupBy("M").pivot("Rank").agg(first("Sales")).show
+---+----+---+---+---+---+---+
|  M|   1|  2|  3|  4|  5|  6|
+---+----+---+---+---+---+---+
| M1| 200|175|150|125| 90| 85|
| M2|1001|500|456|345|231|123|
+---+----+---+---+---+---+---+

Но мой ожидаемый результат такой, как показано ниже. Это означает, что мне нужно получить столбец - Макс (M_Max) на выходе.

Здесь M_Max - это максимум столбца - M_Max. Мой ожидаемый результат, как показано ниже. это возможно с помощью функции Pivot без использования df joins .?

+---+----+---+---+---+---+---+-----+
|  M|   1|  2|  3|  4|  5|  6|M_Max|
+---+----+---+---+---+---+---+-----+
| M1| 200|175|150|125| 90| 85|  101|
| M2|1001|500|456|345|231|123|  201|
+---+----+---+---+---+---+---+-----+

Ответы [ 2 ]

3 голосов
/ 03 марта 2020

Хитрость заключается в применении оконных функций. Решение приведено ниже:

scala> val df = Seq(
     |      | ("M1",100,200,1),
     |      | ("M1",100,175,2),
     |      | ("M1",101,150,3),
     |      | ("M1",100,125,4),
     |      | ("M1",100,90,5),
     |      | ("M1",100,85,6),
     |      | ("M2",200,1001,1),
     |      | ("M2",200,500,2),
     |      | ("M2",200,456,3),
     |      | ("M2",200,345,4),
     |      | ("M2",200,231,5),
     |      | ("M2",201,123,6)
     |      | ).toDF("M","M_Max","Sales","Rank")
df: org.apache.spark.sql.DataFrame = [M: string, M_Max: int ... 2 more fields]

scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window

scala> val w = Window.partitionBy("M")
w: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@49b4e11c

scala> df.withColumn("new", max("M_Max") over (w)).groupBy("M", "new").pivot("Rank").agg(first("Sales")).withColumnRenamed("new", "M_Max").show
+---+-----+----+---+---+---+---+---+
|  M|M_Max|   1|  2|  3|  4|  5|  6|
+---+-----+----+---+---+---+---+---+
| M1|  101| 200|175|150|125| 90| 85|
| M2|  201|1001|500|456|345|231|123|
+---+-----+----+---+---+---+---+---+


scala> df.show
+---+-----+-----+----+
|  M|M_Max|Sales|Rank|
+---+-----+-----+----+
| M1|  100|  200|   1|
| M1|  100|  175|   2|
| M1|  101|  150|   3|
| M1|  100|  125|   4|
| M1|  100|   90|   5|
| M1|  100|   85|   6|
| M2|  200| 1001|   1|
| M2|  200|  500|   2|
| M2|  200|  456|   3|
| M2|  200|  345|   4|
| M2|  200|  231|   5|
| M2|  201|  123|   6|
+---+-----+-----+----+

Дайте мне знать, если это поможет !!

1 голос
/ 05 марта 2020

В принципе, я вижу три возможных подхода.

  1. Вычисление макс. M_Max отдельно и использование join (которого вы хотите избежать.
  2. Использование окна, как предлагается в другом ответе.
  3. Вычисление max с помощью пивота и агрегирование полученных столбцов с помощью array_max.

Скорее всего, подход 1 был бы менее эффективным. Между 2 и 3, однако я не уверен. Вы можете попробовать с вашими данными и сообщите нам; -)

Подход 3 будет go следующим образом:

val df = Seq(
    ("M1", 100, 200, 1),  ("M1", 100, 175, 2), ("M1", 101, 150, 3),
    ("M1", 100, 125, 4),  ("M1", 100, 90, 5),  ("M1", 100, 85, 6),
    ("M2", 200, 1001, 1), ("M2", 200, 500, 2), ("M2", 200, 456, 3),
    ("M2", 200, 345, 4),  ("M2", 200, 231, 5), ("M2", 201, 123, 6)
).toDF("M","M_Max","Sales","Rank")

// we include the max in the pivot, so we have one max column per rank
val df_pivot = df
    .groupBy("M").pivot("Rank")
    .agg(first('Sales) as "first", max('M_Max) as "max")

val max_cols = df_pivot.columns.filter(_ endsWith "max").map(col)
// then we aggregate these max columns into one
val max_col = array_max(array(max_cols : _*)) as "M_Max"

// let's rename the first columns to match your expected output
val first_cols = df_pivot.columns.filter(_ endsWith "first")
    .map(name => col(name) as name.split("_")(0))

// And finally, we wrap everything together
df_pivot
    .select($"M" +: first_cols :+ max_col : _*)
    .show(false)

, что дает

+---+----+---+---+---+---+---+-----+
|M  |1   |2  |3  |4  |5  |6  |M_Max|
+---+----+---+---+---+---+---+-----+
|M1 |200 |175|150|125|90 |85 |101  |
|M2 |1001|500|456|345|231|123|201  |
+---+----+---+---+---+---+---+-----+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...