How can I use updateStateByKey in PySpark to merge lists?
3 votes
27 October 2019

In my code I need to merge lists based on the keys of a DStream. My goal is to build lists of words mapped to two keys that represent positive and negative words. I am using Python 3.7 via Anaconda together with PySpark 2.4.
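For reference, positiveWordsList and negativeWordsList are ordinary Python lists of strings. The snippet below is only a simplified illustration of their shape and of the per-key state I am trying to maintain (placeholder words, not my real lists):

# Hypothetical placeholders for the two word lists referenced in the code below
positiveWordsList = ['good', 'great', 'happy']
negativeWordsList = ['bad', 'sad', 'awful']

# The state I expect updateStateByKey to maintain, one record per key:
#   ('Positive', ['good', 'great', ...])
#   ('Negative', ['bad', 'awful', ...])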

My code looks like this:

# Split each tweet into individual words
words = tweets.flatMap(lambda tweet: tweet.split(' '))
"""
World cloud start1
"""
# Map every word to a ('Positive'/'Negative', [word]) pair; words not in the
# corresponding list contribute an empty list so every record still has a key
positivePairs = words.map(lambda word: ('Positive', [word]) if word in positiveWordsList else ('Positive', []))
negativePairs = words.map(lambda word: ('Negative', [word]) if word in negativeWordsList else ('Negative', []))
allwords = positivePairs.union(negativePairs)
# Merge the per-word lists into a single deduplicated list per key for this batch
pos_neg_word_lists = allwords.reduceByKey(lambda x, y: list(set(x + y)))

def updateFunction(newValues, runningLists):
    # Append this batch's lists to the running state, or initialise it on the first batch
    if runningLists:
        runningLists = runningLists + newValues
    else:
        runningLists = newValues
    return runningLists

# Maintain the running word lists per key across batches
running_pos_neg_words = pos_neg_word_lists.updateStateByKey(updateFunction)
running_pos_neg_words.pprint()

ssc.start()
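For completeness, the streaming context itself is created roughly as in the sketch below (a simplified illustration, not my exact code: the socket source, batch interval and checkpoint path are placeholders; updateStateByKey requires a checkpoint directory to be set):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName='PosNegWordLists')   # placeholder app name
ssc = StreamingContext(sc, batchDuration=10)   # 10-second micro-batches (example value)
ssc.checkpoint('/tmp/pos_neg_checkpoint')      # required for stateful operations like updateStateByKey

# Placeholder input; the real tweets DStream comes from my actual source
tweets = ssc.socketTextStream('localhost', 9999)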

I expect updateFunction to give me the running lists of positive and negative words, but instead I get the following error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-3-89419712168e> in <module>
     38 Start the computation
     39 """
---> 40 ssc.start()

/Users/sofia/spark/python/pyspark/streaming/context.py in start(self)
    180         Start the execution of the streams.
    181         """
--> 182         self._jssc.start()
    183         StreamingContext._activeContext = self
    184 

/Users/sofia/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/Users/sofia/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o21.start.
: java.io.IOException: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/Users/sofia/spark/python/pyspark/serializers.py", line 590, in dumps
    return cloudpickle.dumps(obj, 2)
  File "/Users/sofia/spark/python/pyspark/cloudpickle.py", line 863, in dumps
    cp.dump(obj)
  File "/Users/sofia/spark/python/pyspark/cloudpickle.py", line 260, in dump
    return Pickler.dump(self, obj)
  File "/home/sofia/bin/anaconda/lib/python3.7/pickle.py", line 437, in dump
    self.save(obj)
  File "/home/sofia/bin/anaconda/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/sofia/bin/anaconda/lib/python3.7/pickle.py", line 771, in save_tuple
    save(element)
  File "/home/sofia/bin/anaconda/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/Users/sofia/spark/python/pyspark/cloudpickle.py", line 400, in save_function
    self.save_function_tuple(obj)
  File "/Users/sofia/spark/python/pyspark/cloudpickle.py", line 549, in save_function_tuple
    save(state)
  File "/home/sofia/bin/anaconda/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/sofia/bin/anaconda/lib/python3.7/pickle.py", line 856, in save_dict
    self._batch_setitems(obj.items())
  File "/home/sofia/bin/anaconda/lib/python3.7/pickle.py", line 882, in _batch_setitems
    save(v)
  File "/home/sofia/bin/anaconda/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/sofia/bin/anaconda/lib/python3.7/pickle.py", line 856, in save_dict
    self._batch_setitems(obj.items())
  File "/home/sofia/bin/anaconda/lib/python3.7/pickle.py", line 887, in _batch_setitems
    save(v)
  File "/home/sofia/bin/anaconda/lib/python3.7/pickle.py", line 549, in save
    self.save_reduce(obj=obj, *rv)
  File "/home/sofia/bin/anaconda/lib/python3.7/pickle.py", line 662, in save_reduce
    save(state)
  File "/home/sofia/bin/anaconda/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/sofia/bin/anaconda/lib/python3.7/pickle.py", line 856, in save_dict
    self._batch_setitems(obj.items())
  File "/home/sofia/bin/anaconda/lib/python3.7/pickle.py", line 882, in _batch_setitems
    save(v)
  File "/home/sofia/bin/anaconda/lib/python3.7/pickle.py", line 524, in save
    rv = reduce(self.proto)
  File "/Users/sofia/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/Users/sofia/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 332, in get_return_value
    format(target_id, ".", name, value))
py4j.protocol.Py4JError: An error occurred while calling o73.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:274)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

I think the problem is related to my updateFunction. Can anyone help with this?
