Разбить интервал дат по группе и принять последнее значение в pyspark - PullRequest
0 голосов
/ 31 марта 2020

У меня есть датафрейм, который содержит некоторые продукты, дату и значение. Теперь даты имеют разные промежутки между записанными значениями, которые я хочу заполнить. Таким образом, у меня есть записанное значение для каждого часа с момента первого просмотра продукта до последнего, если нет записи, которую я хочу использовать последнее значение.

Итак, у меня есть такой фрейм данных:

| ProductId | Date                          | Value |
|-----------|-------------------------------|-------|
| 1         | 2020-03-12T00:00:00.000+0000  | 4     |
| 1         | 2020-03-12T01:00:00.000+0000  | 2     |
| 2         | 2020-03-12T01:00:00.000+0000  | 3     |
| 2         | 2020-03-12T03:00:00.000+0000  | 4     |
| 1         | 2020-03-12T05:00:00.000+0000  | 4     |
| 3         | 2020-03-12T05:00:00.000+0000  | 2     |

Я хочу создать новый фрейм данных, который выглядит следующим образом:

| ProductId | Date                          | Value |
|-----------|-------------------------------|-------|
| 1         | 2020-03-12T00:00:00.000+0000  | 4     |
| 1         | 2020-03-12T01:00:00.000+0000  | 2     |
| 1         | 2020-03-12T02:00:00.000+0000  | 2     |
| 1         | 2020-03-12T03:00:00.000+0000  | 2     |
| 1         | 2020-03-12T04:00:00.000+0000  | 2     |
| 1         | 2020-03-12T05:00:00.000+0000  | 4     |
| 2         | 2020-03-12T01:00:00.000+0000  | 3     |
| 2         | 2020-03-12T02:00:00.000+0000  | 3     |
| 2         | 2020-03-12T03:00:00.000+0000  | 4     |
| 3         | 2020-03-12T05:00:00.000+0000  | 2     |

Мой код:

def generate_date_series(start, stop):
  start = datetime.strptime(start, "yyyy-MM-dd'T'HH:mm:ss.SSSZ")
  stop = datetime.strptime(stop, "yyyy-MM-dd'T'HH:mm:ss.SSSZ")
  return [start + datetime.timedelta(hours=x) for x in range(0, (stop-start).hours + 1)]

spark.udf.register("generate_date_series", generate_date_series, ArrayType(TimestampType()))

df = df.withColumn("max", max(col("Date")).over(Window.partitionBy("ProductId"))) \
       .withColumn("min", min(col("Date")).over(Window.partitionBy("ProductId"))) \
       .withColumn("Dato", explode(generate_date_series(col("min"), col("max"))) \
                          .over(Window.partitionBy("ProductId").orderBy(col("Dato").desc())))

window_over_ids = (Window.partitionBy("ProductId").rangeBetween(Window.unboundedPreceding, -1).orderBy("Date"))

df = df.withColumn("Value", last("Value", ignorenulls=True).over(window_over_ids))

Ошибка:

TypeError: strptime() argument 1 must be str, not Column

Итак, первый вопрос, очевидно, состоит в том, как правильно создать и вызвать udf, чтобы я не столкнулся с вышеуказанной ошибкой.

Второй вопрос: как мне выполнить задачу, чтобы я мог получить мой желаемый фрейм данных?

1 Ответ

0 голосов
/ 31 марта 2020

Итак, после некоторых поисков и экспериментов я нашел решение. Я определил udf, который возвращает диапазон дат между двумя датами с интервалом в 1 час. И тогда я делаю прямую заливку

Я исправил проблему с помощью следующего кода:

def missing_hours(t1, t2):
    return [t1 + timedelta(hours=x) for x in range(0, int((t2-t1).total_seconds()/3600))]

missing_hours_udf = udf(missing_hours, ArrayType(TimestampType()))

window = Window.partitionBy("ProductId").orderBy("Date")

df_missing = df.withColumn("prev_timestamp", lag(col("Date"), 1, None).over(window)) \
                        .filter(col("prev_timestamp").isNotNull()) \
                        .withColumn("Date", explode(missing_hours_udf(col("prev_timestamp"), col("Date")))) \
                        .withColumn("Value", lit(None)) \
                        .drop("prev_timestamp")

df = df_original.union(df_missing)

window = Window.partitionBy("ProductId").orderBy("Date") \
               .rowsBetween(-sys.maxsize, 0)

# define the forward-filled column
filled_values_column = last(df['Value'], ignorenulls=True).over(window)

# do the fill
df = df.withColumn('Value', filled_values_column)
...