I'm using A2C (advantage actor-critic) on CartPole-v1. The end goal is to report the average reward, but the reward I get stays below 10. I started wondering whether my code drops part of the rewards somewhere, or whether something goes wrong during training; I can't see what's wrong, and the setup looks fine to me. The whole thing is supposed to reach a reward of around 300~500.
import tensorflow as tf
import numpy as np
import gym
# Suppresses compilation warnings
import os
os.environ['TF_CPP_MIN_LOG_LEVEL']='2'
# Constants
learning_rate = 0.001
#trials = 3
episodes = 1000
moves = 999
discount = 0.99
hidden_size = 32
critic_size = 128
updates = 50
avgRs = []
totRs = []
# Helper function to compute discounted returns from a reward history
def generate_disRs(hist):
    dist = []
    last_reward = 0
    for element in reversed(hist):
        reward = discount * last_reward + element
        dist.append(reward)
        last_reward = reward
    return list(reversed(dist))
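# Sanity check I added while debugging (not part of the assignment code):
# with discount = 0.99, three unit rewards should give returns
# [1 + 0.99*(1 + 0.99*1), 1 + 0.99*1, 1] = [2.9701, 1.99, 1.0].
assert np.allclose(generate_disRs([1.0, 1.0, 1.0]), [2.9701, 1.99, 1.0])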
class A2C:
    def __init__(self):
        self.game = gym.make('CartPole-v1')
        self.game.reset()
        self.num_actions = self.game.action_space.n
        self.state_size = self.game.observation_space.shape[0]
        self.state_input = tf.placeholder(tf.float32, [None, self.state_size])
        self.rewards = tf.placeholder(shape=[None], dtype=tf.float32)
        # Define any additional placeholders needed for training your agent here:
        self.state_value = self.critic()
        self.actProbs = self.actor()
        self.loss_val = self.loss()
        self.train_op = self.optimizer()
        self.session = tf.Session()
        self.session.run(tf.global_variables_initializer())
    def optimizer(self):
        """
        :return: Optimizer for your loss function
        """
        optimizer = tf.train.AdamOptimizer(learning_rate)
        trainOp = optimizer.minimize(self.loss)
        return trainOp
    def critic(self):
        """
        Calculates the estimated value for every state in
        self.state_input. The critic should not depend on
        any other tensors besides self.state_input.
        :return: A tensor of shape [num_states] representing the
        estimated value of each state in the trajectory.
        """
        # Two-layer value network
        V1 = tf.Variable(tf.random_normal([4, critic_size], dtype=tf.float32, stddev=.1))
        v1Out = tf.nn.relu(tf.matmul(self.state_input, V1))
        V2 = tf.Variable(tf.random_normal([critic_size, 1], dtype=tf.float32, stddev=.1))
        self.v2Out = tf.matmul(v1Out, V2)
        return self.v2Out
    def actor(self):
        """
        Calculates the action probabilities for every state in self.state_input.
        The actor should not depend on any other tensors besides self.state_input.
        :return: A tensor of shape [num_states, num_actions] representing the
        probability distribution over actions that is generated by your actor.
        """
        # Layer 1
        self.W = tf.Variable(tf.random_uniform([4, hidden_size], dtype=tf.float32))
        self.bias = tf.Variable(tf.random_uniform([hidden_size], dtype=tf.float32))
        self.hidden = tf.nn.relu(tf.matmul(self.state_input, self.W) + self.bias)
        # Layer 2
        self.O = tf.Variable(tf.random_uniform([hidden_size, 2], dtype=tf.float32))
        self.bias2 = tf.Variable(tf.random_uniform([2], dtype=tf.float32))
        self.output = tf.nn.softmax(tf.matmul(self.hidden, self.O) + self.bias2)
        # Pick out the probability of the action actually taken in each state
        # by indexing into the flattened [num_states, 2] softmax output
        self.actions = tf.placeholder(shape=[None], dtype=tf.int32)
        self.indices = tf.range(0, tf.shape(self.output)[0]) * 2 + self.actions
        self.actProbs = tf.gather(tf.reshape(self.output, [-1]), self.indices)
        return self.actProbs
    def loss(self):
        """
        :return: A scalar tensor representing the combined actor and critic loss.
        """
        self.aloss = -tf.reduce_mean(tf.log(self.actProbs) * self.rewards)
        self.cLoss = tf.reduce_mean(tf.square(self.rewards - self.v2Out))
        self.loss = self.aloss + self.cLoss
        return self.loss
    def train_episode(self):
        """
        train_episode will be called 1000 times by the autograder to train your agent.
        In this method, run your agent for a single episode, then use that data to
        train your agent. Feel free to add any return values to this method.
        """
        # Reset the environment
        st = self.game.reset()
        # Lists to store the state, action and reward histories
        state_hist = []
        action_hist = []
        reward_hist = []
        # List to store the critic's value estimate for each visited state
        state_value_hist = []
        for move in range(moves):
            # Run the actor and critic on the current state
            actDict, stateVal = self.session.run([self.output, self.v2Out],
                                                 feed_dict={self.state_input: [st]})
            # Sample an action from the actor's distribution
            action = np.random.choice(np.array([0, 1]), p=actDict[0])
            st1, reward, done, info = self.game.step(action)
            # Render the game
            # self.game.render()
            # Add to the histories
            action_hist.append(action)
            reward_hist.append(reward)
            state_hist.append(st)
            state_value_hist.append(stateVal[0][0])
            # Iterate
            st = st1
            # Update every `updates` moves and at the end of the episode
            if done or (move % updates == 0 and move != 0):
                # Discounted returns for the rewards collected so far
                disRs = generate_disRs(reward_hist)
                # Advantage: discounted return minus the critic's value estimate
                difference = np.array(disRs) - np.array(state_value_hist)
                # Train on the collected batch
                feed_dict = {self.state_input: state_hist,
                             self.actions: action_hist,
                             self.rewards: difference}
                l, _ = self.session.run([self.loss_val, self.train_op], feed_dict=feed_dict)
            if done:
                totRs.append(move)
                # print(move, disRs[0])
                break
def check_actor(model):
    """
    The autograder will use your actor() function to test your agent. This function
    checks that your actor returns a tensor of the right shape for the autograder.
    :return: True if the model's actor returns a tensor of the correct shape.
    """
    dummy_state = np.ones((10, 4))
    actor_probs = model.session.run(model.actProbs, feed_dict={
        model.state_input: dummy_state
    })
    return actor_probs.shape == (10, 2)
if __name__ == '__main__':
    # Train the agent for 1000 episodes and print the average reward over the
    # last 100 episodes. This is similar to what the autograder will be running.
    learner = A2C()
    for i in range(1000):
        learner.train_episode()
    print(str(np.average(totRs[900:1000])))
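For reference, here is a separate quick check I ran (my own throwaway script, not part of the graded code) to convince myself the environment itself is not dropping rewards: CartPole-v1 hands out +1 per surviving step, so an episode's total reward is just its length, and even a purely random policy should average roughly 20 steps per episode. It uses the same old-style gym API as the code above.

import gym
import numpy as np

env = gym.make('CartPole-v1')
returns = []
for _ in range(20):
    env.reset()
    done, total = False, 0.0
    while not done:
        # CartPole-v1 gives a reward of +1 for every step the pole stays up
        _, reward, done, _ = env.step(env.action_space.sample())
        total += reward
    returns.append(total)

# Random play typically averages around 20 steps per episode,
# so my trained agent scoring below 10 is doing worse than random.
print(np.mean(returns))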