У меня есть датафрейм, который содержит некоторые продукты, дату и значение. Теперь даты имеют разные промежутки между записанными значениями, которые я хочу заполнить. Таким образом, у меня есть записанное значение для каждого часа с момента первого просмотра продукта до последнего, если нет записи, которую я хочу использовать последнее значение.
Итак, у меня есть такой фрейм данных:
| ProductId | Date | Value |
|-----------|-------------------------------|-------|
| 1 | 2020-03-12T00:00:00.000+0000 | 4 |
| 1 | 2020-03-12T01:00:00.000+0000 | 2 |
| 2 | 2020-03-12T01:00:00.000+0000 | 3 |
| 2 | 2020-03-12T03:00:00.000+0000 | 4 |
| 1 | 2020-03-12T05:00:00.000+0000 | 4 |
| 3 | 2020-03-12T05:00:00.000+0000 | 2 |
Я хочу создать новый фрейм данных, который выглядит следующим образом:
| ProductId | Date | Value |
|-----------|-------------------------------|-------|
| 1 | 2020-03-12T00:00:00.000+0000 | 4 |
| 1 | 2020-03-12T01:00:00.000+0000 | 2 |
| 1 | 2020-03-12T02:00:00.000+0000 | 2 |
| 1 | 2020-03-12T03:00:00.000+0000 | 2 |
| 1 | 2020-03-12T04:00:00.000+0000 | 2 |
| 1 | 2020-03-12T05:00:00.000+0000 | 4 |
| 2 | 2020-03-12T01:00:00.000+0000 | 3 |
| 2 | 2020-03-12T02:00:00.000+0000 | 3 |
| 2 | 2020-03-12T03:00:00.000+0000 | 4 |
| 3 | 2020-03-12T05:00:00.000+0000 | 2 |
Мой код:
def generate_date_series(start, stop):
start = datetime.strptime(start, "yyyy-MM-dd'T'HH:mm:ss.SSSZ")
stop = datetime.strptime(stop, "yyyy-MM-dd'T'HH:mm:ss.SSSZ")
return [start + datetime.timedelta(hours=x) for x in range(0, (stop-start).hours + 1)]
spark.udf.register("generate_date_series", generate_date_series, ArrayType(TimestampType()))
df = df.withColumn("max", max(col("Date")).over(Window.partitionBy("ProductId"))) \
.withColumn("min", min(col("Date")).over(Window.partitionBy("ProductId"))) \
.withColumn("Dato", explode(generate_date_series(col("min"), col("max"))) \
.over(Window.partitionBy("ProductId").orderBy(col("Dato").desc())))
window_over_ids = (Window.partitionBy("ProductId").rangeBetween(Window.unboundedPreceding, -1).orderBy("Date"))
df = df.withColumn("Value", last("Value", ignorenulls=True).over(window_over_ids))
Ошибка:
TypeError: strptime() argument 1 must be str, not Column
Итак, первый вопрос, очевидно, состоит в том, как правильно создать и вызвать udf, чтобы я не столкнулся с вышеуказанной ошибкой.
Второй вопрос: как мне выполнить задачу, чтобы я мог получить мой желаемый фрейм данных?