I trained an A2C model in TensorFlow 2.0 on the CartPole-v0 environment.
I have now switched to a (continuous) driving environment, where the end goal is to merge several agents into a single lane.
The main difference is the action space: instead of CartPole's left/right it is now a two-dimensional vector [acceleration, steering angle] for the ego vehicle. The observation space is a four-dimensional vector [jerk, y_deviation, v_deviation, collisions_with_agent_or_road].
In CartPole I sampled the action from a categorical distribution; since this environment is continuous, I thought of sampling the action from a Beta distribution instead.
However, it does not work properly, and although I have tried changing almost every parameter, nothing seems to help.
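Roughly what I have in mind with the Beta distribution (a minimal sketch using tensorflow_probability; the concentration values and action bounds below are illustrative placeholders, not taken from my actual environment):

import tensorflow as tf
import tensorflow_probability as tfp

# sample a 2-D continuous action [acceleration, steering angle] from a Beta
# distribution and rescale it from (0, 1) to the action bounds
alpha = tf.constant([[2.0, 1.5]])     # concentration1, one value per action dimension
beta = tf.constant([[2.0, 3.0]])      # concentration0, one value per action dimension
dist = tfp.distributions.Beta(concentration1=alpha, concentration0=beta)

low = tf.constant([-3.0, -0.5])       # illustrative lower bounds (acceleration, steering)
high = tf.constant([3.0, 0.5])        # illustrative upper bounds (acceleration, steering)
sample = dist.sample()                # shape (1, 2), values in (0, 1)
action = low + (high - low) * sample  # rescaled to the action bounds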
Below is my code for the agent as well as the network:
import logging
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import tensorflow.keras.losses as kls
import tensorflow.keras.optimizers as ko
# project-specific modules (not shown here): rp (ReplayBuffer), nw (the network below), DivineEnvironment

class A2CAgent:
    def __init__(self, model):
        # hyperparameters for the loss terms, gamma is the discount coefficient
        self.params = {
            'gamma': 0.9,
            'value': 0.1,
            'entropy': 0.1
        }
        self.model = model
        self.model.compile(
            optimizer=ko.Adam(lr=0.0003),
            # define separate losses for policy logits and value estimate
            loss=[self._logits_loss, self._value_loss]
        )
        self.buffer = rp.ReplayBuffer(buff_size, mini_batch=32)
    def training_step(self, step, observations, actions, values, rewards, next_obs, dones, ep_rews):
        observations[step] = next_obs.copy()  # fill observations from the environment
        actions[step], values[step] = self.model.action_value(next_obs[None, :])  # get action and its value
        next_obs, rewards[step], dones[step], _ = env.step(actions[step])  # step the environment to get the new observation and reward
        ep_rews[-1] += rewards[step]  # only update the last column
        if dones[step]:
            ep_rews.append(0.0)  # create a new column
            next_obs = env.reset()
            logging.info("Episode: %03d, Reward: %03d" % (len(ep_rews) - 1, ep_rews[-2]))
        return next_obs, ep_rews, rewards, dones, values, observations
    def train(self, env, batch_sz=126, updates=100, callbacks=None):
        # storage helpers for a single batch of data
        actions = np.empty((batch_sz, 2), dtype=np.int32)  # (batch_sz, 2)
        rewards, dones, values = np.empty((3, batch_sz))   # each has shape (batch_sz,)
        observations = np.empty((batch_sz,) + (8,))        # (batch_sz, 8)
        # training loop: collect samples, send them to the optimizer, repeat `updates` times
        ep_rews = [0.0]
        next_obs = env.reset()
        for update in range(updates):
            for step in range(batch_sz):
                next_obs, ep_rews, rewards, dones, values, observations = self.training_step(
                    step, observations, actions, values, rewards, next_obs, dones, ep_rews)
            _, next_value = self.model.action_value(next_obs[None, :])  # bootstrap value estimate for the last observation
            returns, advs = self._returns_advantages(rewards, dones, values, next_value)
            # returns are cumulative rewards, advantages are returns minus the baseline
            acts_and_advs = np.concatenate([actions, advs[:, None]], axis=-1)  # (batch_sz, 3): (action, advantage)
            # performs a full training step on the collected batch
            # note: no need to mess around with gradients, the Keras API handles it
            losses = self.model.train_on_batch(observations, [acts_and_advs, returns])
            # logging.info("[%d/%d] Losses: %s" % (update + 1, updates, losses))
        return ep_rews
    def test(self, env, render=False):
        steps = 0
        obs, done, ep_reward = env.reset(), False, 0
        while not done:
            steps += 1
            action, _ = self.model.action_value(obs[None, :])
            obs, reward, done, _ = env.step(action)
            ep_reward += reward
            if render:
                env.draw()
                plt.pause(0.001)
        print(steps)
        return ep_reward
    def _returns_advantages(self, rewards, dones, values, next_value):
        # next_value is the critic's value estimate of the future state
        returns = np.append(np.zeros_like(rewards), next_value, axis=-1)  # (batch_sz + 1,): batch_sz zeroes plus the bootstrap value
        # returns are the discounted sum of future rewards
        for t in reversed(range(rewards.shape[0])):
            returns[t] = rewards[t] + self.params['gamma'] * returns[t + 1] * (1 - dones[t])
        returns = returns[:-1]  # drop the bootstrap element
        # advantages are returns minus the baseline (= value estimates in our case)
        advantages = returns - values  # (batch_sz,)
        return returns, advantages
    def _value_loss(self, returns, value):  # Q-value
        # value loss is typically MSE between value estimates and returns
        return self.params['value'] * kls.mean_squared_error(returns, value)

    def _logits_loss(self, acts_and_advs, logits):  # is the policy in general good
        # a trick to pass actions and advantages through the same API
        actions = acts_and_advs[..., :2]
        advantages = acts_and_advs[..., 2:]
        # loss object that supports a sample_weight argument on call()
        weighted_sparse_ce = kls.MeanSquaredError()
        # policy loss is defined by policy gradients, weighted by advantages
        # note: we only calculate the loss on the actions we have actually taken
        actions = tf.cast(actions, tf.int32)
        policy_loss = weighted_sparse_ce(actions, advantages, sample_weight=advantages)
        policy_loss = np.sum(policy_loss)
        # entropy loss can be calculated via cross-entropy over itself
        entropy_loss = kls.binary_crossentropy(logits, logits, from_logits=True)
        # entropy_loss = kls.categorical_crossentropy(logits, logits, from_logits=True)
        # signs are flipped here because the optimizer minimizes
        return (1 / policy_loss - self.params['entropy'] * entropy_loss)
if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    env = DivineEnvironment("examples/data/merging.json",
                            ego_agent_id=1,
                            camera_follow=True,
                            idm_enabled=True)
    # learn the Beta distribution parameters for both actions [acceleration, steering angle]
    network = nw.Network(num_actions=4)
    agent = A2CAgent(network)
    rewards_history = agent.train(env)
    for _ in range(0, 5):
        print("Total Episode Reward: %d out of 200" % agent.test(env, True))
And the network in TensorFlow:
import numpy as np
import tensorflow as tf
import tensorflow.keras.layers as kl
import tensorflow_probability as tfp

class Network(tf.keras.Model):
    def __init__(self, num_actions):
        # mlp = multi-layer perceptron
        super().__init__('mlp_policy')
        # hidden1 for the actor model
        self.hidden1 = tf.keras.Sequential()
        self.hidden1.add(kl.Dense(512, activation='relu',
                                  kernel_regularizer=tf.keras.regularizers.l2(0.001),
                                  kernel_initializer=tf.keras.initializers.lecun_normal(seed=None)))
        self.hidden1.add(kl.GaussianDropout(0.4))
        self.hidden1.add(kl.Dense(512, activation='relu',
                                  kernel_regularizer=tf.keras.regularizers.l2(0.001)))
        # hidden2 for the critic model
        self.hidden2 = tf.keras.Sequential()
        self.hidden2.add(kl.Dense(128, activation='relu', name='hidden2_output'))
        self.value = kl.Dense(1, name='value')
        # logits are unnormalized log probabilities
        self.logits = kl.Dense(num_actions,
                               kernel_regularizer=tf.keras.regularizers.l2(0.002),
                               kernel_initializer=tf.keras.initializers.lecun_normal(seed=None),
                               name='policy_logits')
        # self.dist = tfp.layers.DistributionLambda(lambda t: tfp.distributions.Beta(concentration1=t[:, 0:2], concentration0=t[:, 2:]),
        # self.dist = tfp.layers.DistributionLambda(lambda t: tfp.distributions.Beta(concentration1=2, concentration0=t[:, 0:2]),
        #                                           convert_to_tensor_fn=lambda s: s.sample(),
        self.dist = tfp.layers.IndependentNormal(2, name='probability_layer')

    def call(self, inputs):
        # inputs is a numpy array, convert it to a Tensor
        x = tf.convert_to_tensor(inputs)
        # separate hidden layers from the same input tensor
        hidden_logs = self.hidden1(x)
        hidden_out = self.logits(hidden_logs)
        hidden_vals = self.hidden2(x)
        return tf.convert_to_tensor(self.dist(hidden_out)), self.value(hidden_vals)  # comment out if you want to use the raw logits

    def action_value(self, obs):
        # executes call() under the hood
        action, value = self.predict(obs)
        return action, np.squeeze(value, axis=-1)
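The commented-out DistributionLambda lines above are what I originally tried for the Beta head. A minimal sketch of how I imagine such a head could be wired, reusing the kl/tfp aliases from the listing above and assuming the preceding Dense layer outputs four strictly positive parameters (e.g. via a softplus activation); this is only a sketch, not what my code above currently does:

# sketch of a Beta policy head: the Dense layer outputs
# [alpha_acceleration, alpha_steering, beta_acceleration, beta_steering];
# softplus keeps the parameters positive, the +1 offset keeps the Beta unimodal
beta_params = kl.Dense(4, activation='softplus', name='beta_params')
beta_head = tfp.layers.DistributionLambda(
    lambda t: tfp.distributions.Beta(concentration1=1.0 + t[..., 0:2],
                                     concentration0=1.0 + t[..., 2:4]),
    convert_to_tensor_fn=lambda d: d.sample(),
    name='probability_layer')
# in call(): action = beta_head(beta_params(hidden_logs))  # samples lie in (0, 1)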
I would be very glad to hear some tips and/or best practices for training reinforcement learning models, since I have only just started with this topic.