Создайте новый фрейм данных с одним столбцом - уникальный список всех дат:
datesDF = yourDF.select('Date').distinct()
Создайте еще один, который будет состоять из дат и идентификаторов, но только тех, в которых нет нулей.А также давайте сохраним только первое (каким бы ни было первое) вхождение идентификатора для каждой даты (судя по вашему примеру, вы можете иметь несколько строк на дату)
noNullsDF = yourDF.dropna().dropDuplicates(subset='Date')
Теперь давайте объединяем эти две, чтобы у нас был списоквсех дат с любым значением, которое у нас есть для него (или ноль)
joinedDF = datesDF.join(noNullsDF, 'Date', 'left')
Теперь для каждой даты получите значение идентификатора из предыдущей и следующей дат, используя оконные функции, а также давайте переименуем наш столбец идентификаторов, чтобы позжепроблем с присоединением будет меньше:
from pyspark.sql.window import Window
from pyspark.sql import functions as f
w = Window.orderBy('Date')
joinedDF = joinedDF.withColumn('previousID',f.lag('ID').over(w))
.withColumn('nextID',f.lead('ID').over(w))
.withColumnRenamed('ID','newID')
Теперь давайте присоединим его к нашему исходному фрейму данных по дате
yourDF = yourDF.join(joinedDF, 'Date', 'left')
Теперь наш фрейм данных имеет 4 столбца идентификатора:
- исходный идентификатор
- newID - идентификатор любого ненулевого значения заданной даты, если оно есть, или нулевое
- previousID - идентификатор с предыдущей даты (не равно нулю, если есть, или ноль)
- nextID - идентификатор со следующей даты (не ноль, если есть или ноль)
Теперь нам нужно объединить их в finalID по порядку:
- исходное значение, еслине нулевое
- значение для текущей даты, если естьприсутствует ненулевое значение (это противоречит вашему вопросу, но код панды предлагает вам перейти <= при проверке даты), если результат не равен нулю </li>
- значение для предыдущей даты, если оно не равно нулю
- значение дляследующая дата, если она не равна нулю
- какое-то значение по умолчанию
Мы делаем это просто путем объединения:
default = 0
finalDF = yourDF.select('Date',
'ID',
f.coalesce('ID',
'newID',
'previousID',
'nextID',
f.lit(default)).alias('finalID')
)