В случае, если у вас есть проблемы с производительностью в pivot, приведенный ниже подход является еще одним решением той же проблемы, хотя он позволяет вам иметь больший контроль, разбивая работу на фазы для каждой категории с помощью цикла for. Для каждой итерации это добавит новые данные для category_x в acc_df, в котором будут храниться накопленные результаты.
schema = ArrayType(
StructType((
StructField("p_date", StringType(), False),
StructField("d_warranty", StringType(), False)
))
)
tuple_list_udf = udf(tuple_list, schema)
buf_size = 5 # if you get OOM error decrease this to persist more often
categories = df.select("category").distinct().collect()
acc_df = spark.createDataFrame(sc.emptyRDD(), df.schema) # create an empty df which holds the accumulated results for each category
for idx, c in enumerate(categories):
col_name = c[0].replace(" ", "_") # spark complains for columns containing space
cat_df = df.where(df["category"] == c[0]) \
.groupBy("product_id") \
.agg(
F.collect_list(F.col("purchase_date")).alias("p_date"),
F.collect_list(F.col("days_warranty")).alias("d_warranty")) \
.withColumn(col_name, tuple_list_udf(F.col("p_date"), F.col("d_warranty"))) \
.drop("p_date", "d_warranty")
if idx == 0:
acc_df = cat_df
else:
acc_df = acc_df \
.join(cat_df.alias("cat_df"), "product_id") \
.drop(F.col("cat_df.product_id"))
# you can persist here every buf_size iterations
if idx + 1 % buf_size == 0:
acc_df = acc_df.persist()
Функция tuple_list отвечает за создание списка с кортежами из столбцов purchase_date и days_warranty.
def tuple_list(pdl, dwl):
return list(zip(pdl, dwl))
Вывод этого будет:

|product_id |CATEGORY_B |CATEGORY_A |

|02147465400|[[2017-04-16 00:00:00, 90], [2018-09-16 00:00:00, 90], [2017-10-09 00:00:00, 90], [2018-01-12 00:00:00, 90], [2018-07-11 00:00:00, 90], [2017-01-21 00:00:00, 90], [2018-04-14 00:00:00, 90], [2017-01-05 00:00:00, 90], [2017-07-15 00:00:00, 90]]|[[2017-06-14 00:00:00, 30], [2018-08-14 00:00:00, 30], [2018-01-11 00:00:00, 30], [2018-04-12 00:00:00, 30], [2017-10-11 00:00:00, 30], [2017-05-16 00:00:00, 30], [2018-05-15 00:00:00, 30], [2017-04-15 00:00:00, 30], [2017-02-15 00:00:00, 30], [2018-02-12 00:00:00, 30], [2017-01-21 00:00:00, 30], [2018-07-11 00:00:00, 30], [2018-06-14 00:00:00, 30], [2017-03-16 00:00:00, 30], [2017-07-20 00:00:00, 30], [2018-08-23 00:00:00, 30], [2017-09-12 00:00:00, 30], [2018-03-12 00:00:00, 30], [2017-12-12 00:00:00, 30], [2017-08-14 00:00:00, 30], [2017-11-11 00:00:00, 30]]|
