Use the groupBy and collect_list functions, then create the Date_* columns based on the array index.
Example:
#sample dataframe
df.show()
#+---+------+
#| ID| Date|
#+---+------+
#| 1|20-Mar|
#| 1|30-Mar|
#| 1|20-Apr|
#| 2|10-Mar|
#+---+------+
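If you want to reproduce the example, the sample dataframe can be built like this (a minimal sketch, assuming an active SparkSession named spark):
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(1, "20-Mar"), (1, "30-Mar"), (1, "20-Apr"), (2, "10-Mar")],
    ["ID", "Date"])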
from pyspark.sql.functions import *
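# collect all Date values per id into an array, then read positions 0-3 into their own columns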
df.groupBy("id").agg(collect_list(col("Date")).alias("tmp")).\
withColumn("Date_1",col("tmp")[0]).\
withColumn("Date_2",col("tmp")[1]).\
withColumn("Date_3",col("tmp")[2]).\
withColumn("Date_4",col("tmp")[3]).\
drop("tmp").\
show(10,False)
#+---+------+------+------+------+
#|id |Date_1|Date_2|Date_3|Date_4|
#+---+------+------+------+------+
#|1 |20-Mar|30-Mar|20-Apr|null |
#|2 |10-Mar|null |null |null |
#+---+------+------+------+------+
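Indexing with col("tmp")[0] (i.e. getItem) is 0-based and simply returns null when the position is beyond the array length, which is why Date_4 is null for both ids.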
Starting with Spark 2.4, use the element_at function:
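# same approach, but element_at uses 1-based positions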
df.groupBy("id").agg(collect_list(col("Date")).alias("tmp")).\
withColumn("Date_1",element_at(col("tmp"),1)).\
withColumn("Date_2",element_at(col("tmp"),2)).\
withColumn("Date_3",element_at(col("tmp"),3)).\
withColumn("Date_4",element_at(col("tmp"),4)).\
drop("tmp").\
show(10,False)
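Note that element_at is 1-based (a negative index counts from the end of the array) and, like the indexing above, yields null when the index exceeds the array length, so the output is identical to the first example.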
Dynamic way:
df1=df.groupBy(col("id")).agg(collect_list(col("Date")).alias("tmp"))
#get the max size of the collected array across all groups
max_size=df1.select(max(size("tmp"))).collect()[0][0]
#max_size + 1 keeps the extra trailing null column so the result matches the hard-coded example above
df1.select([col("id")]+ [col("tmp")[i].alias("date_"+ str(i+1)) for i in range(max_size+1)]).\
show()
#+---+------+------+------+------+
#| id|date_1|date_2|date_3|date_4|
#+---+------+------+------+------+
#| 1|20-Mar|30-Mar|20-Apr| null|
#| 2|10-Mar| null| null| null|
#+---+------+------+------+------+
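To avoid repeating the boilerplate, the dynamic version can be wrapped in a small helper; the function name dates_to_columns below is just an illustration, not part of any API:
from pyspark.sql import functions as F

def dates_to_columns(df, key_col="id", value_col="Date", prefix="date_"):
    # collect value_col per key, then fan the array out into numbered columns
    grouped = df.groupBy(key_col).agg(F.collect_list(F.col(value_col)).alias("tmp"))
    # the widest array decides how many columns to create
    n = grouped.select(F.max(F.size("tmp"))).collect()[0][0]
    cols = [F.col(key_col)] + [F.col("tmp")[i].alias(prefix + str(i + 1)) for i in range(n)]
    return grouped.select(cols)

dates_to_columns(df).show()
This version creates exactly as many columns as the longest array, so the all-null date_4 column shown above would not appear.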