Unable to run PySpark example
0 votes
/ 09 March 2020

I am trying to run a recommender-system example that I got from an online class, but when I try to run the line

model = als.fit(train)

I get the following error:

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-7-e3ce1dc2b89b> in <module>
----> 1 model = als.fit(train)

C:\spark-2.3.0-bin-hadoop2.7\python\pyspark\ml\base.py in fit(self, dataset, params)
    130                 return self.copy(params)._fit(dataset)
    131             else:
--> 132                 return self._fit(dataset)
    133         else:
    134             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

C:\spark-2.3.0-bin-hadoop2.7\python\pyspark\ml\wrapper.py in _fit(self, dataset)
    286 
    287     def _fit(self, dataset):
--> 288         java_model = self._fit_java(dataset)
    289         model = self._create_model(java_model)
    290         return self._copyValues(model)

C:\spark-2.3.0-bin-hadoop2.7\python\pyspark\ml\wrapper.py in _fit_java(self, dataset)
    283         """
    284         self._transfer_params_to_java()
--> 285         return self._java_obj.fit(dataset._jdf)
    286 
    287     def _fit(self, dataset):

C:\spark-2.3.0-bin-hadoop2.7\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py in __call__(self, *args)
   1158         answer = self.gateway_client.send_command(command)
   1159         return_value = get_return_value(
-> 1160             answer, self.gateway_client, self.target_id, self.name)
   1161 
   1162         for temp_arg in temp_args:

C:\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

C:\spark-2.3.0-bin-hadoop2.7\python\lib\py4j-0.10.6-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    318                 raise Py4JJavaError(
    319                     "An error occurred while calling {0}{1}{2}.\n".
--> 320                     format(target_id, ".", name), value)
    321             else:
    322                 raise Py4JError(

Py4JJavaError: An error occurred while calling o37.fit.
: org.apache.spark.SparkException: Job 5 cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:837)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:835)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:835)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1838)
    at org.apache.spark.util.EventLoop.stop(EventLoop.scala:83)
    at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1751)
    at org.apache.spark.SparkContext$$anonfun$stop$8.apply$mcV$sp(SparkContext.scala:1924)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1357)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:1923)
    at org.apache.spark.SparkContext$$anonfun$2.apply$mcV$sp(SparkContext.scala:572)
    at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1988)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
    at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2092)
    at org.apache.spark.rdd.RDD.count(RDD.scala:1162)
    at org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:1030)
    at org.apache.spark.ml.recommendation.ALS.fit(ALS.scala:674)
    at org.apache.spark.ml.recommendation.ALS.fit(ALS.scala:568)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)

Does anyone know what might be going on here? Could something have been installed incorrectly somewhere?

EDIT: Since I was asked to show the rest of the code:

import findspark
findspark.init(r"C:\spark-2.3.0-bin-hadoop2.7")  # raw string so the backslashes are not treated as escapes
import pyspark
from pyspark.sql import DataFrameNaFunctions
from pyspark.sql.functions import lit
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.sql import functions
import pandas as pd
import numpy as np

spark = SparkSession.builder.appName('sistemi di raccomandazione').getOrCreate()

# Load the MovieLens ratings and let Spark infer the column types
df = spark.read.csv('/Users/Andrea/ml-latest-small/ratings.csv',
                    header=True, inferSchema=True)
df.show()
df.columns
df.describe().show()

# 70/30 train-test split
train, test = df.randomSplit([0.7, 0.3])

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
als = ALS(maxIter=10, regParam=0.01, userCol="userId",
          itemCol="movieId",
          ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(train)  # this is the line that fails

The dataset is MovieLens (ml-latest-small).
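
For reference, ratings.csv in MovieLens ml-latest-small has the columns userId, movieId, rating, and timestamp. A quick way to confirm that inferSchema produced numeric types before calling fit (the commented lines show roughly what the output should look like):

df.printSchema()
# root
#  |-- userId: integer (nullable = true)
#  |-- movieId: integer (nullable = true)
#  |-- rating: double (nullable = true)
#  |-- timestamp: integer (nullable = true)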

EDIT 2: Trying to run the code suggested by sid_k gives me the following error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-2-2e64a86f2857> in <module>
      7 sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))
      8 load_data=sc.parallelize([1,2,3])
----> 9 load_data.foreach(print())

C:\spark-2.3.0-bin-hadoop2.7\python\pyspark\rdd.py in foreach(self, f)
    795                 f(x)
    796             return iter([])
--> 797         self.mapPartitions(processPartition).count()  # Force evaluation
    798 
    799     def foreachPartition(self, f):

C:\spark-2.3.0-bin-hadoop2.7\python\pyspark\rdd.py in count(self)
   1054         3
   1055         """
-> 1056         return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
   1057 
   1058     def stats(self):

C:\spark-2.3.0-bin-hadoop2.7\python\pyspark\rdd.py in sum(self)
   1045         6.0
   1046         """
-> 1047         return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
   1048 
   1049     def count(self):

C:\spark-2.3.0-bin-hadoop2.7\python\pyspark\rdd.py in fold(self, zeroValue, op)
    919         # zeroValue provided to each partition is unique from the one provided
    920         # to the final reduce call
--> 921         vals = self.mapPartitions(func).collect()
    922         return reduce(op, vals, zeroValue)
    923 

C:\spark-2.3.0-bin-hadoop2.7\python\pyspark\rdd.py in collect(self)
    822         """
    823         with SCCallSiteSync(self.context) as css:
--> 824             port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    825         return list(_load_from_socket(port, self._jrdd_deserializer))
    826 

C:\spark-2.3.0-bin-hadoop2.7\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py in __call__(self, *args)
   1158         answer = self.gateway_client.send_command(command)
   1159         return_value = get_return_value(
-> 1160             answer, self.gateway_client, self.target_id, self.name)
   1161 
   1162         for temp_arg in temp_args:

C:\spark-2.3.0-bin-hadoop2.7\python\lib\py4j-0.10.6-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    318                 raise Py4JJavaError(
    319                     "An error occurred while calling {0}{1}{2}.\n".
--> 320                     format(target_id, ".", name), value)
    321             else:
    322                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 0.0 failed 1 times, most recent failure: Lost task 3.0 in stage 0.0 (TID 3, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\spark-2.3.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 229, in main
  File "C:\spark-2.3.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 224, in process
  File "C:\spark-2.3.0-bin-hadoop2.7\python\pyspark\rdd.py", line 2438, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "C:\spark-2.3.0-bin-hadoop2.7\python\pyspark\rdd.py", line 2438, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "C:\spark-2.3.0-bin-hadoop2.7\python\pyspark\rdd.py", line 2438, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "C:\spark-2.3.0-bin-hadoop2.7\python\pyspark\rdd.py", line 362, in func
    return f(iterator)
  File "C:\spark-2.3.0-bin-hadoop2.7\python\pyspark\rdd.py", line 795, in processPartition
    f(x)
TypeError: 'NoneType' object is not callable

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2092)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:938)
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:153)
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\spark-2.3.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 229, in main
  File "C:\spark-2.3.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 224, in process
  File "C:\spark-2.3.0-bin-hadoop2.7\python\pyspark\rdd.py", line 2438, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "C:\spark-2.3.0-bin-hadoop2.7\python\pyspark\rdd.py", line 2438, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "C:\spark-2.3.0-bin-hadoop2.7\python\pyspark\rdd.py", line 2438, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "C:\spark-2.3.0-bin-hadoop2.7\python\pyspark\rdd.py", line 362, in func
    return f(iterator)
  File "C:\spark-2.3.0-bin-hadoop2.7\python\pyspark\rdd.py", line 795, in processPartition
    f(x)
TypeError: 'NoneType' object is not callable

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
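
Note that this second failure looks unrelated to the first one: load_data.foreach(print()) calls print immediately and hands its return value, None, to foreach, which then tries to call it on every element, hence the TypeError: 'NoneType' object is not callable. Passing the function itself avoids that:

load_data.foreach(print)  # pass the function itself, do not call it
# foreach runs on the executors, so its output may not show up in the notebook;
# collect() brings the values back to the driver instead:
print(load_data.collect())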

1 Answer

0 votes
/ 10 March 2020

Just make sure that findspark is installed via pip install findspark in CMD and that the environment variables for Spark are set correctly. In Jupyter, add the lines below and then run your code:

import findspark
findspark.init()  # locate the Spark installation from the environment variables
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark import SparkConf

# Minimal smoke test: if this works, Spark itself is set up correctly
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))
load_data = sc.parallelize([1, 2, 3])
load_data.collect()  # should return [1, 2, 3]
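
If that returns [1, 2, 3], the Spark setup itself is fine and the ALS code should run. Below is a minimal, self-contained sanity check of the ALS step, a sketch using only the standard pyspark.ml API; the tiny in-memory frame just mirrors the MovieLens column names so the fit can be verified without depending on the CSV path:

from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS

spark = SparkSession.builder.getOrCreate()

# Five hand-written ratings with the same columns the question uses
ratings = spark.createDataFrame(
    [(0, 0, 4.0), (0, 1, 2.0), (1, 0, 3.0), (1, 1, 5.0), (2, 0, 1.0)],
    ["userId", "movieId", "rating"])

als = ALS(maxIter=5, regParam=0.01, userCol="userId",
          itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(ratings)         # should complete without a Py4JJavaError
model.transform(ratings).show()  # adds a prediction column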