Функция Pyspark Window на весь фрейм данных - PullRequest
2 голосов
/ 26 февраля 2020

Рассмотрим фрейм данных pyspark. Я хотел бы суммировать весь фрейм данных по каждому столбцу и добавить результат для каждой строки.

+-----+----------+-----------+
|index|      col1| col2      |
+-----+----------+-----------+
|  0.0|0.58734024|0.085703015|
|  1.0|0.67304325| 0.17850411|

Ожидаемый результат

+-----+----------+-----------+-----------+-----------+-----------+-----------+
|index|      col1| col2      |  col1_min | col1_mean |col2_min   | col2_mean
+-----+----------+-----------+-----------+-----------+-----------+-----------+
|  0.0|0.58734024|0.085703015|  -5       | 2.3       |  -2       | 1.4 |
|  1.0|0.67304325| 0.17850411|  -5       | 2.3       |  -2       | 1.4 |

Насколько мне известно, мне понадобится окно работать с целым фреймом данных как Window, чтобы сохранить результат для каждой строки (вместо того, чтобы, например, делать статистику отдельно, затем объединяться для репликации для каждой строки)

Мои вопросы:

  1. Как написать Окно без какого-либо раздела или порядка по

Я знаю, что есть стандартное Окно с Разделением и Порядком, но не то, которое принимает все как один отдельный раздел

w = Window.partitionBy("col1", "col2").orderBy(desc("col1"))
df = df.withColumn("col1_mean", mean("col1").over(w)))

Как бы я написал Окно со всем как один раздел?

Любой способ динамической записи для всех столбцов.

Скажем, у меня 500 столбцов, писать не многократно.

df = df.withColumn("col1_mean", mean("col1").over(w))).withColumn("col1_min", min("col2").over(w)).withColumn("col2_mean", mean().over(w)).....

Предположим, я хочу получить несколько статистик для каждого столбца, поэтому каждый colx будет икра colx_min, colx_max, colx_mean.

Ответы [ 2 ]

1 голос
/ 26 февраля 2020

Вместо использования окна вы можете добиться того же с помощью пользовательской агрегации в сочетании с перекрестным объединением:

import pyspark.sql.functions as F
from pyspark.sql.functions import broadcast
from itertools import chain

df = spark.createDataFrame([
  [1, 2.3, 1],
  [2, 5.3, 2],
  [3, 2.1, 4],
  [4, 1.5, 5]
], ["index", "col1", "col2"])

agg_cols = [(
             F.min(c).alias("min_" + c), 
             F.max(c).alias("max_" + c), 
             F.mean(c).alias("mean_" + c)) 

  for c in df.columns if c.startswith('col')]

stats_df = df.agg(*list(chain(*agg_cols)))

# there is no performance impact from crossJoin since we have only one row on the right table which we broadcast (most likely Spark will broadcast it anyway)
df.crossJoin(broadcast(stats_df)).show() 

# +-----+----+----+--------+--------+---------+--------+--------+---------+
# |index|col1|col2|min_col1|max_col1|mean_col1|min_col2|max_col2|mean_col2|
# +-----+----+----+--------+--------+---------+--------+--------+---------+
# |    1| 2.3|   1|     1.5|     5.3|      2.8|       1|       5|      3.0|
# |    2| 5.3|   2|     1.5|     5.3|      2.8|       1|       5|      3.0|
# |    3| 2.1|   4|     1.5|     5.3|      2.8|       1|       5|      3.0|
# |    4| 1.5|   5|     1.5|     5.3|      2.8|       1|       5|      3.0|
# +-----+----+----+--------+--------+---------+--------+--------+---------+

Примечание 1: При использовании широковещания мы будем избегать тасования, так как широковещательный df будет быть отправлено всем исполнителям.

Примечание 2: с помощью chain(*agg_cols) мы сводим список кортежей, который мы создали на предыдущем шаге.

ОБНОВЛЕНИЕ:

Вот план выполнения для вышеуказанной программы:

== Physical Plan ==
*(3) BroadcastNestedLoopJoin BuildRight, Cross
:- *(3) Scan ExistingRDD[index#196L,col1#197,col2#198L]
+- BroadcastExchange IdentityBroadcastMode, [id=#274]
   +- *(2) HashAggregate(keys=[], functions=[finalmerge_min(merge min#233) AS min(col1#197)#202, finalmerge_max(merge max#235) AS max(col1#197)#204, finalmerge_avg(merge sum#238, count#239L) AS avg(col1#197)#206, finalmerge_min(merge min#241L) AS min(col2#198L)#208L, finalmerge_max(merge max#243L) AS max(col2#198L)#210L, finalmerge_avg(merge sum#246, count#247L) AS avg(col2#198L)#212])
      +- Exchange SinglePartition, [id=#270]
         +- *(1) HashAggregate(keys=[], functions=[partial_min(col1#197) AS min#233, partial_max(col1#197) AS max#235, partial_avg(col1#197) AS (sum#238, count#239L), partial_min(col2#198L) AS min#241L, partial_max(col2#198L) AS max#243L, partial_avg(col2#198L) AS (sum#246, count#247L)])
            +- *(1) Project [col1#197, col2#198L]
               +- *(1) Scan ExistingRDD[index#196L,col1#197,col2#198L]

Здесь мы видим BroadcastExchange из SinglePartition, который транслирует одну строку, начиная с stats_df может вписаться в SinglePartition. Поэтому данные, которые здесь перетасовываются, представляют собой только одну строку (минимально возможная).

1 голос
/ 26 февраля 2020

Мы также можем указать без orderby,partitionBy предложений в оконной функции min("<col_name>").over()

Example:

//sample data
val df=Seq((1,2,3),(4,5,6)).toDF("i","j","k")

val df1=df.columns.foldLeft(df)((df, c) => {
  df.withColumn(s"${c}_min",min(col(s"${c}")).over()).
  withColumn(s"${c}_max",max(col(s"${c}")).over()).
  withColumn(s"${c}_mean",mean(col(s"${c}")).over())
})

df1.show()
//+---+---+---+-----+-----+------+-----+-----+------+-----+-----+------+
//|  i|  j|  k|i_min|i_max|i_mean|j_min|j_max|j_mean|k_min|k_max|k_mean|
//+---+---+---+-----+-----+------+-----+-----+------+-----+-----+------+
//|  1|  2|  3|    1|    4|   2.5|    2|    5|   3.5|    3|    6|   4.5|
//|  4|  5|  6|    1|    4|   2.5|    2|    5|   3.5|    3|    6|   4.5|
//+---+---+---+-----+-----+------+-----+-----+------+-----+-----+------+
...