Как разрешить функцию предсказания модели keras в многопоточном алгоритме? - PullRequest
0 голосов
/ 17 марта 2020

Я пытаюсь обучить агента усиленного обучения (RL) в пользовательской среде. Мой агент RL - стабильная базовая реализация алгоритма DDPG. Он использует несколько потоков для обучения. Одним из аргументов этого агента является среда, которая реализуется в соответствии с пользовательским интерфейсом среды Open AI gym. Моя среда должна быть в состоянии что-то предсказать, используя модель Keras (tenserflow). Но не допускает этого в настройке потока.

Где прогноз выглядит просто следующим образом: label = np.argmax(model.predict(x.reshape(1,28,28,1)))

Алгоритм DDPG выглядит следующим образом:

from stable_baselines.ddpg.policies import MlpPolicy
from stable_baselines import DDPG


model_ddpg = DDPG(MlpPolicy, env, verbose=1, param_noise=param_noise, action_noise=action_noise)
model_ddpg.learn(total_timesteps=10)

И при запуске этого получаю следующее сообщение об ошибке:

---------------------------------------------------------------------------

ValueError                                Traceback (most recent call last)

/usr/local/lib/python3.6/dist-packages/tensorflow/python/client/session.py in _run(self, handle, fetches, feed_dict, options, run_metadata)
   1112             subfeed_t = self.graph.as_graph_element(
-> 1113                 subfeed, allow_tensor=True, allow_operation=False)
   1114           except Exception as e:

14 frames

/usr/local/lib/python3.6/dist-packages/tensorflow/python/framework/ops.py in as_graph_element(self, obj, allow_tensor, allow_operation)
   3795     with self._lock:
-> 3796       return self._as_graph_element_locked(obj, allow_tensor, allow_operation)
   3797 

/usr/local/lib/python3.6/dist-packages/tensorflow/python/framework/ops.py in _as_graph_element_locked(self, obj, allow_tensor, allow_operation)
   3874       if obj.graph is not self:
-> 3875         raise ValueError("Tensor %s is not an element of this graph." % obj)
   3876       return obj

ValueError: Tensor Tensor("Placeholder_25:0", shape=(), dtype=float32) is not an element of this graph.


During handling of the above exception, another exception occurred:

TypeError                                 Traceback (most recent call last)

<ipython-input-84-761d68b4955a> in <module>()
     13 
     14 model_ddpg = DDPG(MlpPolicy, env, verbose=1, param_noise=param_noise, action_noise=action_noise)
---> 15 model_ddpg.learn(total_timesteps=10)
     16 model_ddpg.save(root_dir + "ddpg_mnist_BA")
     17 

/usr/local/lib/python3.6/dist-packages/stable_baselines/ddpg/ddpg.py in learn(self, total_timesteps, callback, log_interval, tb_log_name, reset_num_timesteps, replay_wrapper)
    892                                 unscaled_action = unscale_action(self.action_space, action)
    893 
--> 894                             new_obs, reward, done, info = self.env.step(unscaled_action)
    895 
    896                             self.num_timesteps += 1

<ipython-input-78-da82ecc928b0> in step(self, action)
     44 
     45   def step(self, action):
---> 46     self._BA_step(action)
     47 
     48     #observation update ,

<ipython-input-78-da82ecc928b0> in _BA_step(self, action)
    109     x_candidate = generate_sample(self.x_orig)
--> 110     label_candidate, dist_candidate = self._predict_sample(self.model,x_candidate, self.x_orig)


<ipython-input-78-da82ecc928b0> in _predict_sample(self, model, x, x_orig_normed)
    130     # nn = NeuralNetwork()
    131 
--> 132     lbl = np.argmax(model.predict(x.reshape(1,28,28,1)))
    133     print("predict is good")
    134     if x_orig_normed is None :

<ipython-input-81-79a6b7c803c8> in predict(self, x)
     29     self.model2._make_predict_function()
     30 
---> 31     y = self.model2.predict(x)
     32     return y

/usr/local/lib/python3.6/dist-packages/tensorflow/python/keras/engine/training.py in predict(self, x, batch_size, verbose, steps, callbacks, max_queue_size, workers, use_multiprocessing)
   1076           verbose=verbose,
   1077           steps=steps,
-> 1078           callbacks=callbacks)
   1079 
   1080   def reset_metrics(self):

