эффективный способ накопить сумму на нескольких столбцах в Pyspark - PullRequest
0 голосов
/ 27 февраля 2019

У меня таблица выглядит следующим образом:

+----+------+-----+-------+
|time|val1  |val2 |  class|
+----+------+-----+-------+
|   1|    3 |    2|      b|
|   2|    3 |    1|      b|
|   1|    2 |    4|      a|
|   2|    2 |    5|      a|
|   3|    1 |    5|      a|
+----+------+-----+-------+

Теперь я хочу сделать кумулятивную сумму по столбцам val1 и val2.Поэтому я создаю оконную функцию:

windowval = (Window.partitionBy('class').orderBy('time')
             .rangeBetween(Window.unboundedPreceding, 0))


new_df = my_df.withColumn('cum_sum1', F.sum("val1").over(windowval))
              .withColumn('cum_sum2', F.sum("val2").over(windowval))

Но я думаю, что Spark применяет оконную функцию дважды к исходной таблице, что кажется менее эффективным.Поскольку проблема довольно проста, есть ли способ просто применить оконную функцию один раз, и сделать кумулятивную сумму на обоих столбцах вместе?

1 Ответ

0 голосов
/ 27 февраля 2019

Но я думаю, что Spark дважды применяет оконную функцию к исходной таблице, что кажется менее эффективным.

Ваше предположение неверно.Достаточно взглянуть на оптимизированный логический

== Optimized Logical Plan ==
Window [sum(val1#1L) windowspecdefinition(class#3, time#0L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS cum_sum1#9L, sum(val2#2L) windowspecdefinition(class#3, time#0L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS cum_sum2#16L], [class#3], [time#0L ASC NULLS FIRST]
+- LogicalRDD [time#0L, val1#1L, val2#2L, class#3], false

или физический план

== Physical Plan ==
Window [sum(val1#1L) windowspecdefinition(class#3, time#0L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS cum_sum1#9L, sum(val2#2L) windowspecdefinition(class#3, time#0L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS cum_sum2#16L], [class#3], [time#0L ASC NULLS FIRST]
+- *(1) Sort [class#3 ASC NULLS FIRST, time#0L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(class#3, 200)
      +- Scan ExistingRDD[time#0L,val1#1L,val2#2L,class#3]

, оба четко указывают, что Window применяется только один раз.

...