Как суммировать значения из одного индекса в другой в pyspark с помощью group by - PullRequest
1 голос
/ 10 июля 2020

У меня есть spark dataframe, который выглядит так:

import pandas as pd
dt = pd.DataFrame({'id': ['a','a','a','a','a','a','b','b','b','b'], 
                   'delta': [1,2,3,4,5,6,7,8,9,10],
                   'pos': [2,0,0,1,2,1,2,0,0,1],
                   'index': [1,2,3,4,5,6,1,2,3,4]})

Я хотел бы суммировать delta s от pos==2 до pos==1, для всех раз, когда это происходит, by id

Итак, я хотел бы, чтобы столбец во фрейме данных spark выглядел так:

[6, 0, 0, 0, 4, 0, 24, 0, 0, 0]

Пояснение к результату:

  • 6 -> для id 'a', найдите первый pos==2 и просуммируйте все delta до (не включая) следующего pos==1, так что 1 + 2 + 3 = 6
  • 0 этого нет в pos==2
  • 0 этого нет в pos==2
  • 0 этого нет в pos==2
  • 4 -> для id 'a', найдите следующий pos==2 и просуммируйте все delta до (не включая) следующего pos==1, так что всего 4
  • 21 -> для id 'b', найдите первый pos==2 и просуммируйте все delta до (не включая) следующего pos==1, так что 7 + 8 + 9 = 24

Любые идеи как я могу сделать это эффективно в pyspark?

EDIT

Dataframe упорядочен по index и id

1 Ответ

3 голосов
/ 11 июля 2020

Как уже упоминалось в комментариях, вы ищете способ применить оконную функцию . Вам необходимо создать дополнительный идентификатор, который позволит вам разделить ваши данные для вычисления суммы вашей delta строки. Пожалуйста, взгляните на прокомментированный пример ниже:

import pyspark.sql.functions as F
from pyspark.sql import Window

df = spark.createDataFrame([
                            ('a', 1, 2, 1),
                            ('a', 2, 0, 2),
                            ('a', 3, 0, 3),
                            ('a', 4, 1, 4),
                            ('a', 5, 2, 5),
                            ('a', 6, 1, 6),
                            ('b', 7, 2, 1),
                            ('b', 8, 0, 2),
                            ('b', 9, 0, 3),
                            ('b',10,1,4)
                        ],
                        ['id', 'delta', 'pos', 'index']
                    )

w1 = Window.partitionBy('id').orderBy('index')
w2 = Window.partitionBy('id', 'subgroup2').orderBy(F.desc('index'))

#apply a subgroup ID to each pos == 2 value
df = df.withColumn("subgroup", F.when(F.col('pos') == 2, F.monotonically_increasing_id()))
df.show()

#forward-fill the subgroup ID to following rows except of rows containing with pos == 1
df = df.withColumn('subgroup2', F.when(F.col('pos') != 1, F.last('subgroup', True).over(w1.rowsBetween(Window.unboundedPreceding,0))))
df.show()

#calculate the sum for each subgroup ID
df = df.withColumn('deltaSum', F.when(F.col('pos') == 2, F.sum('delta').over(w2)))
df.sort('id', 'index').show()

Вывод:

+---+-----+---+-----+----------+
| id|delta|pos|index|  subgroup|
+---+-----+---+-----+----------+
|  a|    1|  2|    1|         0|
|  a|    2|  0|    2|      null|
|  a|    3|  0|    3|      null|
|  a|    4|  1|    4|      null|
|  a|    5|  2|    5|         1|
|  a|    6|  1|    6|      null|
|  b|    7|  2|    1|8589934592|
|  b|    8|  0|    2|      null|
|  b|    9|  0|    3|      null|
|  b|   10|  1|    4|      null|
+---+-----+---+-----+----------+

+---+-----+---+-----+----------+----------+
| id|delta|pos|index|  subgroup| subgroup2|
+---+-----+---+-----+----------+----------+
|  b|    7|  2|    1|8589934592|8589934592|
|  b|    8|  0|    2|      null|8589934592|
|  b|    9|  0|    3|      null|8589934592|
|  b|   10|  1|    4|      null|      null|
|  a|    1|  2|    1|         0|         0|
|  a|    2|  0|    2|      null|         0|
|  a|    3|  0|    3|      null|         0|
|  a|    4|  1|    4|      null|      null|
|  a|    5|  2|    5|         1|         1|
|  a|    6|  1|    6|      null|      null|
+---+-----+---+-----+----------+----------+

+---+-----+---+-----+----------+----------+--------+
| id|delta|pos|index|  subgroup| subgroup2|deltaSum|
+---+-----+---+-----+----------+----------+--------+
|  a|    1|  2|    1|         0|         0|       6|
|  a|    2|  0|    2|      null|         0|    null|
|  a|    3|  0|    3|      null|         0|    null|
|  a|    4|  1|    4|      null|      null|    null|
|  a|    5|  2|    5|         1|         1|       5|
|  a|    6|  1|    6|      null|      null|    null|
|  b|    7|  2|    1|8589934592|8589934592|      24|
|  b|    8|  0|    2|      null|8589934592|    null|
|  b|    9|  0|    3|      null|8589934592|    null|
|  b|   10|  1|    4|      null|      null|    null|
+---+-----+---+-----+----------+----------+--------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...