Суммируйте значения в столбце с помощью pyspark - PullRequest
2 голосов
/ 08 мая 2020

У меня есть сценарий, где у меня есть 2 таблицы, одна таблица с днями и другая таблица со значениями. Итак, из таблицы, в которой есть дни, мне нужно суммировать значения другой таблицы с тем же количеством дней. Dataframe

dataframe1
df1 = spark.createDataFrame(
     [
     ('ll',5)
     ('yy',6)
     ],
     ('x','days')
    )
  dataframe2 
  df = spark.createDataFrame(
        [
            ('ll','2020-01-05','1','10','50'),
    ('ll','2020-01-06','1','10'),
    ('ll','2020-01-07','1','10'),
    ('ll','2020-01-08','1','10'),
    ('ll','2020-01-09','1','10'),
    ('ll','2020-01-10','1','10'),
    ('ll','2020-01-11','1','20'),
    ('ll','2020-01-12','1','10'),
    ('ll','2020-01-05','2','30'),
    ('ll','2020-01-06','2','30'),
    ('ll','2020-01-07','2','30'),
    ('ll','2020-01-08','2','40'),
    ('ll','2020-01-09','2','30'),
    ('ll','2020-01-10','2','10'),
    ('ll','2020-01-11','2','10'),
    ('ll','2020-01-12','2','10'),
    ('yy','2020-01-05','1','20'),
    ('yy','2020-01-06','1','20'),
    ('yy','2020-01-07','1','20'),
    ('yy','2020-01-08','1','20'),
    ('yy','2020-01-09','1','20'),
    ('yy','2020-01-10','1','40'),
    ('yy','2020-01-11','1','20'),
    ('yy','2020-01-12','1','20'),
    ('yy','2020-01-05','2','40'),
    ('yy','2020-01-06','2','40'),
    ('yy','2020-01-07','2','40'),
    ('yy','2020-01-08','2','40'),
    ('yy','2020-01-09','2','40'),
    ('yy','2020-01-10','2','40'),
    ('yy','2020-01-11','2','60'),
    ('yy','2020-01-12','2','40')
        ],
        ('x','date','flag','value')
    )

                expected_dataframe = spark.createDataFrame(
        [
            ('ll','2020-01-05','1','10','50'),
    ('ll','2020-01-06','1','10','50'),
    ('ll','2020-01-07','1','10','60'),
    ('ll','2020-01-08','1','10','60'),
    ('ll','2020-01-09','1','10','50'),
    ('ll','2020-01-10','1','10','40'),
    ('ll','2020-01-11','1','20','30'),
    ('ll','2020-01-12','1','10','10'),
    ('ll','2020-01-05','2','30','170'),
    ('ll','2020-01-06','2','30','140'),
    ('ll','2020-01-07','2','30','120'),
    ('ll','2020-01-08','2','40','100'),
    ('ll','2020-01-09','2','30','60'),
    ('ll','2020-01-10','2','10','30'),
    ('ll','2020-01-11','2','10','20'),
    ('ll','2020-01-12','2','10','10'),
    ('yy','2020-01-05','1','20','140'),
    ('yy','2020-01-06','1','20','140'),
    ('yy','2020-01-07','1','20','140'),
    ('yy','2020-01-08','1','20','120'),
    ('yy','2020-01-09','1','20','100'),
    ('yy','2020-01-10','1','40','80'),
    ('yy','2020-01-11','1','20','40'),
    ('yy','2020-01-12','1','20','20'),
    ('yy','2020-01-05','2','40','240'),
    ('yy','2020-01-06','2','40','260'),
    ('yy','2020-01-07','2','40','260'),
    ('yy','2020-01-08','2','40','220'),
    ('yy','2020-01-09','2','40','180'),
    ('yy','2020-01-10','2','40','140'),
    ('yy','2020-01-11','2','60','100'),
    ('yy','2020-01-12','2','40','40')
        ],
        ('x','date','flag','value','result')

ожидаемые_результаты

    +---+----------+----+-----+------+
    |  x|      date|flag|value|result|
    +---+----------+----+-----+------+
    | ll|2020-01-05|   1|   10|    50|
    | ll|2020-01-06|   1|   10|    50|
    | ll|2020-01-07|   1|   10|    60|
    | ll|2020-01-08|   1|   10|    60|
    | ll|2020-01-09|   1|   10|    50|
    | ll|2020-01-10|   1|   10|    40|
    | ll|2020-01-11|   1|   20|    30|
    | ll|2020-01-12|   1|   10|    10|
    | ll|2020-01-05|   2|   30|   170|
    | ll|2020-01-06|   2|   30|   140|
    | ll|2020-01-07|   2|   30|   120|
    | ll|2020-01-08|   2|   40|   100|
    | ll|2020-01-09|   2|   30|    60|
    | ll|2020-01-10|   2|   10|    30|
    | ll|2020-01-11|   2|   10|    20|
    | ll|2020-01-12|   2|   10|    10|
    | yy|2020-01-05|   1|   20|   140|
    | yy|2020-01-06|   1|   20|   140|
    | yy|2020-01-07|   1|   20|   140|
    | yy|2020-01-08|   1|   20|   120|
    | yy|2020-01-09|   1|   20|   100|
    | yy|2020-01-10|   1|   40|    80|
    | yy|2020-01-11|   1|   20|    40|
    | yy|2020-01-12|   1|   20|    20|
    | yy|2020-01-05|   2|   40|   240|
    | yy|2020-01-06|   2|   40|   260|
    | yy|2020-01-07|   2|   40|   260|
    | yy|2020-01-08|   2|   40|   220|
    | yy|2020-01-09|   2|   40|   180|
    | yy|2020-01-10|   2|   40|   140|
    | yy|2020-01-11|   2|   60|   100|
    | yy|2020-01-12|   2|   40|    40|
    +---+----------+----+-----+------+

код

from pyspark.sql.window import Window
from pyspark.sql.functions import *
 df_join = df.join(df1,['x'],'inner').withColumn('date',to_date(col('date'),'yyyy-MM-dd'))
from pyspark.sql.window import Window
w1 =  Window.partitionBy('x','flag').orderBy(col['date'].desc())

Итак, мне нужен столбец суммарного значения на основе столбца дней, i, e, если столбец дней равен 5, Мне нужно просуммировать 5 строк значений.

Я объединил две таблицы и попытался решить с помощью оконной функции, но id не работал и не мог понять, как это решить. Может ли кто-нибудь показать мне способ его решения

1 Ответ

2 голосов
/ 08 мая 2020

Сначала вы можете присоединиться к x, а затем создать row_number() над своими строками, которые будут использоваться для single out wherever it is greater than days (превратите их в nulls), затем sum over a partitioned only window для трансляции суммы по всем строкам.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w=Window().partitionBy("x","flag").orderBy(F.to_date("date","yyyy-dd-MM"))
w1=Window().partitionBy("x","flag")
df.join(df1, ['x'])\
  .withColumn("rowNum", F.row_number().over(w))\
  .withColumn("expected_result", F.sum(F.when(F.col("rowNum")>F.col("days")\
                                     ,F.lit(None)).otherwise(F.col("value")))\
                                      .over(w1)).drop("days","rowNum").show()

#+---+----------+----+-----+---------------+
#|  x|      date|flag|value|expected_result|
#+---+----------+----+-----+---------------+
#| ll|2020-01-05|   1|   10|           50.0|
#| ll|2020-01-06|   1|   10|           50.0|
#| ll|2020-01-07|   1|   10|           50.0|
#| ll|2020-01-08|   1|   10|           50.0|
#| ll|2020-01-09|   1|   10|           50.0|
#| ll|2020-01-10|   1|   10|           50.0|
#| ll|2020-01-11|   1|   10|           50.0|
#| ll|2020-01-12|   1|   10|           50.0|
#| ll|2020-01-05|   2|   30|          150.0|
#| ll|2020-01-06|   2|   30|          150.0|
#| ll|2020-01-07|   2|   30|          150.0|
#| ll|2020-01-08|   2|   30|          150.0|
#| ll|2020-01-09|   2|   30|          150.0|
#| ll|2020-01-10|   2|   10|          150.0|
#| ll|2020-01-11|   2|   10|          150.0|
#| ll|2020-01-12|   2|   10|          150.0|
#| yy|2020-01-05|   1|   20|          120.0|
#| yy|2020-01-06|   1|   20|          120.0|
#| yy|2020-01-07|   1|   20|          120.0|
#| yy|2020-01-08|   1|   20|          120.0|
#+---+----------+----+-----+---------------+
#only showing top 20 rows

UPDATE :

Для Spark2.4 + вы можете использовать функции более высокого порядка transform и aggregate, после collect_list. I Предполагается, что данные будут упорядочены, как в приведенном примере, если это не так, необходимо добавить дополнительный шаг, чтобы гарантировать это.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w=Window().partitionBy("x","flag")
w1=Window().partitionBy("x","flag").orderBy(F.to_date("date","yyyy-dd-MM"))

df.join(df1,['x'])\
  .withColumn("result", F.collect_list("value").over(w))\
  .withColumn("rowNum", F.row_number().over(w1)-1)\
  .withColumn("result", F.expr("""aggregate(transform(result,(x,i)->array(x,i)),0,(acc,x)-> \
                             IF((int(x[1])>=rowNum)and(int(x[1])<days+rowNum),int(x[0])+acc,acc))"""))\
  .drop("flag","rowNum","days").show()


#+---+----------+-----+------+
#|  x|      date|value|result|
#+---+----------+-----+------+
#| ll|2020-01-05|   10|    50|
#| ll|2020-01-06|   10|    50|
#| ll|2020-01-07|   10|    60|
#| ll|2020-01-08|   10|    60|
#| ll|2020-01-09|   10|    50|
#| ll|2020-01-10|   10|    40|
#| ll|2020-01-11|   20|    30|
#| ll|2020-01-12|   10|    10|
#| ll|2020-01-05|   30|   160|
#| ll|2020-01-06|   30|   140|
#| ll|2020-01-07|   30|   120|
#| ll|2020-01-08|   40|   100|
#| ll|2020-01-09|   30|    60|
#| ll|2020-01-10|   10|    30|
#| ll|2020-01-11|   10|    20|
#| ll|2020-01-12|   10|    10|
#| yy|2020-01-05|   20|   140|
#| yy|2020-01-06|   20|   140|
#| yy|2020-01-07|   20|   140|
#| yy|2020-01-08|   20|   120|
#| yy|2020-01-09|   20|   100|
#| yy|2020-01-10|   40|    80|
#| yy|2020-01-11|   20|    40|
#| yy|2020-01-12|   20|    20|
#| yy|2020-01-05|   40|   240|
#| yy|2020-01-06|   40|   260|
#| yy|2020-01-07|   40|   260|
#| yy|2020-01-08|   40|   220|
#| yy|2020-01-09|   40|   180|
#| yy|2020-01-10|   40|   140|
#| yy|2020-01-11|   60|   100|
#| yy|2020-01-12|   40|    40|
#+---+----------+-----+------+

Кроме того, в вашем примере строка номер 9 должно быть 160 вместо 170.

...