Для Spark 2.4.0 + , вы можете использовать sequence + transform + explode для созданияНовые строки с этой задачей:
from pyspark.sql.functions import expr
df_new = (df
.withColumn('s_date', expr("last_day(to_date(StartDate, 'M/d/yyyy'))"))
.withColumn('e_date', expr("last_day(IFNULL(to_date(EndDate, 'M/d/yyyy'), add_months(current_date(),-1)))"))
.withColumn('EndOfTheMonth', expr('''
explode_outer(transform(
sequence(0, int(months_between(e_date, s_date))), i -> add_months(s_date,i)
))
'''))
.withColumn('IsDeliveryOpen', expr("IF(e_date > EndOfTheMonth or EndDate is Null, 1, 0)"))
)
df_new.show()
+---------+---------+--------------+---------+------------+----------+----------+-------------+--------------+
|Reference|StartDate|StartTimestamp| EndDate|EndTimestamp| s_date| e_date|EndOfTheMonth|IsDeliveryOpen|
+---------+---------+--------------+---------+------------+----------+----------+-------------+--------------+
| 1|8/15/2019| SOD|9/18/2019| EOD|2019-08-31|2019-09-30| 2019-08-31| 1|
| 1|8/15/2019| SOD|9/18/2019| EOD|2019-08-31|2019-09-30| 2019-09-30| 0|
| 2|8/16/2019| SOD|8/23/2019| EOD|2019-08-31|2019-08-31| 2019-08-31| 0|
| 3|6/17/2019| SOD| 8/4/2019| EOD|2019-06-30|2019-08-31| 2019-06-30| 1|
| 3|6/17/2019| SOD| 8/4/2019| EOD|2019-06-30|2019-08-31| 2019-07-31| 1|
| 3|6/17/2019| SOD| 8/4/2019| EOD|2019-06-30|2019-08-31| 2019-08-31| 0|
| 4| 8/1/2019| SOD| null| null|2019-08-31|2019-09-30| 2019-08-31| 1|
| 4| 8/1/2019| SOD| null| null|2019-08-31|2019-09-30| 2019-09-30| 1|
+---------+---------+--------------+---------+------------+----------+----------+-------------+--------------+
df_new = df_new.drop('s_date', 'e_date')
Как это работает:
преобразование StartDate , EndDate to DateType со значением последнего дня того же месяца ( s_date , e_date ). если EndDate равно NULL, тогда установите его значение last_day предыдущего месяца из текущей_даты
вычислите # месяцев между двумя вышеуказанными двумядаты, а затем создать последовательность (0, #months) и преобразовать ее в массив месяцев (EndOfTheMonth
) между StartDate и EndDate (включительно)
использовать explode_outer для генерации строк дляДля всех месяцев в указанном массиве
рассчитайте флаг IsDeliveryOpen соответственно. Я удалил StartDate <= EndOfTheMonth
в вашем коде, так как это всегда верно в зависимости от того, как вычисляется EndOfTheMonth .
Примечание: выше также может бытьзаписывается в виде одного оператора SQL:
df.createOrReplaceTempView('t_df')
spark.sql('''
WITH d AS (
SELECT *
, last_day(to_date(StartDate, 'M/d/yyyy')) as s_date
, last_day(IFNULL(to_date(EndDate, 'M/d/yyyy'),add_months(current_date(),-1))) as e_date
FROM t_df
)
SELECT d.*
, m.EndOfTheMonth
, IF(e_date > m.EndOfTheMonth or d.EndDate is NULL,1,0) AS IsDeliveryOpen
FROM d
LATERAL VIEW OUTER explode(
transform(sequence(0, int(months_between(e_date, s_date))), i -> add_months(s_date,i))
) m AS EndOfTheMonth
''').show()
Обновление еженедельных диапазонов:
Для ваших комментариев, чтобы делать то же самое на еженедельной основе, вы можете настроить s_date
и e_date
набудьте понедельником той же недели, используя date_trunc('WEEK', date_col)
, а затем используйте функцию sequence()
, чтобы сгенерировать массив дат между s_date и end_date с интервалом 7 дней, см. код ниже:
df_weekly = (df
.withColumn('s_date', expr("date(date_trunc('WEEK', to_date(StartDate, 'M/d/yyyy')))"))
.withColumn('e_date', expr("date(date_trunc('WEEK', IFNULL(to_date(EndDate, 'M/d/yyyy'), date_sub(current_date(),7))))"))
.withColumn('StartOfTheWeek', expr('explode_outer(sequence(s_date, e_date, interval 7 days))'))
.withColumn('IsDeliveryOpen', expr("IF(e_date > StartOfTheWeek or EndDate is Null, 1, 0)"))
)