Раньше мой код работал нормально, но теперь он показывает проблему с кеш-памятью. Моя программа включает в себя загрузку, преобразование и обработку DataFrame на ноутбуке jupyter, связанном с pyspark-shell.Я не понимаю, в чем заключается основная проблема и как ее решить. Любая помощь высоко ценится.
Мой код:
import time
start = time.time()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('spark://172.16.12.200:7077').appName('new').getOrCreate()
ndf = spark.read.json("Musical_Instruments.json")
pd=ndf.select(ndf['asin'],ndf['overall'],ndf['reviewerID'])
spark.sparkContext.setCheckpointDir("/home/npproject/jupyter_files/checkpoints")
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit,ParamGridBuilder
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col
indexer = [StringIndexer(inputCol=column, outputCol=column+"_index") for column in list(set(pd.columns)-set(['overall'])) ]
pipeline = Pipeline(stages=indexer)
transformed = pipeline.fit(pd).transform(pd)
(training,test)=transformed.randomSplit([0.8, 0.2])
als=ALS(maxIter=5,regParam=0.09,rank=25,userCol="reviewerID_index",itemCol="asin_index",ratingCol="overall",coldStartStrategy="drop",nonnegative=True)
model=als.fit(training)
evaluator=RegressionEvaluator(metricName="rmse",labelCol="overall",predictionCol="prediction")
predictions=model.transform(test)
rmse=evaluator.evaluate(predictions)
print("RMSE="+str(rmse))
print("Rank: ",model.rank)
print("MaxIter: ",model._java_obj.parent().getMaxIter())
print("RegParam: ",model._java_obj.parent().getRegParam())
user_recs=model.recommendForAllUsers(10).show(20)
end = time.time()
print("execution time",end-start)
Ошибкакод:
Error:
Py4JJavaError: An error occurred while calling o40.json.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 5, 172.16.12.208, executor 1): java.io.FileNotFoundException: File file:/home/npproject/jupyter_files /Musical_Instruments.json does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.