функция стеклоподъемника VS в зависимости от производительности - PullRequest
0 голосов
/ 23 января 2019

у меня датафрейм сделан как

| id | date      |  KPI_1 | ... | KPI_n
| 1  |2012-12-12 |   0.1  | ... |  0.5
| 2  |2012-12-12 |   0.2  | ... |  0.4
| 3  |2012-12-12 |   0.66 | ... |  0.66 
| 1  |2012-12-13 |   0.2  | ... |  0.46
| 4  |2012-12-14 |   0.2  | ... |  0.45 
| ...
| 55| 2013-03-15 |  0.5  | ... |  0.55

у нас есть

  • X идентификаторов
  • строка для каждого идентификатора на данную дату
  • n KPI

Мне нужно вычислить некоторый производный KPI для каждой строки, и этот KPI зависит от предыдущих значений каждого ID. Скажем, мой производный KPI - это diff, это будет:

| id | date      |  KPI_1 | ... | KPI_n | KPI_1_diff | KPI_n_diff
| 1  |2012-12-12 |   0.1  | ... |  0.5  |   0.1      | 0.5
| 2  |2012-12-12 |   0.2  | ... |  0.4  |   0.2      |0.4
| 3  |2012-12-12 |   0.66 | ... |  0.66 |   0.66     | 0.66 
| 1  |2012-12-13 |   0.2  | ... |  0.46 |   0.2-0.1  | 0.46 - 0.66
| 4  |2012-12-13 |   0.2  | ... |  0.45  ...
| ...
| 55| 2013-03-15 |  0.5  | ... |  0.55

Теперь: я бы сделал следующее:

