Use the lag window function with when/otherwise to check whether the date column contains "NA", then replace it with the last seen value.
Example:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val df = Seq(("1","A","01/20/2020"),("1","B","01/25/2020"),("1","C","01/25/2020"),("1","A","02/20/2020"),("1","B","02/25/2020"),("1","C","NA")).toDF("id","test","date")

//parse the string date so the window can order on it
val df1 = df.withColumn("new_dt", to_date(col("date"), "MM/dd/yyyy"))

//change partitionBy/orderBy as per requirement
val w = Window.partitionBy("id","test").orderBy(desc("new_dt"))

df1.withColumn("date", when(col("date") === "NA", lag(col("date"), 1).over(w)).otherwise(col("date"))).drop("new_dt").show()
//+---+----+----------+
//| id|test| date|
//+---+----+----------+
//| 1| A|02/20/2020|
//| 1| A|01/20/2020|
//| 1| B|02/25/2020|
//| 1| B|01/25/2020|
//| 1| C|01/25/2020|
//| 1| C|01/25/2020|
//+---+----+----------+
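Note that lag(col("date"), 1) only looks one row back, so if a group contains two consecutive "NA" rows the lag would itself return "NA". A sketch of a more robust variant (my addition, not part of the original answer; it reuses df1 and the same ordering from above) carries the most recent non-NA date forward with last(..., ignoreNulls = true) over a running frame:

//hypothetical variant: running frame from the start of the partition to the current row
val w2 = Window.partitionBy("id","test").orderBy(desc("new_dt")).rowsBetween(Window.unboundedPreceding, Window.currentRow)

//when() without otherwise() yields null for "NA" rows, so ignoreNulls skips them
df1.withColumn("date", when(col("date") === "NA", last(when(col("date") =!= "NA", col("date")), ignoreNulls = true).over(w2)).otherwise(col("date"))).drop("new_dt").show()

On the sample data this gives the same output as the lag version, but it also fills runs of consecutive "NA" rows.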
In PySpark:
from pyspark.sql.functions import *
from pyspark.sql.window import Window

df = spark.createDataFrame([("1","A","01/20/2020"),("1","B","01/25/2020"),("1","C","01/25/2020"),("1","A","02/20/2020"),("1","B","02/25/2020"),("1","C","NA")],["id","test","date"])

# parse the string date so the window can order on it
df1 = df.withColumn("new_dt", to_date(col("date"), "MM/dd/yyyy"))

# change partitionBy/orderBy as per requirement
w = Window.partitionBy("id","test").orderBy(desc("new_dt"))

df1.withColumn("date", when(col("date") == "NA", lag(col("date"), 1).over(w)).otherwise(col("date"))).drop("new_dt").show()
#+---+----+----------+
#| id|test| date|
#+---+----+----------+
#| 1| A|02/20/2020|
#| 1| A|01/20/2020|
#| 1| B|02/25/2020|
#| 1| B|01/25/2020|
#| 1| C|01/25/2020|
#| 1| C|01/25/2020|
#+---+----+----------+
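The same last/ignoreNulls variant shown in the Scala section works in PySpark as well; note that the keyword argument is spelled ignorenulls there (e.g. last(col("date"), ignorenulls=True)).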