У меня есть следующий PySpark DataFrame df
:
itemid eventid timestamp timestamp_end n
134 30 2016-07-02 2016-07-09 2
134 32 2016-07-03 2016-07-10 2
125 32 2016-07-10 2016-07-17 1
Я хочу преобразовать этот DataFrame в следующий:
itemid eventid timestamp_start timestamp timestamp_end
134 30 2016-07-02 2016-07-02 2016-07-09
134 32 2016-07-02 2016-07-03 2016-07-09
134 30 2016-07-03 2016-07-02 2016-07-10
134 32 2016-07-03 2016-07-03 2016-07-10
125 32 2016-07-10 2016-07-10 2016-07-17
Обычно для каждого уникального значения itemid
, мне нужно взять timestamp
и поместить его в новый столбец timestamp_start
. Таким образом, каждая строка в группе itemid
должна дублироваться n
раз, где n
- количество записей в группе. Надеюсь, я объяснил это ясно.
Это мой начальный DataFrame в PySpark:
from pyspark.sql.functions import col, expr
df = (
sc.parallelize([
(134, 30, "2016-07-02", "2016-07-09"), (134, 32, "2016-07-03", "2016-07-10"),
(125, 32, "2016-07-10", "2016-07-17"),
]).toDF(["itemid", "eventid", "timestamp", "timestamp_end"])
.withColumn("timestamp", col("timestamp").cast("timestamp"))
.withColumn("timestamp_end", col("timestamp_end").cast("timestamp_end"))
)
Пока мне удалось скопировать строки n
раз:
new_df = df.withColumn("n", expr("explode(array_repeat(n,int(n)))"))
Но как я могу создать timestamp_start
, как показано в примере выше?
Спасибо.