Дублировать строки в соответствии с условием с разнесением и массивом - PullRequest
0 голосов
/ 21 октября 2019

Моя идея - подсчитать количество открытых поставок в конце месяца. Вот мой фрейм данных df. (SOD: начало доставки, EOD / конец поставки):

Reference   Start Date   StartTimestamp  EndDate     EndTimestamp        
1           2/15/2019    SOD             4/18/2019   EOD                                       
2           2/16/2019    SOD             2/23/2019   EOD                                        
3           2/17/2019    SOD             3/4/2019    EOD                                        
4           3/1/2019     SOD             Null        Null  



from pyspark.sql.functions import col, when, explode
from pyspark.sql import functions as F

    df1 = df.withColumn("EndOfTheMonth", F.lastday("Start Date"))
            .withColumn("IsDeliveryOpen", when((col("Start Date") <= col("EndOfTheMonth")) & ((col("EndDate") >= col("EndOfTheMonth")) | 
(col("EndTask").isNull())),1).otherwise(0))

. Для этого я хотел бы дублировать строки в новом фрейме данных для каждого месяца, когда End Date доставки вышечем `EndOfTheMonth '.

Я пытаюсь сделать это с помощью explode, но я не знаю, как использовать эту функцию:

df2 = (df1.filter(col("IsDeliveryOpen") == 1)
         .select("Reference").explode()
         .withColumn("EndOfTheMonth", F.add_months(lastday("StartDate"), 1))

Желаемый выход должен иметь возможность groupBy за EndOfTheMonth:

Reference   Start Date   StartTimestamp  EndDate     EndTimestamp EndOfTheMonth IsDeliveryOpen      
1           8/15/2019    SOD             9/18/2019   EOD          8/31/2019     1
1           8/15/2019    SOD             9/18/2019   EOD          9/30/2019     0
2           8/16/2019    SOD             8/23/2019   EOD          8/31/2019     0                         
3           6/17/2019    SOD             8/4/2019    EOD          6/30/2019     1
3           6/17/2019    SOD             8/4/2019    EOD          7/31/2019     1
3           6/17/2019    SOD             8/4/2019    EOD          8/31/2019     0
4           8/1/2019     SOD             Null        Null         8/31/2019     1
4           8/1/2019     SOD             Null        Null         9/30/2019     1

1 Ответ

0 голосов
/ 23 октября 2019

Для Spark 2.4.0 + , вы можете использовать sequence + transform + explode для созданияНовые строки с этой задачей:

from pyspark.sql.functions import expr

df_new = (df
    .withColumn('s_date', expr("last_day(to_date(StartDate, 'M/d/yyyy'))"))
    .withColumn('e_date', expr("last_day(IFNULL(to_date(EndDate, 'M/d/yyyy'), add_months(current_date(),-1)))"))
    .withColumn('EndOfTheMonth', expr('''
          explode_outer(transform(
            sequence(0, int(months_between(e_date, s_date))), i -> add_months(s_date,i)
          ))
     '''))
    .withColumn('IsDeliveryOpen', expr("IF(e_date > EndOfTheMonth or EndDate is Null, 1, 0)"))
)

df_new.show()
+---------+---------+--------------+---------+------------+----------+----------+-------------+--------------+
|Reference|StartDate|StartTimestamp|  EndDate|EndTimestamp|    s_date|    e_date|EndOfTheMonth|IsDeliveryOpen|
+---------+---------+--------------+---------+------------+----------+----------+-------------+--------------+
|        1|8/15/2019|           SOD|9/18/2019|         EOD|2019-08-31|2019-09-30|   2019-08-31|             1|
|        1|8/15/2019|           SOD|9/18/2019|         EOD|2019-08-31|2019-09-30|   2019-09-30|             0|
|        2|8/16/2019|           SOD|8/23/2019|         EOD|2019-08-31|2019-08-31|   2019-08-31|             0|
|        3|6/17/2019|           SOD| 8/4/2019|         EOD|2019-06-30|2019-08-31|   2019-06-30|             1|
|        3|6/17/2019|           SOD| 8/4/2019|         EOD|2019-06-30|2019-08-31|   2019-07-31|             1|
|        3|6/17/2019|           SOD| 8/4/2019|         EOD|2019-06-30|2019-08-31|   2019-08-31|             0|
|        4| 8/1/2019|           SOD|     null|        null|2019-08-31|2019-09-30|   2019-08-31|             1|
|        4| 8/1/2019|           SOD|     null|        null|2019-08-31|2019-09-30|   2019-09-30|             1|
+---------+---------+--------------+---------+------------+----------+----------+-------------+--------------+

df_new = df_new.drop('s_date', 'e_date')

Как это работает:

  1. преобразование StartDate , EndDate to DateType со значением последнего дня того же месяца ( s_date , e_date ). если EndDate равно NULL, тогда установите его значение last_day предыдущего месяца из текущей_даты

  2. вычислите # месяцев между двумя вышеуказанными двумядаты, а затем создать последовательность (0, #months) и преобразовать ее в массив месяцев (EndOfTheMonth) между StartDate и EndDate (включительно)

  3. использовать explode_outer для генерации строк дляДля всех месяцев в указанном массиве

  4. рассчитайте флаг IsDeliveryOpen соответственно. Я удалил StartDate <= EndOfTheMonth в вашем коде, так как это всегда верно в зависимости от того, как вычисляется EndOfTheMonth .

Примечание: выше также может бытьзаписывается в виде одного оператора SQL:

df.createOrReplaceTempView('t_df')

spark.sql('''

    WITH d AS (
        SELECT *
             , last_day(to_date(StartDate, 'M/d/yyyy')) as s_date
             , last_day(IFNULL(to_date(EndDate, 'M/d/yyyy'),add_months(current_date(),-1))) as e_date
        FROM t_df
    )
    SELECT d.*
         , m.EndOfTheMonth
         , IF(e_date > m.EndOfTheMonth or d.EndDate is NULL,1,0) AS IsDeliveryOpen
    FROM d
    LATERAL VIEW OUTER explode(
        transform(sequence(0, int(months_between(e_date, s_date))), i -> add_months(s_date,i))
    ) m AS EndOfTheMonth

''').show()

Обновление еженедельных диапазонов:

Для ваших комментариев, чтобы делать то же самое на еженедельной основе, вы можете настроить s_date и e_date набудьте понедельником той же недели, используя date_trunc('WEEK', date_col), а затем используйте функцию sequence(), чтобы сгенерировать массив дат между s_date и end_date с интервалом 7 дней, см. код ниже:

df_weekly = (df
    .withColumn('s_date', expr("date(date_trunc('WEEK', to_date(StartDate, 'M/d/yyyy')))"))
    .withColumn('e_date', expr("date(date_trunc('WEEK', IFNULL(to_date(EndDate, 'M/d/yyyy'), date_sub(current_date(),7))))"))
    .withColumn('StartOfTheWeek', expr('explode_outer(sequence(s_date, e_date, interval 7 days))'))
    .withColumn('IsDeliveryOpen', expr("IF(e_date > StartOfTheWeek or EndDate is Null, 1, 0)"))
)
...