Не удалось сериализовать объект: Py4JError: Произошла ошибка при вызове o281 .__ getstate__ - PullRequest
1 голос
/ 30 июня 2019

Я новичок в потоковой передаче и пытаюсь работать с прецедентным сценарием технического обслуживания со структурированной потоковой передачей, но получаю ошибку при прогнозировании (csv-) DataStream.

Большое спасибо !!

Вызывается следующая функция, и если я удаляю часть прогнозирования, она не возвращает никакой ошибки - модель предварительно загружена и DataStreamreader тоже работает:

def process_row(row):
  """Fif and preprocess"""   
  list_feat_col_num = [item[0] for item in row.dtypes if item[1].startswith('int')|item[1].startswith('double')]
  vec_assembler = VectorAssembler(inputCols=list_feat_col_num, outputCol="features")
  row_transformed = vec_assembler.transform(row).select('machineID','datetime','failure','features')
  featureIndexer = VectorIndexer(inputCol="features",outputCol="indexedFeatures",
                           handleInvalid ="skip",
                           maxCategories=10).fit(row_transformed)  
  print(row_transformed)
  # error comes from prediction part
  """predict""" 
  rf = RandomForestClassificationModel.load("content/model")
  pipeline_rf_pred = Pipeline(stages=[featureIndexer, rf])
  row_transformed = pipeline_rf_pred.fit(row_transformed)

  prediction = model_rf.transform(row_transformed)
  print(prediction)
  pass 


> Traceback (most recent call last):
  File "/content/spark-2.4.3-bin-hadoop2.7/python/pyspark/serializers.py", line 590, in dumps
    return cloudpickle.dumps(obj, 2)
  File "/content/spark-2.4.3-bin-hadoop2.7/python/pyspark/cloudpickle.py", line 863, in dumps
    cp.dump(obj)
  File "/content/spark-2.4.3-bin-hadoop2.7/python/pyspark/cloudpickle.py", line 260, in dump
    return Pickler.dump(self, obj)
  File "/usr/lib/python3.6/pickle.py", line 409, in dump
    self.save(obj)
  File "/usr/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.6/pickle.py", line 751, in save_tuple
    save(element)
  File "/usr/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/content/spark-2.4.3-bin-hadoop2.7/python/pyspark/cloudpickle.py", line 406, in save_function
    self.save_function_tuple(obj)
  File "/content/spark-2.4.3-bin-hadoop2.7/python/pyspark/cloudpickle.py", line 549, in save_function_tuple
    save(state)
  File "/usr/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
    save(v)
  File "/usr/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.6/pickle.py", line 781, in save_list
    self._batch_appends(obj)
  File "/usr/lib/python3.6/pickle.py", line 808, in _batch_appends
    save(tmp[0])
  File "/usr/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/content/spark-2.4.3-bin-hadoop2.7/python/pyspark/cloudpickle.py", line 400, in save_function
    self.save_function_tuple(obj)
  File "/content/spark-2.4.3-bin-hadoop2.7/python/pyspark/cloudpickle.py", line 549, in save_function_tuple
    save(state)
  File "/usr/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
    save(v)
  File "/usr/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
    save(v)
  File "/usr/lib/python3.6/pickle.py", line 521, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/lib/python3.6/pickle.py", line 634, in save_reduce
    save(state)
  File "/usr/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
    save(v)
  File "/usr/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.6/pickle.py", line 781, in save_list
    self._batch_appends(obj)
  File "/usr/lib/python3.6/pickle.py", line 805, in _batch_appends
    save(x)
  File "/usr/lib/python3.6/pickle.py", line 521, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/lib/python3.6/pickle.py", line 634, in save_reduce
    save(state)
  File "/usr/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
    save(v)
  File "/usr/lib/python3.6/pickle.py", line 496, in save
    rv = reduce(self.proto)
  File "/content/spark-2.4.3-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/content/spark-2.4.3-bin-hadoop2.7/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/content/spark-2.4.3-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 332, in get_return_value
    format(target_id, ".", name, value))
py4j.protocol.Py4JError: An error occurred while calling o281.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:274)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)


---------------------------------------------------------------------------
Py4JError                                 Traceback (most recent call last)
/content/spark-2.4.3-bin-hadoop2.7/python/pyspark/serializers.py in dumps(self, obj)
    589         try:
--> 590             return cloudpickle.dumps(obj, 2)
    591         except pickle.PickleError:

44 frames
Py4JError: An error occurred while calling o281.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:274)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)



During handling of the above exception, another exception occurred:

PicklingError                             Traceback (most recent call last)
/content/spark-2.4.3-bin-hadoop2.7/python/pyspark/serializers.py in dumps(self, obj)
    598                 msg = "Could not serialize object: %s: %s" % (e.__class__.__name__, emsg)
    599             cloudpickle.print_exec(sys.stderr)
--> 600             raise pickle.PicklingError(msg)
    601 
    602 

PicklingError: Could not serialize object: Py4JError: An error occurred while calling o281.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:274)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
...