PicklingError: не удалось сериализовать объект: TypeError: не удалось сериализовать объект _io.BufferedReader - PullRequest
1 голос
/ 04 августа 2020

Я реализую анализ настроений на основе лемматизации текста (перед вызовом udf я конвертирую формат в строку, как того требует функция лемматизации), но я столкнулся с ошибкой травления, которую я не мог понять.

PicklingError: не удалось сериализовать объект: TypeError: невозможно сериализовать объект '_io.BufferedReader'

ниже мой код

    def get_part_of_speech(word):
    probable_part_of_speech = wordnet.synsets(word)
  
    pos_counts = Counter()

    pos_counts["n"] = len(  [ item for item in probable_part_of_speech if item.pos()=="n"]  )
    pos_counts["v"] = len(  [ item for item in probable_part_of_speech if item.pos()=="v"]  )
    pos_counts["a"] = len(  [ item for item in probable_part_of_speech if item.pos()=="a"]  )
    pos_counts["r"] = len(  [ item for item in probable_part_of_speech if item.pos()=="r"]  )
      
    most_likely_part_of_speech = pos_counts.most_common(1)[0][0]
    return most_likely_part_of_speech


def Lemmatizing_Words(Words):
    Lm = WordNetLemmatizer()
    Lemmatized_Words = []
    for word in Words:
        Lemmatized_Words.append(Lm.lemmatize(word,get_part_of_speech(word)))
    return Lemmatized_Words

    sparkLemmer1 = udf(lambda x:Lemmatizing_Words(x), StringType())
    df_new_lemma_1 = dfStopwordRemoved.select('overall',sparkLemmer1('filteredreviewText'))

ошибка, с которой я столкнулся:

  PicklingError       Traceback (most recent call last)
<ipython-input-20-1a663a7b63cb> in <module>()
---->1 df_new_lemma_1 = dfStopwordRemoved.select('overall',sparkLemmer1('filteredreviewText'))


~/.conda/envs/myenv/lib/python3.5/site-packages/pyspark/sql/udf.py in wrapper(*args)
    187         @functools.wraps(self.func, assigned=assignments)
    188         def wrapper(*args):
--> 189             return self(*args)
    190 
    191         wrapper.__name__ = self._name

~/.conda/envs/myenv/lib/python3.5/site-packages/pyspark/sql/udf.py in __call__(self, *cols)
    165 
    166     def __call__(self, *cols):
--> 167         judf = self._judf
    168         sc = SparkContext._active_spark_context
    169         return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))

~/.conda/envs/myenv/lib/python3.5/site-packages/pyspark/sql/udf.py in _judf(self)
    149         # and should have a minimal performance impact.
    150         if self._judf_placeholder is None:
--> 151             self._judf_placeholder = self._create_judf()
    152         return self._judf_placeholder
    153 

~/.conda/envs/myenv/lib/python3.5/site-packages/pyspark/sql/udf.py in _create_judf(self)
    158         sc = spark.sparkContext
    159 
--> 160         wrapped_func = _wrap_function(sc, self.func, self.returnType)
    161         jdt = spark._jsparkSession.parseDataType(self.returnType.json())
    162         judf = sc._jvm.org.apache.spark.sql.execution.python.UserDefinedPythonFunction(

~/.conda/envs/myenv/lib/python3.5/site-packages/pyspark/sql/udf.py in _wrap_function(sc, func, returnType)
     33 def _wrap_function(sc, func, returnType):
     34     command = (func, returnType)
---> 35     pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
     36     return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
     37                                   sc.pythonVer, broadcast_vars, sc._javaAccumulator)

~/.conda/envs/myenv/lib/python3.5/site-packages/pyspark/rdd.py in _prepare_for_python_RDD(sc, command)
   2418     # the serialized command will be compressed by broadcast
   2419     ser = CloudPickleSerializer()
-> 2420     pickled_command = ser.dumps(command)
   2421     if len(pickled_command) > (1 << 20):  # 1M
   2422         # The broadcast will have same life cycle as created PythonRDD

~/.conda/envs/myenv/lib/python3.5/site-packages/pyspark/serializers.py in dumps(self, obj)
    605                 msg = "Could not serialize object: %s: %s" % (e.__class__.__name__, emsg)
    606             cloudpickle.print_exec(sys.stderr)
--> 607             raise pickle.PicklingError(msg)
    608 
    609 


PicklingError: Could not serialize object: TypeError: cannot serialize '_io.BufferedReader' object

1 Ответ

0 голосов
/ 05 августа 2020

Это общая проблема и недостаток модуля pickle, заключающийся в том, что он не может сохранять определенные типы объектов, а PySpark обрабатывает объекты для межпроцессного взаимодействия для распределения вычислений.

Кроме того, он не имеет смысла обрабатывать дескриптор открытого файла или потока (а WordNetLematizer из NLTK использует нижнее покрытие _io.BufferedReader дескриптор файла Wor dNet.

Чтобы обойти это, вам нужно скопировать реализация WordNetLematizer NLTK и предварительная загрузка данных для использования WordNetLematizer в функции udf PySpark.

...