Как переиндексировать фрейм данных на основе каждого раздела - PullRequest
0 голосов
/ 10 февраля 2019

Предположим, у меня есть следующий фрейм данных, созданный pyspark

id  date         deleted
1   2019-02-07     true
1   2019-02-04     false
2   2019-02-01     true
3   2019-02-08     false
3   2019-02-06     true

Я хотел бы переиндексировать эту таблицу ежедневно с самой ранней даты до настоящего времени (скажем, 2019-02-09), и самой раннейдата основана на каждом идентификаторе, например, для идентификатора 1 самая ранняя дата - 2019-02-04, для идентификатора 3 - самая ранняя дата - 2019-02-06.И ожидаемый результат:

id  date         deleted
1   2019-02-04     false
1   2019-02-05     null
1   2019-02-06     null
1   2019-02-07     true
1   2019-02-08     null
1   2019-02-09     null

2   2019-02-01     true
2   2019-02-02     null
      ...
2   2019-02-09     null

3   2019-02-06     true
3   2019-02-07     null
3   2019-02-08     false
3   2019-02-09     null

Я знал, как это сделать для самой ранней даты, основанной на всех идентификаторах (т. Е. 2019-02-01), затем просто построить фрейм данных, содержащий все даты с 2019 г.От -02-01 до 2019-02-09 для каждого идентификатора (перекрестное соединение), затем левое присоединение к исходному фрейму данных.Проблема этого подхода заключается в том, что если существует дата, скажем, 1980-01-01, то переиндекс будет заполнять все данные с 1980-01-01 до настоящего времени для всех идентификаторов, что не имеет смысла, и будет влиять на производительность дляследующий ETL в этом фрейме данных.

Для самой ранней даты, основанной на каждом разделе, не удалось найти хороший способ сделать это.

Ответы [ 2 ]

0 голосов
/ 12 февраля 2019

Основываясь на решении @ abeboparebop, я исправил некоторые проблемы с форматированием и заставил его работать следующим образом:

import pyspark.sql.functions as F
from pyspark.sql.types import DateType, ArrayType
import pandas as pd

from datetime import datetime

import pandas as pd

SYDNEY_TZ = "Australia/Sydney"

def _utc_now():
    return datetime.utcnow()

def _current_datetime_index(timezone=SYDNEY_TZ):
    return pd.DatetimeIndex([_utc_now()]).tz_localize("UTC").tz_convert(timezone).tz_localize(None)


def current_datetime(timezone=SYDNEY_TZ):
    return _current_datetime_index(timezone).to_pydatetime()[0]

def generate_date_list(date_from, date_to=None):
    if date_to is None:
        date_to = current_datetime()
    return pd.date_range(date_from.date(), date_to.date(), freq="D").date.tolist()


def construct_date_range(start_date):
    return generate_date_list(pd.to_datetime(start_date))


date_range_udf = F.udf(construct_date_range, ArrayType(DateType()))


id_dates = (
    given_df
    .groupBy('id')
    .agg(F.min('date').alias('min_date'))
    .withColumn('date_arr', date_range_udf(F.col('min_date')))
    .select('id', F.explode('date_arr').alias('date'))
)

result = id_dates.join(given_df, on=['id', 'date'], how='left')
0 голосов
/ 10 февраля 2019

Если исходный DataFrame называется df, а столбец date действительно имеет тип DateType:

import pyspark.sql.functions as F
from pyspark.sql.types import DateType, ArrayType
import datetime

# create a UDF to create a range of dates from a start
# date until today
def construct_date_range(start_date):
    ndays = (datetime.datetime.today() - start_date).days
    return reversed([base - datetime.timedelta(days=x) for x in range(0, ndays+1)])
date_range_udf = F.udf(construct_date_range, ArrayType(DateType()))

# find the first date for each id, and create a record for
# all dates since the first
id_dates = (
    df
    .groupBy('id')
    .agg(F.min('date').alias('min_date'))
    .withColumn('date_arr', construct_date_range('min_date'))
    .select('id', F.explode('date_arr').alias('date'))
)

result = id_dates.join(df, on=['id','date'], how='left')
...