cPickle.PicklingError: Could not serialize object: NotImplementedError: numpy() is only available when eager execution is enabled
0 votes
29 April 2020

I am currently experimenting with Elephas and Apache Spark to try to run ANNs on a cluster. For now I have a single machine acting as both master and worker. Everything seems to work fine, except when I add metrics=['precision'] to my compile() function. Note that the problem only appears when PySpark and Elephas are involved; with Keras alone everything works fine (fortunately!). The answers suggested for similar questions did not work for me. I am using Java 8 and Python 2.7.
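For reference, the only difference between the run that works and the run that crashes is the metrics argument passed to compile() (a minimal sketch of the two calls, as described above):

# Runs fine under PySpark + Elephas:
model.compile(loss='binary_crossentropy', optimizer='adam')

# Fails when the job is serialized, with the traceback below:
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['precision'])

Here is the error I get: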

>>> Distribute load
Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 597, in dumps
    return cloudpickle.dumps(obj, 2)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 863, in dumps
    cp.dump(obj)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 260, in dump
    return Pickler.dump(self, obj)
  File "/usr/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 568, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 406, in save_function
    self.save_function_tuple(obj)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 549, in save_function_tuple
    save(state)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 606, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib/python2.7/pickle.py", line 642, in _batch_appends
    save(tmp[0])
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 660, in save_instancemethod
    obj=obj)
  File "/usr/lib/python2.7/pickle.py", line 401, in save_reduce
    save(args)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 554, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/lib/python2.7/pickle.py", line 425, in save_reduce
    save(state)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 606, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib/python2.7/pickle.py", line 642, in _batch_appends
    save(tmp[0])
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/lib/python2.7/pickle.py", line 425, in save_reduce
    save(state)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/ops/resource_variable_ops.py", line 1152, in __reduce__
    initial_value=self.numpy(),
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/ops/resource_variable_ops.py", line 906, in numpy
    "numpy() is only available when eager execution is enabled.")
NotImplementedError: numpy() is only available when eager execution is enabled.
Traceback (most recent call last):
  File "/home/admin-tv/deeplearning/python2.7/./elephas_ann.py", line 53, in <module>
    spark_model.fit(rdd, epochs=100, batch_size=10, verbose=1, validation_split=0.1)
  File "/usr/local/lib/python2.7/dist-packages/elephas/spark_model.py", line 151, in fit
    self._fit(rdd, epochs, batch_size, verbose, validation_split)
  File "/usr/local/lib/python2.7/dist-packages/elephas/spark_model.py", line 182, in _fit
    rdd.mapPartitions(worker.train).collect()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 816, in collect
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2532, in _jrdd
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2434, in _wrap_function
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2420, in _prepare_for_python_RDD
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 607, in dumps
cPickle.PicklingError: Could not serialize object: NotImplementedError: numpy() is only available when eager execution is enabled.
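If I read the last frames correctly, cloudpickle eventually reaches a TensorFlow ResourceVariable, whose __reduce__ calls self.numpy(), and numpy() only works under eager execution. The same failure seems reproducible without Spark at all (my own minimal sketch, assuming TF 1.x resource variables in graph mode):

import pickle
import tensorflow as tf

# In graph (non-eager) mode, pickling a resource variable follows the same
# path as the traceback above: __reduce__ -> self.numpy() -> NotImplementedError.
v = tf.Variable(1.0, use_resource=True)
pickle.dumps(v)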

Here is my Python script:

# Import libraries
import pandas as pd
import numpy as np

# Import data
dataset = pd.read_csv('/home/admin-tv/deeplearning/Churn_Modelling.csv')
X = dataset.iloc[:, 3:13]
y = dataset.iloc[:, 13]

# Encode categorical data and scale continuous data
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.compose import make_column_transformer
preprocess = make_column_transformer(
        (OneHotEncoder(), ['Geography', 'Gender']),
        (StandardScaler(), ['CreditScore', 'Age', 'Tenure', 'Balance',
                            'NumOfProducts', 'HasCrCard', 'IsActiveMember', 
                            'EstimatedSalary']))
X = preprocess.fit_transform(X)
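# Drop one dummy column per one-hot-encoded feature (columns 0 and 3)
# to avoid the dummy-variable trap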
X = np.delete(X, [0,3], 1)

# Split in train/test
y = y.values
from sklearn.model_selection import train_test_split
x_train, x_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=0)

# Make the ANN

from pyspark import SparkContext, SparkConf
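# Single-machine setup: 'local[4]' runs the driver and four worker threads in one JVM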
conf = SparkConf().setAppName('Elephas_App').setMaster('local[4]')
sc = SparkContext(conf=conf)


# Initialising the ANN
from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation
from keras.optimizers import SGD
model = Sequential()
model.add(Dense(6, input_dim=11))
model.add(Activation('relu'))
model.add(Dense(6))
model.add(Activation('relu'))
model.add(Dense(1))
model.add(Activation('sigmoid'))
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

# Fitting the ANN to the Training set
from elephas.utils.rdd_utils import to_simple_rdd
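# to_simple_rdd pairs each feature row with its label into an RDD of (x, y) tuples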
rdd = to_simple_rdd(sc, x_train, y_train)

from elephas.spark_model import SparkModel

spark_model = SparkModel(model, frequency='epoch', mode='asynchronous')
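# Distributed training; this is the call that raises the pickling error above
# (elephas_ann.py line 53 in the traceback)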
spark_model.fit(rdd, epochs=100, batch_size=10, verbose=1, validation_split=0.1)

#Prediction:
y_pred = model.predict(x_test)
y_pred = (y_pred > 0.5)
print y_pred
for i in range(30):
    print y_pred[i]
...