Получить предыдущее значение строки, используя spark sql - PullRequest
0 голосов
/ 08 апреля 2020

У меня есть такой стол.

Id   prod   val                   
1    0       0         
2    0       0         
3    1       1000         
4    0       0         
5    1       2000         
6    0       0          
7    0       0         

Я хочу добавить новый столбец new_val, и условие для этого столбца, если prod = 0, то new_val должно быть из предыдущей строки, где prod = 1. Если prod = 1, он должен иметь то же значение, что и столбец val. Как мне добиться этого, используя spark sql?

Id   prod   val       new_val                 
1    0       0        1000            
2    0       0        1000             
3    1       1000     1000                
4    0       0        2000                         
5    1       2000     2000               
6    1       4000     4000             
7    1       3000     3000       

Любая помощь с благодарностью

Ответы [ 2 ]

0 голосов
/ 08 апреля 2020

Вы можете получить это с помощью

val w = Window.orderBy("id").rowsBetween(0, Window.unboundedFollowing)
df
  .withColumn("new_val", when($"prod" === 0, null).otherwise($"val"))
  .withColumn("new_val", first("new_val", ignoreNulls = true).over(w))

Сначала он создает новый столбец со значениями null всякий раз, когда значение не изменяется:

+---+----+----+-------+
| id|prod| val|new_val|
+---+----+----+-------+
|  1|   0|   0|   null|
|  2|   0|   0|   null|
|  3|   1|1000|   1000|
|  4|   0|   0|   null|
|  5|   1|2000|   2000|
|  6|   1|4000|   4000|
|  7|   1|3000|   3000|
+---+----+----+-------+

И он заменяет значения первым ненулевое значение в следующих записях

+---+----+----+-------+
| id|prod| val|new_val|
+---+----+----+-------+
|  1|   0|   0|   1000|
|  2|   0|   0|   1000|
|  3|   1|1000|   1000|
|  4|   0|   0|   2000|
|  5|   1|2000|   2000|
|  6|   1|4000|   4000|
|  7|   1|3000|   3000|
+---+----+----+-------+
0 голосов
/ 08 апреля 2020

Вы можете использовать что-то вроде этого:

import pyspark.sql.functions as F
from pyspark.sql.window import Window

w = Window().orderBy("id")

df = df.withColumn("new_val", F.when(F.col("prod") == 0, F.lag("val").over(w)).otherwise(F.col("val")))

В основном мы используем условие if-else:

Когда prod == 0, возьмите lag val, которое является значением предыдущей строки (по окну, упорядоченному по столбцу id), а если prod == 1, то мы используем текущее значение столбца.

...