Here is a solution:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, IntegerType

spark = SparkSession.builder.getOrCreate()

# UDF that returns the inclusive list of dates between start_date and end_date
def date_ranges(start_date, end_date):
    return list(range(start_date, end_date + 1))

date_ranges_udf = F.udf(date_ranges, ArrayType(IntegerType()))

# Sample data
df = spark.createDataFrame(
    [(1, 'A'), (2, 'B'), (3, 'C'), (4, 'D'), (1, 'A'),
     (2, 'B'), (3, 'C'), (4, 'A'), (1, 'B'), (2, 'A')], ["Date", "ID"])

# Build one row per (start_date, end_date) window: start_date is fixed at the
# minimum date, end_date is exploded over every date up to the maximum
df_start_end_dates = df.select(
    F.min(F.col('Date')).alias('min_date'),
    F.max(F.col('Date')).alias('max_date')
).withColumn('start_date', F.col('min_date')).select(
    'start_date',
    F.explode(
        date_ranges_udf(F.col('min_date'), F.col('max_date'))
    ).alias('end_date')
)

# Join every window to the rows that fall inside it, then count distinct IDs
cond = [F.col('Date') >= F.col('start_date'), F.col('Date') <= F.col('end_date')]

df_start_end_dates.join(
    df, cond
).groupBy(
    'start_date', 'end_date'
).agg(
    F.countDistinct('ID').alias('Unique')
).orderBy(
    'start_date', 'end_date'
).show()
Running it produces the following output:
+----------+--------+------+
|start_date|end_date|Unique|
+----------+--------+------+
| 1| 1| 2|
| 1| 2| 2|
| 1| 3| 3|
| 1| 4| 4|
+----------+--------+------+
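As a side note, on Spark 2.4+ the Python UDF can be avoided entirely by generating the date range with the built-in sequence function. Below is a minimal sketch of that variant; it assumes the same df and column names as above, and only replaces the construction of df_start_end_dates:

# Same windows without a UDF: sequence() builds the inclusive range natively
df_start_end_dates = df.select(
    F.min('Date').alias('min_date'),
    F.max('Date').alias('max_date')
).select(
    F.col('min_date').alias('start_date'),
    F.explode(F.sequence('min_date', 'max_date')).alias('end_date')
)

Keeping the range generation in native Spark expressions avoids the serialization overhead of a Python UDF, and the rest of the join / groupBy / countDistinct pipeline stays unchanged.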