Как засыпать нулевые значения в каждом разделе в PySpark - PullRequest
1 голос
/ 10 января 2020

У меня есть следующий DataFrame в PySpark:

Id      DateActual          DateStart               DateEnd                 SourceCode
107 2019-08-11 00:00:00     null                    null                    1111
107 2019-08-16 00:00:00     2019-08-11 00:00:00     2019-08-18 00:00:00     1111
128 2019-02-11 00:00:00     null                    null                    101
128 2019-02-13 00:00:00     2019-02-11 00:00:00     2019-02-18 00:00:00     168
128 2019-02-14 00:00:00     2019-02-13 00:00:00     2019-02-20 00:00:00     187

Мне нужно подставить null значения, чтобы получить следующий результат:

Id      DateActual          DateStart               DateEnd                 SourceCode
107 2019-08-11 00:00:00     2019-08-11 00:00:00     2019-08-18 00:00:00     1111
107 2019-08-16 00:00:00     2019-08-11 00:00:00     2019-08-18 00:00:00     1111
128 2019-02-11 00:00:00     2019-02-11 00:00:00     2019-02-18 00:00:00     101
128 2019-02-13 00:00:00     2019-02-11 00:00:00     2019-02-18 00:00:00     168
128 2019-02-14 00:00:00     2019-02-13 00:00:00     2019-02-20 00:00:00     187

По существу, DateStart и DateEnd с null значениями равны DateStart и DateEnd строки СЛЕДУЮЩАЯ , если она имеет тот же Id.

Как мне заполнить null значения, следующие вышеописанным логам c в PySpark?

DataFrame:

df = (
    sc.parallelize([
        (107, "2019-08-11 00:00:00", None, None, 1111),
        (107, "2019-08-16 00:00:00", "2019-08-11 00:00:00", "2019-08-18 00:00:00", 1111),
        (128, "2019-02-11 00:00:00", None, None, 101), 
        (128, "2019-02-13 00:00:00", "2019-02-11 00:00:00", "2019-02-11 00:00:00", 168), 
        (128, "2019-02-14 00:00:00", "2019-02-13 00:00:00", "2019-02-20 00:00:00", 187)
    ]).toDF(["Id", "DateActual", "DateStart", "DateEnd", "SourceCode"])
)

Это то, что я пробовал:

from pyspark.sql.functions import col, when 
import pyspark.sql.functions as F
from pyspark.sql.window import Window  

my_window = Window.partitionBy("Id").orderBy("DateActual")

df.withColumn("DateStart_start", when(col("DateStart").isNull(), F.lag(df.DateStart).over(my_window)).otherwise(col("DateStart"))).show()

Я делаю не нужно тривиальное решение, как df.na.fill(0). Мне нужно заменить значения null значениями NEXT ROW, что, вероятно, предполагает использование lag или другой аналогичной функции.

1 Ответ

3 голосов
/ 10 января 2020

Использование first из pyspark.sql.functions:

from pyspark.sql import Window
from pyspark.sql.functions import first

# define the window
window = Window.partitionBy('Id')\
               .orderBy('DateActual')\
               .rowsBetween(0,sys.maxsize)

# define the back-filled column
filled_column_start = first(spark_df['DateStart'], ignorenulls=True).over(window)
filled_column_end = first(spark_df['DateEnd'], ignorenulls=True).over(window)

# do the fill
spark_df_filled = spark_df.withColumn('filled_start', filled_column_start)
spark_df_filled = spark_df_filled .withColumn('filled_end', filled_column_end)

# show off our glorious achievements
spark_df_filled.orderBy('Id').show(10)  
...