Hi, I'm writing because for some reason my dueling double deep Q-learning implementation keeps getting worse rather than better, ending up around -800 after 1000 episodes. I based the code on Franks World:
Sorry, this is really annoying, but I can't get the classes in my code (below) to display as one long block of code...
```python
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, Add, Activation
import tensorflow.keras as keras
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import load_model
import matplotlib.pyplot as plt
import gym
import numpy as np
import random
class ReplayBuffer():
    def __init__(self, capacity, input_shape, n_actions, discrete=False):
        self.capacity = capacity
        self.mem_cntr = 0
        self.discrete = discrete
        self.state_memory = np.zeros((self.capacity, input_shape))
        self.new_state_memory = np.zeros((self.capacity, input_shape))
        self.action_memory = np.zeros((self.capacity, n_actions), dtype=np.int32)
        self.reward_memory = np.zeros(self.capacity, dtype=np.float32)
        self.terminal_memory = np.zeros(self.capacity, dtype=np.float32)

    def store_transition(self, state, action, reward, new_state, done):
        index = self.mem_cntr % self.capacity
        self.state_memory[index] = state
        self.new_state_memory[index] = new_state
        if self.discrete:
            actions = np.zeros(self.action_memory.shape[1])
            actions[action] = 1.0
            self.action_memory[index] = actions
        else:
            self.action_memory[index] = action
        self.reward_memory[index] = reward
        self.terminal_memory[index] = 1 - done
        self.mem_cntr += 1

    def sample_buffer(self, batch_size):
        max_mem = min(self.mem_cntr, self.capacity)
        batch = np.random.choice(max_mem, batch_size, replace=False)
        states = self.state_memory[batch]
        new_states = self.new_state_memory[batch]
        actions = self.action_memory[batch]
        rewards = self.reward_memory[batch]
        terminals = self.terminal_memory[batch]
        return states, actions, rewards, new_states, terminals
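
# NOTE: the post mentions that some classes got lost when pasting; the network
# class that Agent instantiates below ("D3QN") is one of them. What follows is a
# minimal sketch of a dueling Q-network, assuming the usual value/advantage split
# and the same constructor signature the Agent uses. The exact original
# architecture is not known from the post, so treat this as a placeholder.
class D3QN(keras.Model):
    def __init__(self, input_dims, n_actions, fc1_dims, fc2_dims, lr):
        super(D3QN, self).__init__()
        # input_dims and lr are accepted only to match the call in Agent.__init__;
        # the subclassed model infers its input shape on the first call and the
        # optimizer is configured via compile() in the Agent.
        self.dense1 = Dense(fc1_dims, activation='relu')
        self.dense2 = Dense(fc2_dims, activation='relu')
        self.V = Dense(1, activation=None)           # state-value stream
        self.A = Dense(n_actions, activation=None)   # advantage stream

    def call(self, state):
        x = self.dense2(self.dense1(state))
        V = self.V(x)
        A = self.A(x)
        # Q(s, a) = V(s) + (A(s, a) - mean_a A(s, a)) keeps the two streams identifiable
        return V + (A - tf.math.reduce_mean(A, axis=1, keepdims=True))

    def advantage(self, state):
        x = self.dense2(self.dense1(state))
        return self.A(x)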
class Agent():
    def __init__(self, lr, gamma, n_actions, epsilon, batch_size,
                 input_dims, epsilon_dec=0.996, epsilon_end=0.01,
                 capacity=100000, fname='dueling_dqn.h5', fc1_dims=256,
                 fc2_dims=256, replace=100):
        self.action_space = [i for i in range(n_actions)]
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_dec = epsilon_dec
        self.epsilon_min = epsilon_end
        self.fname = fname
        self.replace = replace
        self.batch_size = batch_size
        self.lr = lr
        self.learn_step_counter = 0
        self.memory = ReplayBuffer(capacity, input_dims, n_actions, discrete=True)
        self.q_eval = D3QN(input_dims, n_actions, fc1_dims, fc2_dims, lr)
        self.q_next = D3QN(input_dims, n_actions, fc1_dims, fc2_dims, lr)
        self.q_eval.compile(optimizer=Adam(learning_rate=lr),
                            loss='mean_squared_error')
        # just a formality, the target network is never optimized directly
        self.q_next.compile(optimizer=Adam(learning_rate=lr),
                            loss='mean_squared_error')

    def store_transition(self, state, action, reward, new_state, done):
        self.memory.store_transition(state, action, reward, new_state, done)

    def choose_action(self, state):
        if np.random.random() < self.epsilon:
            action = np.random.choice(self.action_space)
        else:
            state = np.array([state])
            actions = self.q_eval.advantage(state)
            action = tf.math.argmax(actions, axis=1).numpy()[0]
        return action

    def learn(self):
        if self.memory.mem_cntr < self.batch_size:
            return
        if self.learn_step_counter % self.replace == 0:
            self.q_next.set_weights(self.q_eval.get_weights())
        states, actions, rewards, states_, terminals = \
            self.memory.sample_buffer(self.batch_size)
        q_pred = self.q_eval(states).numpy()
        q_next = tf.math.reduce_max(self.q_next(states_), axis=1).numpy()
        q_target = np.copy(q_pred)
        # actions were stored one-hot (discrete=True), so recover the indices
        action_indices = np.argmax(actions, axis=1)
        # terminals holds 1 - done, so multiplying by it zeroes the bootstrap
        # term on episode boundaries
        for idx in range(self.batch_size):
            q_target[idx, action_indices[idx]] = rewards[idx] + \
                self.gamma * q_next[idx] * terminals[idx]
        self.q_eval.train_on_batch(states, q_target)
        self.epsilon = self.epsilon - self.epsilon_dec if self.epsilon > \
            self.epsilon_min else self.epsilon_min
        self.learn_step_counter += 1

    def update_network_parameters(self):
        self.q_next.set_weights(self.q_eval.get_weights())
        self.learn_step_counter += 1

    def save_model(self):
        self.q_eval.save(self.fname)

    def load_model(self):
        self.q_eval = load_model(self.fname)
        if self.epsilon == 0.0:
            self.update_network_parameters()
def plot_learning_curve(x, scores, epsilons, filename, lines=None):
    fig = plt.figure()
    ax = fig.add_subplot(111, label="1")
    ax2 = fig.add_subplot(111, label="2", frame_on=False)
    ax.plot(x, epsilons, color="C0")
    ax.set_xlabel("Training Steps", color="C0")
    ax.set_ylabel("Epsilon", color="C0")
    ax.tick_params(axis='x', colors="C0")
    ax.tick_params(axis='y', colors="C0")
    N = len(scores)
    running_avg = np.empty(N)
    for t in range(N):
        running_avg[t] = np.mean(scores[max(0, t-20):(t+1)])
    ax2.scatter(x, running_avg, color="C1")
    ax2.axes.get_xaxis().set_visible(False)
    ax2.yaxis.tick_right()
    ax2.set_ylabel('Score', color="C1")
    ax2.yaxis.set_label_position('right')
    ax2.tick_params(axis='y', colors="C1")
    if lines is not None:
        for line in lines:
            plt.axvline(x=line)
    plt.savefig(filename)
if __name__ == '__main__':
    env = gym.make('LunarLander-v2')
    n_games = 400
    agent = Agent(gamma=0.99, epsilon=1.0, lr=0.0005, input_dims=8,
                  epsilon_dec=1e-3, capacity=100000, batch_size=64,
                  epsilon_end=0.01, fc1_dims=128, fc2_dims=128,
                  replace=100, n_actions=4)
    scores, eps_history = [], []
    for i in range(n_games):
        done = False
        score = 0
        state = env.reset()
        while not done:
            action = agent.choose_action(state)
            next_state, reward, done, info = env.step(action)
            score += reward
            agent.store_transition(state, action, reward, next_state, int(done))
            state = next_state
            agent.learn()
        eps_history.append(agent.epsilon)
        scores.append(score)
        avg_score = np.mean(scores[-100:])
        print('episode ', i, 'score %.1f' % score,
              'average score %.1f' % avg_score,
              'epsilon %.2f' % agent.epsilon)
    filename = 'keras_lunar_lander.png'
    x = [i+1 for i in range(n_games)]
    plot_learning_curve(x, scores, eps_history, filename)
```