Следите за предыдущими значениями строки с дополнительным условием, используя pyspark - PullRequest
0 голосов
/ 05 марта 2019

Я использую pyspark для генерации фрейма данных, в котором мне нужно обновить столбец 'amt' значением amt в предыдущей строке, только когда amt = 0.

Например, ниже мой фрейм данных

+---+-----+
| id|amt  |
+---+-----+
|  1|    5|
|  2|    0|
|  3|    0|
|  4|    6|
|  5|    0|
|  6|    3|
+---+-----+

Теперь я хочу создать следующий DF.всякий раз, когда amt = 0, столбец modi_amt будет содержать ненулевое значение предыдущей строки, иначе никаких изменений.

+---+-----+----------+
| id|amt  |modi_amt  |
+---+-----+----------+
|  1|    5|         5|
|  2|    0|         5|
|  3|    0|         5|
|  4|    6|         6|
|  5|    0|         6|
|  6|    3|         3|
+---+-----+----------+

Я могу получить значение предыдущих строк, но мне нужна помощь для строк, где появляется несколько 0 amt (Например, id = 2,3)

код, который я использую:

from pyspark.sql.window import Window
my_window = Window.partitionBy().orderBy("id")
DF= DF.withColumn("prev_amt", F.lag(DF.amt).over(my_window))
DF= DF.withColumn("modi_amt",when(DF.amt== 0,DF.prev_amt).otherwise(DF.amt)).drop('prev_amt')

Я получаю DF

+---+-----+----------+
| id|amt  |modi_amt  |
+---+-----+----------+
|  1|    5|         5|
|  2|    0|         5|
|  3|    0|         0|
|  4|    6|         6|
|  5|    0|         6|
|  6|    3|         3|
+---+-----+----------+

в основном id 3 также должениметь modi_amt = 5

1 Ответ

0 голосов
/ 05 марта 2019

Я использовал приведенный ниже подход, чтобы получить вывод, и он работает нормально,

from pyspark.sql.window import Window
my_window = Window.partitionBy().orderBy("id")
# this will hold the previous col value
DF= DF.withColumn("prev_amt", F.lag(DF.amt).over(my_window))

# this will replace the amt 0 with previous column value, but not consecutive rows having 0 amt.  
DF = DF.withColumn("amt_adjusted",when(DF.prev_amt == 0,DF.prev_OffSet).otherwise(DF.amt))

# define null for the rows where both amt and amt_adjusted are having 0 (logic for consecutive rows having 0 amt)
DF = DF.withColumn('zeroNonZero', when((DF.amt== 0)&(DF.amt_adjusted == 0),lit(None)).otherwise(DF.amt_adjusted))

# replace all null values with previous Non zero amt row value
DF= DF.withColumn('modi_amt',last("zeroNonZero", ignorenulls= True).over(Window.orderBy("id").rowsBetween(Window.unboundedPreceding,0)))

Есть ли другой лучший подход?

...