Я пытаюсь запустить пример системы рекомендаций, которую я получил из онлайн-класса, однако, когда я пытаюсь запустить строку
model = als.fit(train)
, я получаю следующую ошибку
Py4JJavaError Traceback (most recent call last)
<ipython-input-7-e3ce1dc2b89b> in <module>
----> 1 model = als.fit(train)
C:\spark-2.3.0-bin-hadoop2.7\python\pyspark\ml\base.py in fit(self, dataset, params)
130 return self.copy(params)._fit(dataset)
131 else:
--> 132 return self._fit(dataset)
133 else:
134 raise ValueError("Params must be either a param map or a list/tuple of param maps, "
C:\spark-2.3.0-bin-hadoop2.7\python\pyspark\ml\wrapper.py in _fit(self, dataset)
286
287 def _fit(self, dataset):
--> 288 java_model = self._fit_java(dataset)
289 model = self._create_model(java_model)
290 return self._copyValues(model)
C:\spark-2.3.0-bin-hadoop2.7\python\pyspark\ml\wrapper.py in _fit_java(self, dataset)
283 """
284 self._transfer_params_to_java()
--> 285 return self._java_obj.fit(dataset._jdf)
286
287 def _fit(self, dataset):
C:\spark-2.3.0-bin-hadoop2.7\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py in __call__(self, *args)
1158 answer = self.gateway_client.send_command(command)
1159 return_value = get_return_value(
-> 1160 answer, self.gateway_client, self.target_id, self.name)
1161
1162 for temp_arg in temp_args:
C:\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
C:\spark-2.3.0-bin-hadoop2.7\python\lib\py4j-0.10.6-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
318 raise Py4JJavaError(
319 "An error occurred while calling {0}{1}{2}.\n".
--> 320 format(target_id, ".", name), value)
321 else:
322 raise Py4JError(
Py4JJavaError: An error occurred while calling o37.fit.
: org.apache.spark.SparkException: Job 5 cancelled because SparkContext was shut down
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:837)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:835)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:835)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1838)
at org.apache.spark.util.EventLoop.stop(EventLoop.scala:83)
at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1751)
at org.apache.spark.SparkContext$$anonfun$stop$8.apply$mcV$sp(SparkContext.scala:1924)
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1357)
at org.apache.spark.SparkContext.stop(SparkContext.scala:1923)
at org.apache.spark.SparkContext$$anonfun$2.apply$mcV$sp(SparkContext.scala:572)
at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1988)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2092)
at org.apache.spark.rdd.RDD.count(RDD.scala:1162)
at org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:1030)
at org.apache.spark.ml.recommendation.ALS.fit(ALS.scala:674)
at org.apache.spark.ml.recommendation.ALS.fit(ALS.scala:568)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Кто-нибудь знает, что здесь может происходить? возможно, где-то произошла неправильная установка?
РЕДАКТИРОВАТЬ: Так как меня попросили показать остальную часть кода
import findspark
findspark.init("C:\spark-2.3.0-bin-hadoop2.7")
import pyspark
from pyspark.sql import DataFrameNaFunctions
from pyspark.sql.functions import lit
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.sql import functions
import pandas as pd
import numpy as np
spark = SparkSession.builder.appName('sistemi di raccomandazione').getOrCreate()
df = spark.read.csv('/Users/Andrea/ml-latest-small/ratings.csv',
header = True, inferSchema = True)
df.show()
df.columns
df.describe().show()
train, test = df.randomSplit([0.7, 0.3])
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
als = ALS(maxIter=10, regParam=0.01, userCol="userId",
itemCol="movieId",
ratingCol="rating",
coldStartStrategy="drop")
model = als.fit(train)
набор данных из MovieLens
EDIT2: попытка чтобы запустить код, предложенный sid_k, выдайте мне следующую ошибку
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-2-2e64a86f2857> in <module>
7 sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))
8 load_data=sc.parallelize([1,2,3])
----> 9 load_data.foreach(print())
C:\spark-2.3.0-bin-hadoop2.7\python\pyspark\rdd.py in foreach(self, f)
795 f(x)
796 return iter([])
--> 797 self.mapPartitions(processPartition).count() # Force evaluation
798
799 def foreachPartition(self, f):
C:\spark-2.3.0-bin-hadoop2.7\python\pyspark\rdd.py in count(self)
1054 3
1055 """
-> 1056 return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
1057
1058 def stats(self):
C:\spark-2.3.0-bin-hadoop2.7\python\pyspark\rdd.py in sum(self)
1045 6.0
1046 """
-> 1047 return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
1048
1049 def count(self):
C:\spark-2.3.0-bin-hadoop2.7\python\pyspark\rdd.py in fold(self, zeroValue, op)
919 # zeroValue provided to each partition is unique from the one provided
920 # to the final reduce call
--> 921 vals = self.mapPartitions(func).collect()
922 return reduce(op, vals, zeroValue)
923
C:\spark-2.3.0-bin-hadoop2.7\python\pyspark\rdd.py in collect(self)
822 """
823 with SCCallSiteSync(self.context) as css:
--> 824 port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
825 return list(_load_from_socket(port, self._jrdd_deserializer))
826
C:\spark-2.3.0-bin-hadoop2.7\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py in __call__(self, *args)
1158 answer = self.gateway_client.send_command(command)
1159 return_value = get_return_value(
-> 1160 answer, self.gateway_client, self.target_id, self.name)
1161
1162 for temp_arg in temp_args:
C:\spark-2.3.0-bin-hadoop2.7\python\lib\py4j-0.10.6-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
318 raise Py4JJavaError(
319 "An error occurred while calling {0}{1}{2}.\n".
--> 320 format(target_id, ".", name), value)
321 else:
322 raise Py4JError(
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 0.0 failed 1 times, most recent failure: Lost task 3.0 in stage 0.0 (TID 3, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "C:\spark-2.3.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 229, in main
File "C:\spark-2.3.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 224, in process
File "C:\spark-2.3.0-bin-hadoop2.7\python\pyspark\rdd.py", line 2438, in pipeline_func
return func(split, prev_func(split, iterator))
File "C:\spark-2.3.0-bin-hadoop2.7\python\pyspark\rdd.py", line 2438, in pipeline_func
return func(split, prev_func(split, iterator))
File "C:\spark-2.3.0-bin-hadoop2.7\python\pyspark\rdd.py", line 2438, in pipeline_func
return func(split, prev_func(split, iterator))
File "C:\spark-2.3.0-bin-hadoop2.7\python\pyspark\rdd.py", line 362, in func
return f(iterator)
File "C:\spark-2.3.0-bin-hadoop2.7\python\pyspark\rdd.py", line 795, in processPartition
f(x)
TypeError: 'NoneType' object is not callable
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2092)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.collect(RDD.scala:938)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:153)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "C:\spark-2.3.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 229, in main
File "C:\spark-2.3.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 224, in process
File "C:\spark-2.3.0-bin-hadoop2.7\python\pyspark\rdd.py", line 2438, in pipeline_func
return func(split, prev_func(split, iterator))
File "C:\spark-2.3.0-bin-hadoop2.7\python\pyspark\rdd.py", line 2438, in pipeline_func
return func(split, prev_func(split, iterator))
File "C:\spark-2.3.0-bin-hadoop2.7\python\pyspark\rdd.py", line 2438, in pipeline_func
return func(split, prev_func(split, iterator))
File "C:\spark-2.3.0-bin-hadoop2.7\python\pyspark\rdd.py", line 362, in func
return f(iterator)
File "C:\spark-2.3.0-bin-hadoop2.7\python\pyspark\rdd.py", line 795, in processPartition
f(x)
TypeError: 'NoneType' object is not callable
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more