val groupedDF = myDF.groupBy("id").agg(
    collect_list(struct(col("date",col("KPI_1"))).as("wrapped_KPI_1"),
    collect_list(struct(col("date",col("KPI_2"))).as("wrapped_KPI_2")
    // up until nth KPI
)

Я бы получил агрегированные данные, такие как:

[("2012-12-12",0.1),("2012-12-12",0.2) ...

Затем я бы отсортировал эти обернутые данные, развернул и сопоставил эти агрегированные результаты с некоторым UDF и вывел бы результат (вычислил различия и другую статистику).

Другой подход заключается в использовании оконных функций, таких как:

val window = Window.partitionBy(col("id")).orderBy(col("date")).rowsBetween(Window.unboundedPreceding,0L) 

и сделать:

val windowedDF = df.select (
  col("id"),
  col("date"),
  col("KPI_1"),
  collect_list(struct(col("date"),col("KPI_1"))).over(window),  
  collect_list(struct(col("date"),col("KPI_2"))).over(window)
   )   

Таким образом, я получаю:

[("2012-12-12",0.1)]
[("2012-12-12",0.1), ("2012-12-13",0.1)]
...

Это выглядит лучше для обработки, но я подозреваю, что повторение окна приведет к ненужной группировке и сортировке для каждого KPI.

Итак, вот вопросы:

  1. Я бы предпочел групповой подход?
  2. Я бы пошел за окном? Если да, то какой подход наиболее эффективен?

1 Ответ

0 голосов
/ 24 января 2019

Я считаю, что оконный подход должен быть лучшим решением, но перед использованием оконных функций вы должны заново разделить информационный фрейм на основе идентификатора. Это будет перетасовывать данные только один раз, и все оконные функции должны выполняться с уже перетасованным кадром данных. Надеюсь, это поможет.

Код должен быть примерно таким.

val windowedDF = df.repartition(col("id"))
  .select (
  col("id"),
  col("date"),
  col("KPI_1"),
  col("KPI_2"),
  collect_list(struct(col("date"),col("KPI_1"))).over(window),
  collect_list(struct(col("date"),col("KPI_2"))).over(window)
)

@ Рафаэль Рот

Здесь мы агрегируем по одному окну. Вот почему вы можете увидеть тот же план выполнения. Пожалуйста, посмотрите пример ниже, где агрегация по нескольким окнам может быть выполнена только из одного раздела.

val list = Seq(( "2", null, 1, 11, 1, 1 ),
  ( "2", null, 1, 22, 2, 2 ),
  ( "2", null, 1, 11, 1, 3 ),
  ( "2", null, 1, 22, 2, 1 ),
  ( "2", null, 1, 33, 1, 2 ),
  ( null, "3", 3, 33, 1, 2 ),
  ( null, "3", 3, 33, 2, 3 ),
  ( null, "3", 3, 11, 1, 1 ),
  ( null, "3", 3, 22, 2, 2 ),
  ( null, "3", 3, 11, 1, 3 )
)

val df = spark.sparkContext.parallelize(list).toDF("c1","c2","batchDate","id", "pv" , "vv")

val c1Window = Window.partitionBy("batchDate", "c1")
val c2Window = Window.partitionBy("batchDate", "c2")

val agg1df = df.withColumn("c1List",collect_list("pv").over(c1Window))
  .withColumn("c2List", collect_list("pv").over(c2Window))

val agg2df = df.repartition($"batchDate")
  .withColumn("c1List",collect_list("pv").over(c1Window))
  .withColumn("c2List", collect_list("pv").over(c2Window))


agg1df.explain()
== Physical Plan ==
Window [collect_list(pv#18, 0, 0) windowspecdefinition(batchDate#16, c2#15, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS c2List#38], [batchDate#16, c2#15]
+- *Sort [batchDate#16 ASC NULLS FIRST, c2#15 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(batchDate#16, c2#15, 1)
      +- Window [collect_list(pv#18, 0, 0) windowspecdefinition(batchDate#16, c1#14, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS c1List#28], [batchDate#16, c1#14]
         +- *Sort [batchDate#16 ASC NULLS FIRST, c1#14 ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(batchDate#16, c1#14, 1)
               +- *Project [_1#7 AS c1#14, _2#8 AS c2#15, _3#9 AS batchDate#16, _4#10 AS id#17, _5#11 AS pv#18, _6#12 AS vv#19]
                  +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple6, true])._1, true) AS _1#7, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple6, true])._2, true) AS _2#8, assertnotnull(input[0, scala.Tuple6, true])._3 AS _3#9, assertnotnull(input[0, scala.Tuple6, true])._4 AS _4#10, assertnotnull(input[0, scala.Tuple6, true])._5 AS _5#11, assertnotnull(input[0, scala.Tuple6, true])._6 AS _6#12]
                     +- Scan ExternalRDDScan[obj#6]

agg2df.explain()
== Physical Plan ==
Window [collect_list(pv#18, 0, 0) windowspecdefinition(batchDate#16, c2#15, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS c2List#60], [batchDate#16, c2#15]
+- *Sort [batchDate#16 ASC NULLS FIRST, c2#15 ASC NULLS FIRST], false, 0
   +- Window [collect_list(pv#18, 0, 0) windowspecdefinition(batchDate#16, c1#14, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS c1List#50], [batchDate#16, c1#14]
      +- *Sort [batchDate#16 ASC NULLS FIRST, c1#14 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(batchDate#16, 1)
            +- *Project [_1#7 AS c1#14, _2#8 AS c2#15, _3#9 AS batchDate#16, _4#10 AS id#17, _5#11 AS pv#18, _6#12 AS vv#19]
               +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple6, true])._1, true) AS _1#7, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple6, true])._2, true) AS _2#8, assertnotnull(input[0, scala.Tuple6, true])._3 AS _3#9, assertnotnull(input[0, scala.Tuple6, true])._4 AS _4#10, assertnotnull(input[0, scala.Tuple6, true])._5 AS _5#11, assertnotnull(input[0, scala.Tuple6, true])._6 AS _6#12]
                  +- Scan ExternalRDDScan[obj#6]
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...