/usr/local/lib/python3.6/dist-packages/tensorflow/python/keras/engine/training_arrays.py in model_iteration(model, inputs, targets, sample_weights, batch_size, epochs, verbose, callbacks, val_inputs, val_targets, val_sample_weights, shuffle, initial_epoch, steps_per_epoch, validation_steps, validation_freq, mode, validation_in_fit, prepared_feed_values_from_dataset, steps_name, **kwargs)
    248     # Setup work for each epoch
    249     epoch_logs = {}
--> 250     model.reset_metrics()
    251     if mode == ModeKeys.TRAIN:
    252       callbacks.on_epoch_begin(epoch, epoch_logs)

/usr/local/lib/python3.6/dist-packages/tensorflow/python/keras/engine/training.py in reset_metrics(self)
   1082     if hasattr(self, 'metrics'):
   1083       for m in self.metrics:
-> 1084         m.reset_states()
   1085 
   1086     # Reset the state of loss metric wrappers.

/usr/local/lib/python3.6/dist-packages/tensorflow/python/keras/metrics.py in reset_states(self)
    197     when a metric is evaluated during training.
    198     """
--> 199     K.batch_set_value([(v, 0) for v in self.variables])
    200 
    201   @abc.abstractmethod

/usr/local/lib/python3.6/dist-packages/tensorflow/python/keras/backend.py in batch_set_value(tuples)
   3069           assign_ops.append(assign_op)
   3070           feed_dict[assign_placeholder] = value
-> 3071         get_session().run(assign_ops, feed_dict=feed_dict)
   3072 
   3073 

/usr/local/lib/python3.6/dist-packages/tensorflow/python/client/session.py in run(self, fetches, feed_dict, options, run_metadata)
    948     try:
    949       result = self._run(None, fetches, feed_dict, options_ptr,
--> 950                          run_metadata_ptr)
    951       if run_metadata:
    952         proto_data = tf_session.TF_GetBuffer(run_metadata_ptr)

/usr/local/lib/python3.6/dist-packages/tensorflow/python/client/session.py in _run(self, handle, fetches, feed_dict, options, run_metadata)
   1114           except Exception as e:
   1115             raise TypeError(
-> 1116                 'Cannot interpret feed_dict key as Tensor: ' + e.args[0])
   1117 
   1118           if isinstance(subfeed_val, ops.Tensor):

TypeError: Cannot interpret feed_dict key as Tensor: Tensor Tensor("Placeholder_25:0", shape=(), dtype=float32) is not an element of this graph.

Так что при передаче этой среды алгоритму DDPG у меня возникают проблемы, потому что несколько потоков будут использовать функцию model.predict() keras, и это не поддерживается на родном языке, насколько я понимаю.

Я попробовал следующие решения,

Реализация этого, как было предложено в других темах Keras Threading stackoverflow, заключалась в создании графика и вызове функции model2._make_predict_function(). Ничто из этого не помогло до сих пор. Я попытался создать отдельный класс и сделать следующее:

import logging
class NeuralNetwork:

  def __init__(self):
    self.session = tf.Session()
    self.graph = tf.get_default_graph()
    # the folder in which the model and weights are stored
    self.model2 = tf.keras.models.load_model(root_dir+'model_mnist.h5')
    self.model2._make_predict_function()    
    # (target_label,target) = get_target(model)
    test = np.zeros(shape=(1,28,28,1))
    k = np.argmax(self.model2.predict(test))
    print(k)
    # for some reason in a flask app the graph/session needs to be used in the init else it hangs on other threads
    with self.graph.as_default():
      with self.session.as_default():
        print("init")
        logging.info("neural network initialised")


  def predict(self, x):
    self.model2._make_predict_function()
    with self.graph.as_default():
      with self.session.as_default():
        print("seems ok ?")
        y = self.model2.predict(x)
    return y

Я не привык использовать вещи, связанные с потоками. Так что я могу сделать много неправильно. Я также пытался просто создавать новую модель keras каждый раз, когда требовался прогноз, это, конечно, вызывало некоторые проблемы с производительностью. Так как я могу настроить эту функцию keras model.predict() таким образом, чтобы она работала с потоками?

Tl; dr : Как заставить функцию keras model.predict() работать с алгоритмом, который использует несколько нити

1 Ответ

0 голосов
/ 18 марта 2020

ОБНОВЛЕНИЕ:

Я решил проблему, преобразовав ее в модель оценки (тензор потока) вместо модели кераса (тензор потока). Это позволяет выполнять несколько вызовов модели, не беспокоясь о безопасности потоков.

...