Hey guys, I've been trying to debug this issue for HOURS... I'm trying to build a model with the TensorFlow model-building API. Ultimately I want to train the model on Apache Spark (PySpark), and I'm using the "Elephas" library for distributed deep learning (DDL).
Please help me out with this.
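For context, the overall pattern I'm trying to follow with Elephas is roughly this (a minimal sketch that mirrors my actual code below; the data, layer sizes, and hyperparameters here are just placeholders):

# Minimal sketch of the Keras + Elephas pattern (illustrative only; full code below)
import numpy as np
from pyspark import SparkContext, SparkConf
from tensorflow import keras
from elephas.spark_model import SparkModel
from elephas.utils.rdd_utils import to_simple_rdd

sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

# Dummy windowed time-series data just for the sketch: (samples, timesteps, features)
X_train = np.random.rand(100, 30, 1)
y_train = np.random.rand(100)

# Build and compile a Keras model as usual
inputs = keras.layers.Input(shape=(30, 1))
x = keras.layers.LSTM(50)(inputs)
output = keras.layers.Dense(1)(x)
model = keras.Model(inputs, output)
model.compile(optimizer='adam', loss='mean_squared_error')

# Wrap the model in an Elephas SparkModel and fit it on an RDD of (features, label) pairs
rdd = to_simple_rdd(sc, X_train, y_train)
spark_model = SparkModel(model, frequency='epoch', mode='asynchronous')
spark_model.fit(rdd, epochs=5, batch_size=32, verbose=1, validation_split=0.25)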
main.py
from train_elephas import TrainLSTMElephasModel
import pandas as pd
def main():
    '''
    Run this program with 'spark-submit'.
    Example:
        spark-submit --driver-memory 1G stats_app_elephas.py
    '''
    csv = "../csv_test_files/stats.csv"
    timesteps = 30
    batch_size = 32
    epochs = 5

    print("No. of Progams Run Model \n")
    model_no_programs_run = TrainLSTMElephasModel(csv_path=csv, column_number=1, batch_size=batch_size, epochs=epochs, timesteps=timesteps)


main()
train_elephas.py
from pyspark import SparkContext, SparkConf
from pyspark.ml import Estimator
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from typing import List
from elephas.spark_model import SparkModel
from elephas.utils.rdd_utils import to_simple_rdd
from tensorflow.keras.models import load_model
from tensorflow.keras import layers
from tensorflow import keras
import tensorflow as tf
class TrainLSTMElephasModel:
    '''
    This class trains an LSTM model from either a CSV file or a JSON file.
    Only include EITHER a JSON file or a CSV file when you want to train an LSTM model.

    For CSV data:
        - If you know the column number that you wish to train your model on, specify it in the
          'column_number' field and don't include a 'column_name'.
        - 'column_names_to_traverse' shouldn't be specified for a CSV file if the desired column is
          located in another table. It is better to pass in that table itself than to traverse
          through the tables.

    For JSON data:
        - When passing in JSON data, if you want to predict the future value of a field that has a
          lexical depth > 1, you must specify 'column_names_to_traverse' as a list of all the
          columns needed to reach the desired 'column_name'.
        - If the desired column has a lexical depth > 1, set 'column_name' to the first column that
          needs to be traversed. This is required for columns that contain JSON data in their rows.

        EXAMPLE:
            We want to grab the column 'procedure_calls', which has a JSON depth > 1:
                column_name = 'program_calls'
                column_names_to_traverse = ['program_names', 'procedure_calls']

        (A full instantiation sketch for this JSON case is shown after this file listing.)
    '''

    # Type aliases
    Vector = List[str]

    def __init__(self, csv_path: str = None, json_path: str = None,
                 column_name: str = None, column_names_to_traverse: Vector = [],
                 column_number: int = None, timesteps: int = 30,
                 batch_size: int = 32, epochs: int = 5):
        self.csv_path = csv_path
        self.json_path = json_path
        self.column_name = column_name
        self.column_names_to_traverse = column_names_to_traverse
        self.column_number = column_number
        self.timesteps = timesteps
        self.batch_size = batch_size
        self.epochs = epochs
        self.train_LSTM_model()
    def train_LSTM_model(self) -> SparkModel:
        '''Return a trained LSTM model based on the CSV or JSON file path passed in for training.'''
        train_data = None

        # Spark context
        sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

        # Check whether a CSV file or a JSON file was provided
        if self.csv_path is not None:
            train_data = self.handleCSVFile()
        elif self.json_path is not None:
            train_data = self.handleJSONFile()

        # Reshape to a 2D array
        train_data = train_data.reshape(-1, 1)
        print(train_data.dtype)
        print(type(train_data))
        print(train_data.shape)

        # Feature scaling
        scaler = MinMaxScaler(feature_range=(0, 1))
        scaled_train_data = scaler.fit_transform(train_data)

        # Initializing the X_train and y_train datasets for the column
        X_train = []
        y_train = []

        # Build sliding windows of 'timesteps' scaled values, each labelled with the next value
        for i in range(self.timesteps, len(train_data)):
            X_train.append(scaled_train_data[i - self.timesteps:i, 0])
            y_train.append(scaled_train_data[i, 0])

        # Numpy array creation; Keras requires numpy arrays as inputs
        X_train, y_train = np.array(X_train, dtype=int), np.array(y_train)
        print(X_train.shape)
        print(X_train.dtype)

        # Reshaping to a 3D matrix (970, 30, 1)
        # X_train = np.reshape(X_train, (X_train[0], X_train[1], 1))
        print(X_train.shape)

        # Input layer
        inputs = layers.Input(shape=(X_train.shape[1], 1))

        # Training layers
        x_1 = layers.LSTM(units=50, return_sequences=True)(inputs)
        x_1 = layers.Dropout(0.2)(x_1)
        x_1 = layers.LSTM(units=50, return_sequences=True)(x_1)
        x_1 = layers.Dropout(0.2)(x_1)
        x_1 = layers.LSTM(units=50, return_sequences=True)(x_1)
        x_1 = layers.Dropout(0.2)(x_1)
        x_1 = layers.LSTM(units=50, return_sequences=True)(x_1)
        x_1 = layers.Dropout(0.2)(x_1)
        x_1 = layers.Flatten()(x_1)

        # 1 output neuron for the column prediction
        output = layers.Dense(units=1, activation='relu')(x_1)

        model = keras.Model(inputs=inputs, outputs=output, name='elephas_Model')
        print(model.summary())
        model.compile(optimizer=keras.optimizers.Adam(), loss='mean_squared_error', metrics=['accuracy'])
        model.save('../csv_test_files/tf_elephas_model.h5')
        del model

        # Earlier attempt with the standalone Keras functional API, kept for reference:
        # input_train_model = Input(shape=(X_train.shape[1], 1), name='input_train_model')
        # x_1 = LSTM(units=50, return_sequences=True, input_shape=(X_train.shape[1], 1))(input_train_model)
        # x_1 = Dropout(0.2)(x_1)
        # x_1 = LSTM(units=50, return_sequences=True)(x_1)
        # x_1 = Dropout(0.2)(x_1)
        # x_1 = LSTM(units=50, return_sequences=True)(x_1)
        # x_1 = Dropout(0.2)(x_1)
        # x_1 = LSTM(units=50, return_sequences=True)(x_1)
        # x_1 = Dropout(0.2)(x_1)
        # x_1 = Flatten()(x_1)
        # output_train_data = Dense(units=1, name='output_train_data')(x_1)  # 1 output neuron
        # model = Model(inputs=input_train_model, outputs=output_train_data)
        # model.compile(optimizer='adam', loss='mean_squared_error', metrics=['accuracy'])

        model = load_model('../csv_test_files/tf_elephas_model.h5')

        # Create an RDD from the numpy arrays
        rdd = to_simple_rdd(sc, X_train, y_train)
        # rdd = sc.parallelize(X_train)

        # Wrap the Keras model in an Elephas SparkModel and fit it on the RDD
        spark_model = SparkModel(model, frequency='epoch', mode='asynchronous')
        spark_model.fit(rdd, self.epochs, self.batch_size, verbose=1, validation_split=0.25)
        # spark_model.save('../saved_lstm_models/elephas_stats_app')
        return spark_model
    def handleCSVFile(self) -> np.ndarray:
        with open(self.csv_path) as csv:
            dataframe = pd.read_csv(csv)
            if self.column_number is not None:
                return dataframe.iloc[:, self.column_number].values
            return dataframe[self.column_name].values
    def handleJSONFile(self) -> np.ndarray:
        import json
        from pandas.io.json import json_normalize

        with open(self.json_path) as json_file:  # don't shadow the json module with the file handle
            json_data = json.load(json_file)
            if self.column_names_to_traverse:
                # Nested field: start from the top-level 'column_name' (e.g. 'program_calls') and
                # traverse every name except the last, which is the column we actually want
                dataframe = json_normalize(data=json_data[self.column_name],
                                           record_path=[name for name in self.column_names_to_traverse
                                                        if name != self.column_names_to_traverse[-1]])
                return dataframe[self.column_names_to_traverse[-1]].values
            else:
                dataframe = json_normalize(json_data)
                return dataframe[self.column_name].values
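For completeness, this is how I would expect to call the JSON path described in the class docstring (only the CSV path is actually exercised in main.py above; the file path and column names here are just the hypothetical ones from the docstring example):

from train_elephas import TrainLSTMElephasModel

# Hypothetical JSON example: 'procedure_calls' is nested under 'program_calls' -> 'program_names'
model_procedure_calls = TrainLSTMElephasModel(
    json_path="../csv_test_files/stats.json",  # hypothetical path
    column_name="program_calls",               # first column to traverse
    column_names_to_traverse=["program_names", "procedure_calls"],
    timesteps=30,
    batch_size=32,
    epochs=5,
)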
Error in the Jupyter notebook:
Using TensorFlow backend.
WARNING
No. of Progams Run Model
int64
<class 'numpy.ndarray'>
(1000, 1)
(970, 30)
int64
(970, 30)
WARNING:tensorflow:From /Users/vnovelo/anaconda3/lib/python3.7/site-packages/tensorflow/python/ops/init_ops.py:1251: calling VarianceScaling.__init__ (from tensorflow.python.ops.init_ops) with dtype is deprecated and will be removed in a future version.
Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor
Model: "elephas_Model"
_________________________________________________________________
Layer (type) Output Shape Param #
=================================================================
input_1 (InputLayer) [(None, 30, 1)] 0
_________________________________________________________________
lstm (LSTM) (None, 30, 50) 10400
_________________________________________________________________
dropout (Dropout) (None, 30, 50) 0
_________________________________________________________________
lstm_1 (LSTM) (None, 30, 50) 20200
_________________________________________________________________
dropout_1 (Dropout) (None, 30, 50) 0
_________________________________________________________________
lstm_2 (LSTM) (None, 30, 50) 20200
_________________________________________________________________
dropout_2 (Dropout) (None, 30, 50) 0
_________________________________________________________________
lstm_3 (LSTM) (None, 30, 50) 20200
_________________________________________________________________
dropout_3 (Dropout) (None, 30, 50) 0
_________________________________________________________________
flatten (Flatten) (None, 1500) 0
_________________________________________________________________
dense (Dense) (None, 1) 1501
=================================================================
Total params: 72,501
Trainable params: 72,501
Non-trainable params: 0
_________________________________________________________________
None
WARNING:tensorflow:From /Users/vnovelo/anaconda3/lib/python3.7/site-packages/tensorflow/python/ops/init_ops.py:97: calling GlorotUniform.__init__ (from tensorflow.python.ops.init_ops) with dtype is deprecated and will be removed in a future version.
Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor
WARNING:tensorflow:From /Users/vnovelo/anaconda3/lib/python3.7/site-packages/tensorflow/python/ops/init_ops.py:97: calling Orthogonal.__init__ (from tensorflow.python.ops.init_ops) with dtype is deprecated and will be removed in a future version.
Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor
WARNING:tensorflow:From /Users/vnovelo/anaconda3/lib/python3.7/site-packages/tensorflow/python/ops/init_ops.py:97: calling Zeros.__init__ (from tensorflow.python.ops.init_ops) with dtype is deprecated and will be removed in a future version.
Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-1-b09f3920ffe0> in <module>
21 print("No. of Progams Run Model \n")
22 model_no_programs_run = TrainLSTMElephasModel(csv_path=csv, column_number=1, batch_size=batch_size, epochs=epochs, timesteps=timesteps)
---> 23 main()
<ipython-input-1-b09f3920ffe0> in main()
20
21 print("No. of Progams Run Model \n")
---> 22 model_no_programs_run = TrainLSTMElephasModel(csv_path=csv, column_number=1, batch_size=batch_size, epochs=epochs, timesteps=timesteps)
23 main()
~/Documents/forecast_events/Forecast-Predictive-Analytics-API/docs/lstm_model/train_lstm_model/train_elephas.py in __init__(self, csv_path, json_path, column_name, column_names_to_traverse, column_number, timesteps, batch_size, epochs)
56 self.batch_size = batch_size
57 self.epochs = epochs
---> 58 self.train_LSTM_model()
59
60
~/Documents/forecast_events/Forecast-Predictive-Analytics-API/docs/lstm_model/train_lstm_model/train_elephas.py in train_LSTM_model(self)
148 #rdd = sc.parallelize(X_train)
149 # Fitting the keras model to a Spark Model
--> 150 spark_model = SparkModel(model, frequency='epoch', mode='asynchronous')
151 spark_model.fit(rdd, self.epochs, self.batch_size, verbose=1, validation_split=0.25)
152 #spark_model.save('../saved_lstm_models/elephas_stats_app')
~/anaconda3/lib/python3.7/site-packages/elephas/spark_model.py in __init__(self, model, mode, frequency, parameter_server_mode, num_workers, custom_objects, batch_size, port, *args, **kwargs)
67 if self.parameter_server_mode == 'http':
68 self.parameter_server = HttpServer(
---> 69 self.serialized_model, self.mode, self.port)
70 self.client = HttpClient(self.port)
71 elif self.parameter_server_mode == 'socket':
~/anaconda3/lib/python3.7/site-packages/elephas/parameter/server.py in __init__(self, model, mode, port, debug, threaded, use_reloader)
61 """
62
---> 63 self.master_network = dict_to_model(model)
64 self.mode = mode
65 self.master_url = None
~/anaconda3/lib/python3.7/site-packages/elephas/utils/serialization.py in dict_to_model(dict)
18 :return: Keras model instantiated from dictionary
19 """
---> 20 model = model_from_json(dict['model'])
21 model.set_weights(dict['weights'])
22 return model
~/anaconda3/lib/python3.7/site-packages/keras/engine/saving.py in model_from_json(json_string, custom_objects)
659 config = json.loads(json_string)
660 from ..layers import deserialize
--> 661 return deserialize(config, custom_objects=custom_objects)
662
663
~/anaconda3/lib/python3.7/site-packages/keras/layers/__init__.py in deserialize(config, custom_objects)
166 module_objects=globs,
167 custom_objects=custom_objects,
--> 168 printable_module_name='layer')
~/anaconda3/lib/python3.7/site-packages/keras/utils/generic_utils.py in deserialize_keras_object(identifier, module_objects, custom_objects, printable_module_name)
145 config['config'],
146 custom_objects=dict(list(_GLOBAL_CUSTOM_OBJECTS.items()) +
--> 147 list(custom_objects.items())))
148 with CustomObjectScope(custom_objects):
149 return cls.from_config(config['config'])
~/anaconda3/lib/python3.7/site-packages/keras/engine/network.py in from_config(cls, config, custom_objects)
1054 # First, we create all layers and enqueue nodes to be processed
1055 for layer_data in config['layers']:
-> 1056 process_layer(layer_data)
1057
1058 # Then we process nodes in order of layer depth.
~/anaconda3/lib/python3.7/site-packages/keras/engine/network.py in process_layer(layer_data)
1040
1041 layer = deserialize_layer(layer_data,
-> 1042 custom_objects=custom_objects)
1043 created_layers[layer_name] = layer
1044
~/anaconda3/lib/python3.7/site-packages/keras/layers/__init__.py in deserialize(config, custom_objects)
166 module_objects=globs,
167 custom_objects=custom_objects,
--> 168 printable_module_name='layer')
~/anaconda3/lib/python3.7/site-packages/keras/utils/generic_utils.py in deserialize_keras_object(identifier, module_objects, custom_objects, printable_module_name)
147 list(custom_objects.items())))
148 with CustomObjectScope(custom_objects):
--> 149 return cls.from_config(config['config'])
150 else:
151 # Then `cls` may be a function returning a class.
~/anaconda3/lib/python3.7/site-packages/keras/layers/recurrent.py in from_config(cls, config)
2344 if 'implementation' in config and config['implementation'] == 0:
2345 config['implementation'] = 1
-> 2346 return cls(**config)
2347
2348
~/anaconda3/lib/python3.7/site-packages/keras/legacy/interfaces.py in wrapper(*args, **kwargs)
89 warnings.warn('Update your `' + object_name + '` call to the ' +
90 'Keras 2 API: ' + signature, stacklevel=2)
---> 91 return func(*args, **kwargs)
92 wrapper._original_function = func
93 return wrapper
~/anaconda3/lib/python3.7/site-packages/keras/layers/recurrent.py in __init__(self, units, activation, recurrent_activation, use_bias, kernel_initializer, recurrent_initializer, bias_initializer, unit_forget_bias, kernel_regularizer, recurrent_regularizer, bias_regularizer, activity_regularizer, kernel_constraint, recurrent_constraint, bias_constraint, dropout, recurrent_dropout, implementation, return_sequences, return_state, go_backwards, stateful, unroll, **kwargs)
2224 dropout=dropout,
2225 recurrent_dropout=recurrent_dropout,
-> 2226 implementation=implementation)
2227 super(LSTM, self).__init__(cell,
2228 return_sequences=return_sequences,
~/anaconda3/lib/python3.7/site-packages/keras/layers/recurrent.py in __init__(self, units, activation, recurrent_activation, use_bias, kernel_initializer, recurrent_initializer, bias_initializer, unit_forget_bias, kernel_regularizer, recurrent_regularizer, bias_regularizer, kernel_constraint, recurrent_constraint, bias_constraint, dropout, recurrent_dropout, implementation, **kwargs)
1876 self.use_bias = use_bias
1877
-> 1878 self.kernel_initializer = initializers.get(kernel_initializer)
1879 self.recurrent_initializer = initializers.get(recurrent_initializer)
1880 self.bias_initializer = initializers.get(bias_initializer)
~/anaconda3/lib/python3.7/site-packages/keras/initializers.py in get(identifier)
513 def get(identifier):
514 if isinstance(identifier, dict):
--> 515 return deserialize(identifier)
516 elif isinstance(identifier, six.string_types):
517 config = {'class_name': str(identifier), 'config': {}}
~/anaconda3/lib/python3.7/site-packages/keras/initializers.py in deserialize(config, custom_objects)
508 module_objects=globals(),
509 custom_objects=custom_objects,
--> 510 printable_module_name='initializer')
511
512
~/anaconda3/lib/python3.7/site-packages/keras/utils/generic_utils.py in deserialize_keras_object(identifier, module_objects, custom_objects, printable_module_name)
138 if cls is None:
139 raise ValueError('Unknown ' + printable_module_name +
--> 140 ': ' + class_name)
141 if hasattr(cls, 'from_config'):
142 custom_objects = custom_objects or {}
ValueError: Unknown initializer: GlorotUniform