Хотя точно знать, какую ошибку вы получите, ваша проблема весьма вероятна из-за того, что слишком много данных попадает в одну строку.Чтобы преодолеть это, вы можете добавить искусственное разбиение со случайным столбцом.Таким образом, ваши сгруппированные данные будут разделены между несколькими строками, а затем и несколькими файлами, предотвращая возникновение ошибок OOM.Вот как вы могли бы это сделать:
val df = sc.parallelize(Seq((1, 2, 3), (1, 2, 4), (1, 2, 5), (1, 3, 2)))
.toDF("A", "B", "C")
То, что вы пытаетесь сделать, это, и C может стать слишком большим.
df.groupBy("A", "B")
.agg(collect_list('C))
.show
+---+---+---------------+
| A| B|collect_list(C)|
+---+---+---------------+
| 1| 2| [3, 4, 5]|
| 1| 3| [2]|
+---+---+---------------+
Вместо этого вы можете добавить случайный столбецR, чтобы предотвратить получение C слишком большим.Данные будут разбиты по нескольким значениям R, уменьшая возможный объем данных, которые можно поместить в один ряд.Здесь я использовал 3 возможных случайных значения для примера, но в вашем случае должно быть необходимо большее значение.
val new_df = df.withColumn("R", floor(rand()*3)).groupBy("A", "B", "R").agg(collect_list('C) as "C").show
new_df.show
+---+---+---+------+
| A| B| R| C|
+---+---+---+------+
| 1| 2| 1| [5]|
| 1| 2| 2|[3, 4]|
| 1| 3| 1| [2]|
+---+---+---+------+
Затем вы можете записать свой секционированный кадр данных следующим образом.
new_df.write.partitionBy("A", "B", "R").parquet("...")