Первое использование spark.range()
для создания второго DataFrame, содержащего периоды. Например, с n=3
:
n = 3
periods = spark.range(1, n+1).withColumnRenamed("id", "period")
periods.show()
#+------+
#|period|
#+------+
#| 1|
#| 2|
#| 3|
#+------+
Now crossJoin
это с df
, чтобы получить желаемый результат:
df = df.crossJoin(periods)
df.show()
#+-------+-------+-----+------+
#|user_id|country|event|period|
#+-------+-------+-----+------+
#| 1| CA| 1| 1|
#| 1| CA| 1| 2|
#| 1| CA| 1| 3|
#| 2| USA| 1| 1|
#| 2| USA| 1| 2|
#| 2| USA| 1| 3|
#+-------+-------+-----+------+
Обратите внимание, что range
на самом деле не материализует DataFrame, поэтому декартово произведение не будет дорогим.
df.explain()
#== Physical Plan ==
#BroadcastNestedLoopJoin BuildRight, Cross
#:- Scan ExistingRDD[user_id#0,country#1,event#2]
#+- BroadcastExchange IdentityBroadcastMode
# +- *(1) Project [id#31L AS period#33L]
# +- *(1) Range (1, 4, step=1, splits=2)