Невозможно преобразовать Spark DataFrame в Pandas DataFrame - PullRequest
0 голосов
/ 12 апреля 2020

У меня есть спрей Df с около 130 000 строк, 5000 идентификаторов клиентов и 7000 идентификаторов продуктов. Я генерирую все возможные комбинации идентификаторов клиентов и идентификаторов продуктов (34 миллиона строк), используя перекрестное соединение и сохраняя его в полном объеме. Я удаляю комбинации из fullouter, которые уже присутствуют в Df, и затем нахожу все прогнозы, используя мою модель.

Пока все хорошо. Но я хочу преобразовать все прогнозы (30 миллионов строк) в pandas фрейм данных. Я понимаю, что преобразование будет трудно через toPandas() из-за отсутствия строк. Поэтому я выбрал только 1 верхний прогноз для каждого идентификатора клиента - сделал это с помощью функции windows и функции номера строки.

Я предполагаю, что размер allPredictions должен был значительно сократиться до 5000 клиентов * 1 прогноз на клиент = 5000 строк Я «предполагаю», потому что count() также занимает слишком много времени, чтобы вернуть количество строк. toPandas() должно работать справа от фрейма данных topPredictions. Но это не работает. Слишком долго> 40 минут, и так как я работаю в Google Colab, через некоторое время сессия становится неактивной.

Я новичок в Spark. Я что-то здесь не так делаю? Какие изменения я должен сделать в своем коде? Кроме того, я попытался написать это как паркет - занимает слишком много времени. я тоже пробовал писать .csv - та же проблема.

conf = SparkConf().setAppName("trial")
conf.set("spark.sql.execution.arrow.enabled",'true')
conf.set("spark.rpc.message.maxSize",'1024mb')
conf.set("spark.executor.memory", '8g')
conf.set('spark.executor.cores', '8')
conf.set('spark.cores.max', '8')
conf.set("spark.driver.memory", '45g')
conf.set('spark.driver.maxResultSize', '21G')
conf.set("spark.driver.bindAddress", '127.0.0.1')
conf.set("spark.worker.cleanup.enabled",True)
conf.set("spark.executor.heartbeatInterval", "200000")
conf.set("spark.network.timeout", "300000")
self.sparkContext = SparkContext().getOrCreate(conf=conf)
self.sparkContext.setCheckpointDir('/checkpoint')
best_als = ALS(rank=10, maxIter=20, regParam=1.0,alpha=200.0, userCol="customerId",itemCol="productId",ratingCol="purch",implicitPrefs=True)

model=best_als.fit(Df)

df1 = Df.select("customerId")
df2 = Df.select("productId")

fullouter = df1.crossJoin(df2)

bigtest=fullouter.join(data, ["customerId","productId"],"left_anti")

allPredictions=model.transform(bigtest)

from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col, row_number

window = Window.partitionBy(allPredictions['customerId']).orderBy(allPredictions['prediction'].desc())

top_allPredictions=allPredictions.select('*', row_number().over(window).alias('rank')).filter(col('rank') <= 1)

dataframe=top_allPredictions.toPandas()

1 Ответ

0 голосов
/ 12 апреля 2020

Попробуйте это

from pyspark.sql import *  
from pyspark.sql.functions import *  
from pyspark.sql.types import *  
import numpy as np    
import pandas as pd


dataframe= top_allPredictions.select("*").toPandas()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...