У меня есть спрей Df с около 130 000 строк, 5000 идентификаторов клиентов и 7000 идентификаторов продуктов. Я генерирую все возможные комбинации идентификаторов клиентов и идентификаторов продуктов (34 миллиона строк), используя перекрестное соединение и сохраняя его в полном объеме. Я удаляю комбинации из fullouter, которые уже присутствуют в Df, и затем нахожу все прогнозы, используя мою модель.
Пока все хорошо. Но я хочу преобразовать все прогнозы (30 миллионов строк) в pandas фрейм данных. Я понимаю, что преобразование будет трудно через toPandas()
из-за отсутствия строк. Поэтому я выбрал только 1 верхний прогноз для каждого идентификатора клиента - сделал это с помощью функции windows и функции номера строки.
Я предполагаю, что размер allPredictions должен был значительно сократиться до 5000 клиентов * 1 прогноз на клиент = 5000 строк Я «предполагаю», потому что count()
также занимает слишком много времени, чтобы вернуть количество строк. toPandas()
должно работать справа от фрейма данных topPredictions. Но это не работает. Слишком долго> 40 минут, и так как я работаю в Google Colab, через некоторое время сессия становится неактивной.
Я новичок в Spark. Я что-то здесь не так делаю? Какие изменения я должен сделать в своем коде? Кроме того, я попытался написать это как паркет - занимает слишком много времени. я тоже пробовал писать .csv - та же проблема.
conf = SparkConf().setAppName("trial")
conf.set("spark.sql.execution.arrow.enabled",'true')
conf.set("spark.rpc.message.maxSize",'1024mb')
conf.set("spark.executor.memory", '8g')
conf.set('spark.executor.cores', '8')
conf.set('spark.cores.max', '8')
conf.set("spark.driver.memory", '45g')
conf.set('spark.driver.maxResultSize', '21G')
conf.set("spark.driver.bindAddress", '127.0.0.1')
conf.set("spark.worker.cleanup.enabled",True)
conf.set("spark.executor.heartbeatInterval", "200000")
conf.set("spark.network.timeout", "300000")
self.sparkContext = SparkContext().getOrCreate(conf=conf)
self.sparkContext.setCheckpointDir('/checkpoint')
best_als = ALS(rank=10, maxIter=20, regParam=1.0,alpha=200.0, userCol="customerId",itemCol="productId",ratingCol="purch",implicitPrefs=True)
model=best_als.fit(Df)
df1 = Df.select("customerId")
df2 = Df.select("productId")
fullouter = df1.crossJoin(df2)
bigtest=fullouter.join(data, ["customerId","productId"],"left_anti")
allPredictions=model.transform(bigtest)
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col, row_number
window = Window.partitionBy(allPredictions['customerId']).orderBy(allPredictions['prediction'].desc())
top_allPredictions=allPredictions.select('*', row_number().over(window).alias('rank')).filter(col('rank') <= 1)
dataframe=top_allPredictions.toPandas()