I am currently experimenting with Elephas and Apache Spark to try to run ANNs on a cluster. For now I only have one machine, acting as both master and worker. Everything seems to work fine, except when I add metrics=['precision'] to my compile() call. I should add that the problem only appears when PySpark and Elephas are involved; with plain Keras alone everything runs fine (fortunately!). The answers suggested for similar questions did not work for me. I am using Java 8 and Python 2.7.
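To make the trigger explicit, here is a minimal sketch of the one-line change (the full script is further down; nothing else differs between the working and failing runs):

# works under Elephas
model.compile(loss='binary_crossentropy', optimizer='adam')
# fails under Elephas with the error below, but works in plain Keras
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['precision'])

Here is the error I get: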
>>> Distribute load
Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 597, in dumps
    return cloudpickle.dumps(obj, 2)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 863, in dumps
    cp.dump(obj)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 260, in dump
    return Pickler.dump(self, obj)
  File "/usr/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 568, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 406, in save_function
    self.save_function_tuple(obj)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 549, in save_function_tuple
    save(state)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 606, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib/python2.7/pickle.py", line 642, in _batch_appends
    save(tmp[0])
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 660, in save_instancemethod
    obj=obj)
  File "/usr/lib/python2.7/pickle.py", line 401, in save_reduce
    save(args)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 554, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/lib/python2.7/pickle.py", line 425, in save_reduce
    save(state)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 606, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib/python2.7/pickle.py", line 642, in _batch_appends
    save(tmp[0])
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/lib/python2.7/pickle.py", line 425, in save_reduce
    save(state)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/ops/resource_variable_ops.py", line 1152, in __reduce__
    initial_value=self.numpy(),
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/ops/resource_variable_ops.py", line 906, in numpy
    "numpy() is only available when eager execution is enabled.")
NotImplementedError: numpy() is only available when eager execution is enabled.
Traceback (most recent call last):
  File "/home/admin-tv/deeplearning/python2.7/./elephas_ann.py", line 53, in <module>
    spark_model.fit(rdd, epochs=100, batch_size=10, verbose=1, validation_split=0.1)
  File "/usr/local/lib/python2.7/dist-packages/elephas/spark_model.py", line 151, in fit
    self._fit(rdd, epochs, batch_size, verbose, validation_split)
  File "/usr/local/lib/python2.7/dist-packages/elephas/spark_model.py", line 182, in _fit
    rdd.mapPartitions(worker.train).collect()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 816, in collect
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2532, in _jrdd
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2434, in _wrap_function
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2420, in _prepare_for_python_RDD
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 607, in dumps
cPickle.PicklingError: Could not serialize object: NotImplementedError: numpy() is only available when eager execution is enabled.
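The bottom of the first trace shows cloudpickle ending up in TensorFlow's ResourceVariable.__reduce__, which calls numpy() and therefore requires eager execution. A quick way to confirm which mode the driver is running in (assuming a TF build where tf.executing_eagerly() is available, i.e. TF 1.7+):

import tensorflow as tf
# Prints False in graph mode, which is consistent with the NotImplementedError above
print(tf.executing_eagerly())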
Here is my Python script:
# Import libraries
import pandas as pd
import numpy as np
# Import data
dataset = pd.read_csv('/home/admin-tv/deeplearning/Churn_Modelling.csv')
X = dataset.iloc[:, 3:13]
y = dataset.iloc[:, 13]
# Encode categorical data and scale continuous data
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.compose import make_column_transformer
preprocess = make_column_transformer(
    (OneHotEncoder(), ['Geography', 'Gender']),
    (StandardScaler(), ['CreditScore', 'Age', 'Tenure', 'Balance',
                        'NumOfProducts', 'HasCrCard', 'IsActiveMember',
                        'EstimatedSalary']))
X = preprocess.fit_transform(X)
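# Drop the first dummy column of each one-hot block (Geography, Gender) to
# avoid redundant columns; the remaining 11 features match input_dim=11 below.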
X = np.delete(X, [0,3], 1)
# Split in train/test
y = y.values
from sklearn.model_selection import train_test_split
x_train, x_test, y_train, y_test = train_test_split(X, y, test_size = 0.2, random_state = 0)
# Set up Spark for distributing the ANN
from pyspark import SparkContext, SparkConf
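# local[4]: Spark runs entirely on this machine with 4 worker threads, so the
# same box acts as both master and worker.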
conf = SparkConf().setAppName('Elephas_App').setMaster('local[4]')
sc = SparkContext(conf=conf)
# Initialising the ANN
from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation
from keras.optimizers import SGD
model = Sequential()
model.add(Dense(6, input_dim=11))
model.add(Activation('relu'))
model.add(Dense(6))
model.add(Activation('relu'))
model.add(Dense(1))
model.add(Activation('sigmoid'))
model.compile(loss='binary_crossentropy', optimizer='adam', metrics =['accuracy'])
# Fitting the ANN to the Training set
from elephas.utils.rdd_utils import to_simple_rdd
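# to_simple_rdd pairs each training sample with its label, giving an RDD of
# (features, label) tuples that Elephas can partition across workers.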
rdd = to_simple_rdd(sc, x_train, y_train)
from elephas.spark_model import SparkModel
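# frequency='epoch' sends weight updates once per epoch; 'asynchronous' mode
# lets workers push updates to a parameter server without waiting on each other.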
spark_model = SparkModel(model, frequency='epoch', mode='asynchronous')
spark_model.fit(rdd, epochs=100, batch_size=10, verbose=1, validation_split=0.1)
#Prediction:
y_pred = model.predict(x_test)
y_pred = (y_pred > 0.5)
print y_pred
for i in range(30):
    print y_pred[i]
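For comparison, the plain-Keras control run below works fine on the same machine (a minimal sketch, same data and architecture as above, just without Spark/Elephas):

# Control: identical network trained directly with Keras -- no error here
local_model = Sequential()
local_model.add(Dense(6, input_dim=11, activation='relu'))
local_model.add(Dense(6, activation='relu'))
local_model.add(Dense(1, activation='sigmoid'))
local_model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
local_model.fit(x_train, y_train, epochs=100, batch_size=10, verbose=1, validation_split=0.1)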