Переход от pandas к pyspark - преобразовать фрейм данных с начальной и конечной датой в ежедневные данные? - PullRequest
0 голосов
/ 04 марта 2020

У меня есть сценарий ETL, использующий Pandas, и чтобы сделать его более масштабируемым, я пытаюсь воссоздать его с помощью Pyspark. Пока все идет, но возникают проблемы с определенным преобразованием в ежедневный набор данных. У меня есть одна запись для каждого идентификатора с начальной и конечной датами

id  age state   start_date  end_date
123 18  CA     2/17/2019    5/4/2019
223 24  AZ     1/17/2019    3/4/2019

Я хочу создать запись для каждого дня между начальным и конечным днем, чтобы я мог присоединять к нему данные о ежедневной активности. Целевой вывод будет выглядеть примерно так:

id  age state   start_date
123 18  CA      2/17/2019
123 18  CA      2/18/2019
123 18  CA      2/19/2019
123 18  CA      2/20/2019
123 18  CA      2/21/2019
            …
123 18  CA      5/2/2019
123 18  CA      5/3/2019
123 18  CA      5/4/2019

И, конечно, сделайте это для всех идентификаторов и их соответствующих дат начала в наборе данных. Я смог сделать это в Pandas, используя следующий подход

melt = df.melt(id_vars=['id', 'age', 'state'], value_name='date').drop('variable', axis=1)
melt['date'] = pd.to_datetime(melt['date'])

melt = melt.groupby('id').apply(lambda x: x.set_index('date').resample('d').first())\
           .ffill()\
           .reset_index(level=1)\
           .reset_index(drop=True)

Но я довольно новичок в Pyspark (и боролся с этим в Pandas), поэтому я застрял здесь. Любая помощь очень ценится - спасибо!

1 Ответ

0 голосов
/ 05 марта 2020

Нашел решение в этом посте . Ключом для моего решения была функция разнесения, которая делает то, что мне нужно.

Код для решения моего конкретного примера c:

def date_range(t1, t2, step=60*60*24):
    return [t1 + step*x for x in range(int((t2-t1)/step)+1)]

date_range_udf = udf(date_range, ArrayType(LongType()))

df = dataF.select("id",
expr("stack(2, 'start_date', start_date, 'end_date', end_date) as (class_date,date)"))

df_base = \
    df.groupBy('id')\
        .agg(min('date').cast('integer').alias('date_min'), max('date').cast('integer')\
    .alias('date_max'))\
        .withColumn("date", explode(date_range_udf("date_min", "date_max")))\
        .drop('date_min', 'date_max')\
        .withColumn("date", from_unixtime("date"))

, который дает следующий вывод (который я можно использовать для присоединения любых дополнительных данных)

enter image description here

...