Как уже говорилось в комментариях, вы должны попытаться избежать toPandas (), так как эта функция загружает все ваши данные в драйвер.Вы можете использовать pysparks DataFrameWriter для записи ваших данных, но вам нужно преобразовать столбцы массива (предшествующий и последующий) в другой формат, прежде чем вы сможете записать свои данные в csv, так как массивы не поддерживаются,Один из способов приведения ваших столбцов к поддерживаемому типу, такому как строка, это concat_ws .
import pyspark.sql.functions as F
from pyspark.ml.fpm import FPGrowth
df = spark.createDataFrame([
(0, [1, 2, 5]),
(1, [1, 2, 3, 5]),
(2, [1, 2])
], ["id", "items"])
fpGrowth = FPGrowth(itemsCol="items", minSupport=0.5, minConfidence=0.6)
model = fpGrowth.fit(df)
ar=model.associationRules.withColumn('antecedent', F.concat_ws('-', F.col("antecedent").cast("array<string>")))\
.withColumn('consequent', F.concat_ws('-', F.col("consequent").cast("array<string>")))
ar.show()
Вывод:
+----------+----------+------------------+----+
|antecedent|consequent| confidence|lift|
+----------+----------+------------------+----+
| 5| 1| 1.0| 1.0|
| 5| 2| 1.0| 1.0|
| 1-2| 5|0.6666666666666666| 1.0|
| 5-2| 1| 1.0| 1.0|
| 5-1| 2| 1.0| 1.0|
| 2| 1| 1.0| 1.0|
| 2| 5|0.6666666666666666| 1.0|
| 1| 2| 1.0| 1.0|
| 1| 5|0.6666666666666666| 1.0|
+----------+----------+------------------+----+
Теперь вы можете записать свои данные в csv:
ar.write.csv('/bla', header=True)
Это создаст CSV-файл для каждого раздела.Вы можете изменить количество разделов с помощью:
ar = ar.coalesce(1)
Если spark не может записать CSV-файл из-за проблемы с памятью, попробуйте использовать другое количество разделов (до вызова ar.write) и объединитефайлы с другими инструментами, если это необходимо.