Pyspark: как вести от указанного c значения столбца в Dataframe - PullRequest
2 голосов
/ 05 августа 2020

Фрейм данных уже отсортирован по дате,

col1 == 1 значение уникально,

и только 0 имеют дубликаты.

У меня выглядит фреймворк как это назовите это df

+--------+----+----+
    date |col1|col2|
+--------+----+----+
2020-08-01| 5|  -1|
2020-08-02| 4|  -1|
2020-08-03| 3|   3|
2020-08-04| 2|   2|
2020-08-05| 1|   4|
2020-08-06| 0|   1|
2020-08-07| 0|   2|
2020-08-08| 0|   3|
2020-08-09| 0|  -1|
+--------+----+----+

Условие - когда col1 == 1, тогда мы начинаем добавлять назад от col2 == 4 (например, 4,5,6,7 , 8, ...), а после col2 == 4 полностью вернуть 0 (например, 4,0,0,0,0 ...)

Итак, мой результат df будет выглядеть примерно так.

    +--------+----+----+----+
        date |col1|col2|want
    +--------+----+----+----+
    2020-08-01| 5|  -1|  8 |
    2020-08-02| 4|  -1|  7 |
    2020-08-03| 3|   3|  6 |
    2020-08-04| 2|   2|  5 |
    2020-08-05| 1|   4|  4 |
    2020-08-06| 0|   1|  0 |
    2020-08-07| 0|   2|  0 |
    2020-08-08| 0|   3|  0 |
    2020-08-09| 0|  -1|  0 |
   +---------+----+----+----+  

Улучшение: Я хочу добавить дополнительное условие, где col2 == -1, когда col1 == 1 строка, а -1 идет последовательно, то я хочу подсчитать последовательные -1, а затем добавить со следующим col2 ==? стоимость. Итак, вот пример для очистки.

    +--------+----+----+----+
        date |col1|col2|want
    +--------+----+----+----+
    2020-08-01| 5|  -1|  11|
    2020-08-02| 4|  -1|  10|
    2020-08-03| 3|   3|  9 |
    2020-08-04| 2|   2|  8 |
    2020-08-05| 1|  -1|  7 |
    2020-08-06| 0|  -1|  0 |
    2020-08-07| 0|  -1|  0 |
    2020-08-08| 0|   4|  0 |
    2020-08-09| 0|  -1|  0 |
   +---------+----+----+----+   

итак, мы видим 3 последовательных -1 (мы заботимся только о первых последовательных -1), а после последовательного у нас есть 4, затем у нас будет 4+ 3 = 7 в строке col1 == 1. это возможно?

1 Ответ

1 голос
/ 06 августа 2020

Вот моя попытка:

from pyspark.sql.functions import sum, when, col, rank, desc
from pyspark.sql import Window

w1 = Window.orderBy(desc('date'))
w2 = Window.partitionBy('case').orderBy(desc('date'))

df.withColumn('case', sum(when(col('col1') == 1, col('col2')).otherwise(0)).over(w1)) \
  .withColumn('rank', when(col('case') != 0, rank().over(w2) - 1).otherwise(0)) \
  .withColumn('want', col('case') + col('rank')) \
  .orderBy('date') \
  .show(10, False)

+----------+----+----+----+----+----+
|date      |col1|col2|case|rank|want|
+----------+----+----+----+----+----+
|2020-08-01|5   |-1  |4   |4   |8   |
|2020-08-02|4   |-1  |4   |3   |7   |
|2020-08-03|3   |3   |4   |2   |6   |
|2020-08-04|2   |2   |4   |1   |5   |
|2020-08-05|1   |4   |4   |0   |4   |
|2020-08-06|0   |1   |0   |0   |0   |
|2020-08-07|0   |2   |0   |0   |0   |
|2020-08-08|0   |3   |0   |0   |0   |
|2020-08-09|0   |-1  |0   |0   |0   |
+----------+----+----+----+----+----+
...