Я пытаюсь обучить агента усиленного обучения (RL) в пользовательской среде. Мой агент RL - стабильная базовая реализация алгоритма DDPG. Он использует несколько потоков для обучения. Одним из аргументов этого агента является среда, которая реализуется в соответствии с пользовательским интерфейсом среды Open AI gym. Моя среда должна быть в состоянии что-то предсказать, используя модель Keras (tenserflow). Но не допускает этого в настройке потока.
Где прогноз выглядит просто следующим образом: label = np.argmax(model.predict(x.reshape(1,28,28,1)))
Алгоритм DDPG выглядит следующим образом:
from stable_baselines.ddpg.policies import MlpPolicy
from stable_baselines import DDPG
model_ddpg = DDPG(MlpPolicy, env, verbose=1, param_noise=param_noise, action_noise=action_noise)
model_ddpg.learn(total_timesteps=10)
И при запуске этого получаю следующее сообщение об ошибке:
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
/usr/local/lib/python3.6/dist-packages/tensorflow/python/client/session.py in _run(self, handle, fetches, feed_dict, options, run_metadata)
1112 subfeed_t = self.graph.as_graph_element(
-> 1113 subfeed, allow_tensor=True, allow_operation=False)
1114 except Exception as e:
14 frames
/usr/local/lib/python3.6/dist-packages/tensorflow/python/framework/ops.py in as_graph_element(self, obj, allow_tensor, allow_operation)
3795 with self._lock:
-> 3796 return self._as_graph_element_locked(obj, allow_tensor, allow_operation)
3797
/usr/local/lib/python3.6/dist-packages/tensorflow/python/framework/ops.py in _as_graph_element_locked(self, obj, allow_tensor, allow_operation)
3874 if obj.graph is not self:
-> 3875 raise ValueError("Tensor %s is not an element of this graph." % obj)
3876 return obj
ValueError: Tensor Tensor("Placeholder_25:0", shape=(), dtype=float32) is not an element of this graph.
During handling of the above exception, another exception occurred:
TypeError Traceback (most recent call last)
<ipython-input-84-761d68b4955a> in <module>()
13
14 model_ddpg = DDPG(MlpPolicy, env, verbose=1, param_noise=param_noise, action_noise=action_noise)
---> 15 model_ddpg.learn(total_timesteps=10)
16 model_ddpg.save(root_dir + "ddpg_mnist_BA")
17
/usr/local/lib/python3.6/dist-packages/stable_baselines/ddpg/ddpg.py in learn(self, total_timesteps, callback, log_interval, tb_log_name, reset_num_timesteps, replay_wrapper)
892 unscaled_action = unscale_action(self.action_space, action)
893
--> 894 new_obs, reward, done, info = self.env.step(unscaled_action)
895
896 self.num_timesteps += 1
<ipython-input-78-da82ecc928b0> in step(self, action)
44
45 def step(self, action):
---> 46 self._BA_step(action)
47
48 #observation update ,
<ipython-input-78-da82ecc928b0> in _BA_step(self, action)
109 x_candidate = generate_sample(self.x_orig)
--> 110 label_candidate, dist_candidate = self._predict_sample(self.model,x_candidate, self.x_orig)
<ipython-input-78-da82ecc928b0> in _predict_sample(self, model, x, x_orig_normed)
130 # nn = NeuralNetwork()
131
--> 132 lbl = np.argmax(model.predict(x.reshape(1,28,28,1)))
133 print("predict is good")
134 if x_orig_normed is None :
<ipython-input-81-79a6b7c803c8> in predict(self, x)
29 self.model2._make_predict_function()
30
---> 31 y = self.model2.predict(x)
32 return y
/usr/local/lib/python3.6/dist-packages/tensorflow/python/keras/engine/training.py in predict(self, x, batch_size, verbose, steps, callbacks, max_queue_size, workers, use_multiprocessing)
1076 verbose=verbose,
1077 steps=steps,
-> 1078 callbacks=callbacks)
1079
1080 def reset_metrics(self):
/usr/local/lib/python3.6/dist-packages/tensorflow/python/keras/engine/training_arrays.py in model_iteration(model, inputs, targets, sample_weights, batch_size, epochs, verbose, callbacks, val_inputs, val_targets, val_sample_weights, shuffle, initial_epoch, steps_per_epoch, validation_steps, validation_freq, mode, validation_in_fit, prepared_feed_values_from_dataset, steps_name, **kwargs)
248 # Setup work for each epoch
249 epoch_logs = {}
--> 250 model.reset_metrics()
251 if mode == ModeKeys.TRAIN:
252 callbacks.on_epoch_begin(epoch, epoch_logs)
/usr/local/lib/python3.6/dist-packages/tensorflow/python/keras/engine/training.py in reset_metrics(self)
1082 if hasattr(self, 'metrics'):
1083 for m in self.metrics:
-> 1084 m.reset_states()
1085
1086 # Reset the state of loss metric wrappers.
/usr/local/lib/python3.6/dist-packages/tensorflow/python/keras/metrics.py in reset_states(self)
197 when a metric is evaluated during training.
198 """
--> 199 K.batch_set_value([(v, 0) for v in self.variables])
200
201 @abc.abstractmethod
/usr/local/lib/python3.6/dist-packages/tensorflow/python/keras/backend.py in batch_set_value(tuples)
3069 assign_ops.append(assign_op)
3070 feed_dict[assign_placeholder] = value
-> 3071 get_session().run(assign_ops, feed_dict=feed_dict)
3072
3073
/usr/local/lib/python3.6/dist-packages/tensorflow/python/client/session.py in run(self, fetches, feed_dict, options, run_metadata)
948 try:
949 result = self._run(None, fetches, feed_dict, options_ptr,
--> 950 run_metadata_ptr)
951 if run_metadata:
952 proto_data = tf_session.TF_GetBuffer(run_metadata_ptr)
/usr/local/lib/python3.6/dist-packages/tensorflow/python/client/session.py in _run(self, handle, fetches, feed_dict, options, run_metadata)
1114 except Exception as e:
1115 raise TypeError(
-> 1116 'Cannot interpret feed_dict key as Tensor: ' + e.args[0])
1117
1118 if isinstance(subfeed_val, ops.Tensor):
TypeError: Cannot interpret feed_dict key as Tensor: Tensor Tensor("Placeholder_25:0", shape=(), dtype=float32) is not an element of this graph.
Так что при передаче этой среды алгоритму DDPG у меня возникают проблемы, потому что несколько потоков будут использовать функцию model.predict()
keras, и это не поддерживается на родном языке, насколько я понимаю.
Я попробовал следующие решения,
Реализация этого, как было предложено в других темах Keras Threading stackoverflow, заключалась в создании графика и вызове функции model2._make_predict_function()
. Ничто из этого не помогло до сих пор. Я попытался создать отдельный класс и сделать следующее:
import logging
class NeuralNetwork:
def __init__(self):
self.session = tf.Session()
self.graph = tf.get_default_graph()
# the folder in which the model and weights are stored
self.model2 = tf.keras.models.load_model(root_dir+'model_mnist.h5')
self.model2._make_predict_function()
# (target_label,target) = get_target(model)
test = np.zeros(shape=(1,28,28,1))
k = np.argmax(self.model2.predict(test))
print(k)
# for some reason in a flask app the graph/session needs to be used in the init else it hangs on other threads
with self.graph.as_default():
with self.session.as_default():
print("init")
logging.info("neural network initialised")
def predict(self, x):
self.model2._make_predict_function()
with self.graph.as_default():
with self.session.as_default():
print("seems ok ?")
y = self.model2.predict(x)
return y
Я не привык использовать вещи, связанные с потоками. Так что я могу сделать много неправильно. Я также пытался просто создавать новую модель keras каждый раз, когда требовался прогноз, это, конечно, вызывало некоторые проблемы с производительностью. Так как я могу настроить эту функцию keras model.predict()
таким образом, чтобы она работала с потоками?
Tl; dr : Как заставить функцию keras model.predict()
работать с алгоритмом, который использует несколько нити