Проблема с разделением RDD при запуске программы als на отдельном кластере Spark - PullRequest
0 голосов
/ 07 февраля 2019

Я запускаю свою программу ALS на искровом кластере из двух узлов в pyspark. Он нормально работает в течение 20 итераций, если я отключаю checkpointIntervalin als params. Для более чем 20 итераций требуется включение CheckpointInterval. Я также дал каталог контрольных точекЭто дает мне следующую ошибку. Я не получаю проблему правильно с ней.

Эта же программа работала нормально на одной машине с 25 итерациями.

Моя ошибка:

 Py4JJavaError: An error occurred while calling o2574.fit.
 : org.apache.spark.SparkException: Checkpoint RDD has a different number
of partitions from original RDD. Original RDD [ID: 3265, num of 
partitions:10]; Checkpoint RDD [ID: 3266, num of partitions: 0].

Мой код:

 import time
 start = time.time()

 from pyspark.sql import SparkSession


 spark=SparkSession.builder.master('spark://172.16.12.200:7077')
.appName('new').getOrCreate()

 ndf = spark.read.json("Musical_Instruments_5.json")
 pd=ndf.select(ndf['asin'],ndf['overall'],ndf['reviewerID'])


 spark.sparkContext.setCheckpointDir("/home/npproject/jupyter_files /checkpoints")

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit,ParamGridBuilder
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

indexer = [StringIndexer(inputCol=column, outputCol=column+"_index") for column in list(set(pd.columns)-set(['overall'])) ]

pipeline = Pipeline(stages=indexer)
transformed = pipeline.fit(pd).transform(pd)
(training,test)=transformed.randomSplit([0.8, 0.2])
als=ALS(maxIter=25,regParam=0.09,rank=25,userCol="reviewerID_index",
itemCol="asin_index",ratingCol="overall",
checkpointInterval=5,coldStartStrategy="drop",
checkpointInterval=-1,nonnegative=True)
model=als.fit(training)
evaluator=RegressionEvaluator(metricName="rmse",
labelCol="overall",predictionCol="prediction")
predictions=model.transform(test)
rmse=evaluator.evaluate(predictions)
print("RMSE="+str(rmse))
print("Rank: ",model.rank)
print("MaxIter: ",model._java_obj.parent().getMaxIter())
print("RegParam: ",model._java_obj.parent().getRegParam())

user_recs=model.recommendForAllUsers(10).show(20)

end = time.time()
print("execution time",end-start)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...