Я пытаюсь установить рекомендательную систему с pyspark. Однако, когда я пытаюсь соответствовать этому, я получаю ошибку, которую я действительно не понимаю, говоря, что у меня есть нулевые значения в столбцах. Я проверил и дважды проверил, но у меня нет нулевого значения в кадре данных, мой userID является целым числом, а также столбец элемента. Единственный нецелочисленный столбец - рейтинг. Вот мой код:
labelIndexer = StringIndexer(inputCol='feature',outputCol='indexedfeature').fit(exploder)
exploder = labelIndexer.transform(exploder)
train, test = exploder.randomSplit([0.8,0.2])
train = train.withColumn('userID', F.col('userID').cast(IntegerType())).withColumn('indexeditem', F.col('indexeditem').cast(IntegerType()))
test = test.withColumn('userID', F.col('userID').cast(IntegerType())).withColumn('indexeditem', F.col('indexeditem').cast(IntegerType()))
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",predictionCol="prediction")
def computeRmse(model, data):
predictions = model.transform(data)
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))
return rmse
ranks = [4,5]
lambdas = [0.05]
numIters = [30]
bestModel = None
bestValidationRmse = float("inf")
bestRank = 0
bestLambda = -1.0
bestNumIter = -1
val = test.na.drop()
for rank, lmbda, numIter in itertools.product(ranks, lambdas, numIters):
als = ALS(rank=rank, maxIter=numIter, regParam=lmbda, numUserBlocks=10, numItemBlocks=10, implicitPrefs=False, coldStartStrategy='drop',
alpha=1.0,userCol="userID", itemCol="indexeditem", seed=1, ratingCol="rating", nonnegative=True)
model=als.fit(train)
validationRmse = computeRmse(model, val)
print("RMSE (validation) = %f for the model trained with " % validationRmse + "rank = %d, lambda = %.1f, and numIter = %d." % (rank, lmbda, numIter))
if (validationRmse, bestValidationRmse):
bestModel = model
bestValidationRmse = validationRmse
bestRank = rank
bestLambda = lmbda
bestNumIter = numIter
model = bestModel
Вот мой кадр данных поезда:
+------------+------------------------------+------+--------------+
|userID |item |rating|indexeditem |
+------------+------------------------------+------+--------------+
|2641 |datediff_open_dt |25.0 |60 |
|2641 |datediff_company_open_dt |25.0 |54 |
|2641 |account |18.0 |117 |
|2641 |conto_corrente |2.0 |88 |
, и вот ошибка, которую я получаю:
File "recommender", line 1, in <module>
File "recommender", line 55, in recommender
File "/tmp/conda-563267ad-f668-4513-b7e1-4a12f500f939/real/envs/conda-env/lib/python3.6/site-packages/pyspark/ml/base.py", line 132, in fit
return self._fit(dataset)
File "/tmp/conda-563267ad-f668-4513-b7e1-4a12f500f939/real/envs/conda-env/lib/python3.6/site-packages/pyspark/ml/wrapper.py", line 295, in _fit
java_model = self._fit_java(dataset)
File "/tmp/conda-563267ad-f668-4513-b7e1-4a12f500f939/real/envs/conda-env/lib/python3.6/site-packages/pyspark/ml/wrapper.py", line 292, in _fit_java
return self._java_obj.fit(dataset._jdf)
File "/tmp/conda-563267ad-f668-4513-b7e1-4a12f500f939/real/envs/conda-env/lib/python3.6/site-packages/py4j/java_gateway.py", line 1286, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/tmp/conda-563267ad-f668-4513-b7e1-4a12f500f939/real/envs/conda-env/lib/python3.6/site-packages/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/tmp/conda-563267ad-f668-4513-b7e1-4a12f500f939/real/envs/conda-env/lib/python3.6/site-packages/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o2417.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 10 in stage 101.0 failed 4 times, most recent failure: Lost task 10.3 in stage 101.0 (TID 3484, hdppwvr02-02-06.internal.unicreditgroup.eu, executor 4): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$4: (int) => int)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$12$$anon$1.hasNext(WholeStageCodegenExec.scala:730)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:212)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:177)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:58)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:95)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$8.apply(Executor.scala:435)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1324)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:441)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: ALS only supports values in Integer range for columns userID and indexeditem. Value null was not numeric.
at org.apache.spark.ml.recommendation.ALSModelParams$$anonfun$4.apply(ALS.scala:102)
at org.apache.spark.ml.recommendation.ALSModelParams$$anonfun$4.apply(ALS.scala:88)
... 19 more