Как создать N дублированных строк в PySpark DataFrame? - PullRequest
0 голосов
/ 09 января 2020

У меня есть следующий PySpark DataFrame df:

itemid  eventid    timestamp     timestamp_end   n
134     30         2016-07-02    2016-07-09      2
134     32         2016-07-03    2016-07-10      2
125     32         2016-07-10    2016-07-17      1

Я хочу преобразовать этот DataFrame в следующий:

itemid  eventid    timestamp_start   timestamp     timestamp_end
134     30         2016-07-02        2016-07-02    2016-07-09
134     32         2016-07-02        2016-07-03    2016-07-09
134     30         2016-07-03        2016-07-02    2016-07-10
134     32         2016-07-03        2016-07-03    2016-07-10
125     32         2016-07-10        2016-07-10    2016-07-17

Обычно для каждого уникального значения itemid, мне нужно взять timestamp и поместить его в новый столбец timestamp_start. Таким образом, каждая строка в группе itemid должна дублироваться n раз, где n - количество записей в группе. Надеюсь, я объяснил это ясно.

Это мой начальный DataFrame в PySpark:

from pyspark.sql.functions import col, expr

df = (
    sc.parallelize([
        (134, 30, "2016-07-02", "2016-07-09"), (134, 32, "2016-07-03", "2016-07-10"),
        (125, 32, "2016-07-10", "2016-07-17"),
    ]).toDF(["itemid", "eventid", "timestamp", "timestamp_end"])
    .withColumn("timestamp", col("timestamp").cast("timestamp"))
    .withColumn("timestamp_end", col("timestamp_end").cast("timestamp_end"))
)

Пока мне удалось скопировать строки n раз:

new_df = df.withColumn("n", expr("explode(array_repeat(n,int(n)))"))

Но как я могу создать timestamp_start, как показано в примере выше?

Спасибо.

1 Ответ

1 голос
/ 10 января 2020

IIU C, вы можете использовать оконную функцию collect_list, чтобы найти список всех временных отметок + timestamp_end в группе, а затем использовать встроенную функцию Spark SQL inline / inline_outer, чтобы взорвать результирующий массив структур:

from pyspark.sql.functions import collect_list, expr
from pyspark.sql import Window

w1 = Window.partitionBy('itemid')

df.withColumn('timestamp_range',  
    collect_list(expr("(timestamp as timestamp_start, timestamp_end)")).over(w1)
 ).selectExpr(
    'itemid',  
    'eventid', 
    'timestamp', 
    'inline_outer(timestamp_range)'
 ).show()    
+------+-------+----------+---------------+-------------+
|itemid|eventid| timestamp|timestamp_start|timestamp_end|
+------+-------+----------+---------------+-------------+
|   134|     30|2016-07-02|     2016-07-02|   2016-07-09|
|   134|     30|2016-07-02|     2016-07-03|   2016-07-10|
|   134|     32|2016-07-03|     2016-07-02|   2016-07-09|
|   134|     32|2016-07-03|     2016-07-03|   2016-07-10|
|   125|     32|2016-07-10|     2016-07-10|   2016-07-17|
+------+-------+----------+---------------+-------------+

Где: timestamp_range - это коллекционный список следующего named_struct (в синтаксисе Spark SQL):

(timestamp as timestamp_start, timestamp_end)

, что соответствует следующему:

named_struct('timestamp_start', timestamp, 'timestamp_end', timestamp_end)
...