Pyspark - накопительная сумма с условием сброса - PullRequest
2 голосов
/ 30 мая 2019

У меня есть этот фрейм данных

+---+----+---+
|  A|   B|  C|
+---+----+---+
|  0|null|  1|
|  1| 3.0|  0|
|  2| 7.0|  0|
|  3|null|  1|
|  4| 4.0|  0|
|  5| 3.0|  0|
|  6|null|  1|
|  7|null|  1|
|  8|null|  1|
|  9| 5.0|  0|
| 10| 2.0|  0|
| 11|null|  1|
+---+----+---+

Что мне нужно сделать, так это накопленная сумма значений из столбца C, пока следующее значение не станет равным нулю, затем сбросить накопленную сумму, делая это до завершения всех строк.

Ожидаемый результат:

+---+----+---+----+
|  A|   B|  C|   D|
+---+----+---+----+
|  0|null|  1|   1|
|  1| 3.0|  0|   0|
|  2| 7.0|  0|   0|
|  3|null|  1|   1|
|  4| 4.0|  0|   0|
|  5| 3.0|  0|   0|
|  6|null|  1|   1|
|  7|null|  1|   2|
|  8|null|  1|   3|
|  9| 5.0|  0|   0|
| 10| 2.0|  0|   0|
| 11|null|  1|   1|
+---+----+---+----+

Я уже использовал функцию Window().rangeBetween и достиг желаемого результата, но проблема в том, что вы не можете определить фиксированный диапазон окна, потому что DataFrame может иметь пять раз подряд число 1, иногда может быть только двумя и т. д.

Мой вопрос очень похож на этот Pyspark: накопленная сумма с условием сброса , но никто не ответил.

Для воспроизведения кадра данных:

from pyspark.shell import sc
from pyspark.sql import Window
from pyspark.sql.functions import lag, when, sum

x = sc.parallelize([
    [0, None], [1, 3.], [2, 7.], [3, None], [4, 4.],
    [5, 3.], [6, None], [7, None], [8, None], [9, 5.], [10, 2.], [11, None]])
x = x.toDF(['A', 'B'])

# Transform null values into "1"
x = x.withColumn('C', when(x.B.isNull(), 1).otherwise(0))

1 Ответ

1 голос
/ 30 мая 2019

Создайте временный столбец (grp), который будет увеличивать счетчик каждый раз, когда столбец C равен 0 (условие сброса) и используйте его в качестве столбца разделения для вашей совокупной суммы.

import pyspark.sql.functions as f
from pyspark.sql import Window

x.withColumn(
    "grp", 
    f.sum((f.col("C") == 0).cast("int")).over(Window.orderBy("A"))
).withColumn(
    "D",
    f.sum(f.col("C")).over(Window.partitionBy("grp").orderBy("A"))
).drop("grp").show()
#+---+----+---+---+
#|  A|   B|  C|  D|
#+---+----+---+---+
#|  0|null|  1|  1|
#|  1| 3.0|  0|  0|
#|  2| 7.0|  0|  0|
#|  3|null|  1|  1|
#|  4| 4.0|  0|  0|
#|  5| 3.0|  0|  0|
#|  6|null|  1|  1|
#|  7|null|  1|  2|
#|  8|null|  1|  3|
#|  9| 5.0|  0|  0|
#| 10| 2.0|  0|  0|
#| 11|null|  1|  1|
#+---+----+---+---+
...