I am trying to implement the DQN algorithm, but it doesn't seem to work.
I don't know what I am doing wrong.
What confuses me is that it learns well until around episode 350, but after that it gets worse.
There are two files.
Full code: https://discuss.pytorch.org/t/the-dqn-doest-work-help-me/71999
main.py:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from collections import deque
from torch.utils.tensorboard import SummaryWriter

import core
import gym

np.random.seed(0)
torch.manual_seed(1)


class StateEncoder():
    """Turns a raw numpy observation into a float tensor of shape (1, state_dim)."""

    def __init__(self, env: gym.Env):
        self.state_dim = env.observation_space.shape[0]

    def __call__(self, state):
        return torch.from_numpy(state).float().view(-1, self.state_dim)

class DQN(nn.Module):
    def __init__(self, env, hidden_sizes=[64, 64], activation=nn.Tanh, epsilon=1,
                 gamma=0.9, buffer_size=10000, update_times=1):
        super(DQN, self).__init__()
        self.env = env  # type: gym.Env
        self.obs_dim = env.observation_space.shape[0]
        self.action_size = env.action_space.n
        self.action_dim = 1
        # Online network and target network with the same architecture.
        self.q = core.MLPQFunction(self.obs_dim, self.action_size, hidden_sizes, activation)
        self.q_target = core.MLPQFunction(self.obs_dim, self.action_size, hidden_sizes, activation)
        self.epsilon, self.gamma = epsilon, gamma
        self.buffer = core.ReplayBuffer(self.obs_dim, self.action_dim, buffer_size=buffer_size)
        self.update_times = update_times

    def getQValue(self, obs) -> torch.Tensor:
        qvalues = self.q(obs)
        return qvalues

    def getMaxQbyQTarget(self, obs) -> torch.Tensor:
        # Max over actions of the target network's Q-values, computed without gradients.
        with torch.no_grad():
            qv = self.q_target(obs)
            return qv.max(1)[0]

    def getAction(self, obs) -> int:
        # Epsilon-greedy action selection; ties between maximal Q-values are broken at random.
        if np.random.rand() < self.epsilon:
            return self.env.action_space.sample()
        else:
            qvalues = self.getQValue(obs).squeeze()  # type: torch.Tensor
            qvalues = qvalues.detach().numpy()
            idxs = np.argwhere(qvalues == qvalues.max())
            action = np.random.choice(idxs.ravel())
            return int(action)

    def update(self):
        LOSS = nn.MSELoss()
        opt = optim.Adam(params=self.q.parameters(), lr=5e-4, betas=(0.9, 0.999),
                         eps=1e-8, weight_decay=0, amsgrad=False)
        for i in range(self.update_times):
            data = self.buffer.sample_batch(batch_size=64)
            obs, obs2, act, rew, done = data['obs'], data['obs2'], data['act'], data['rew'], data['done']
            # One-step TD target: r + gamma * max_a' Q_target(s', a'), zeroed at terminal states.
            qtarget = rew.view(-1, 1) + (1 - done).view(-1, 1) * self.gamma * self.getMaxQbyQTarget(obs2).view(-1, 1)
            qv = self.getQValue(obs).gather(dim=1, index=act.long().view(-1, 1))
            loss = LOSS(qv, qtarget)  # type: torch.Tensor
            opt.zero_grad()
            loss.backward()
            opt.step()
            # self.q_target.load_state_dict(self.q.state_dict())
            self.soft_update_Qtarget(self.q, self.q_target)

    def soft_update_Qtarget(self, local_model, target_model, tau=0.001):
        """Soft update model parameters.

        θ_target = τ*θ_local + (1 - τ)*θ_target

        Params
        ======
            local_model (PyTorch model): weights will be copied from
            target_model (PyTorch model): weights will be copied to
            tau (float): interpolation parameter
        """
        for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
            target_param.data.copy_(tau * local_param.data + (1.0 - tau) * target_param.data)

writer = SummaryWriter('runs/test2')

if __name__ == '__main__':
    env = gym.make('LunarLander-v2')
    env.seed(0)
    episode_length, update_per_steps = 1000, 4
    stateEncode = StateEncoder(env)
    dqn = DQN(env=env,
              hidden_sizes=[64, 64],
              activation=nn.ReLU,
              epsilon=1.0,
              buffer_size=20000,
              update_times=1)
    steps_idx = 0
    scores_window = deque(maxlen=100)
    for epch in range(1500):
        o = env.reset()
        o = stateEncode(o)
        gain = 0.0
        # env.render(mode='human')
        for i in range(episode_length):
            a = dqn.getAction(o)
            next_o, r, d, info = env.step(a)
            # env.render(mode='human')
            next_o = stateEncode(next_o)
            dqn.buffer.store(o, a, r, next_o, d)
            o = next_o
            gain += r
            steps_idx += 1
            # Learn every `update_per_steps` steps, once at least 64 transitions have been collected.
            if (i + 1) % update_per_steps == 0:
                if steps_idx > 64:
                    dqn.update()
            if d == 1:
                break
        # Decay epsilon once per episode, down to a floor of 0.01.
        dqn.epsilon = max(dqn.epsilon * 0.995, 0.01)
        writer.add_scalar('Gains', gain, epch)
        scores_window.append(gain)
        print('Episode {}\tAverage Score: {:.2f}, gain: {:.2f}'.format(epch, np.mean(scores_window), gain))
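
For reference, this is the quick check I run after training to watch the learned policy without exploration noise. It is only a rough sketch reusing the dqn, stateEncode and env objects defined above; the number of evaluation episodes and the temporary epsilon override are arbitrary choices of mine.

# Greedy-policy evaluation sketch (assumes dqn, stateEncode and env from main.py above).
saved_epsilon = dqn.epsilon
dqn.epsilon = 0.0  # disable exploration so getAction() is purely greedy
for ep in range(5):  # number of evaluation episodes is arbitrary
    o = stateEncode(env.reset())
    total, done = 0.0, False
    while not done:
        a = dqn.getAction(o)
        next_o, r, done, info = env.step(a)
        o = stateEncode(next_o)
        total += r
    print('eval episode {}: return {:.2f}'.format(ep, total))
dqn.epsilon = saved_epsilon  # restore the training epsilon
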
core.py:
import numpy as np
import torch
import torch.nn as nn


class ReplayBuffer:
    """
    A simple FIFO experience replay buffer.

    size     : current number of stored transitions
    max_size : maximum capacity
    store()        : add transitions one at a time
    sample_batch() : sample a random batch,
        type: dict,
        keys: obs, obs2, act, rew, done (type: torch.float32)
    """

    def __init__(self, obs_dim, act_dim, buffer_size):
        size = buffer_size
        self.obs_buf = np.zeros(combined_shape(size, obs_dim), dtype=np.float32)
        self.obs2_buf = np.zeros(combined_shape(size, obs_dim), dtype=np.float32)
        self.act_buf = np.zeros(combined_shape(size, act_dim), dtype=np.float32)
        self.rew_buf = np.zeros(size, dtype=np.float32)
        self.done_buf = np.zeros(size, dtype=np.float32)
        self.ptr, self.size, self.max_size = 0, 0, size

    def store(self, obs, act, rew, next_obs, done):
        self.obs_buf[self.ptr] = obs
        self.obs2_buf[self.ptr] = next_obs
        self.act_buf[self.ptr] = act
        self.rew_buf[self.ptr] = rew
        self.done_buf[self.ptr] = done
        # Overwrite the oldest entries once the buffer is full (circular pointer).
        self.ptr = (self.ptr + 1) % self.max_size
        self.size = min(self.size + 1, self.max_size)

    def sample_batch(self, batch_size=64):
        idxs = np.random.randint(0, self.size, size=batch_size)
        batch = dict(obs=self.obs_buf[idxs],
                     obs2=self.obs2_buf[idxs],
                     act=self.act_buf[idxs],
                     rew=self.rew_buf[idxs],
                     done=self.done_buf[idxs])
        return {k: torch.as_tensor(v, dtype=torch.float32) for k, v in batch.items()}

def combined_shape(length, shape=None):
    if shape is None:
        return (length,)
    return (length, shape) if np.isscalar(shape) else (length, *shape)


def mlp(sizes, activation, output_activation=nn.Identity):
    # Stack Linear layers with `activation` in between and `output_activation` after the last layer.
    layers = []
    for j in range(len(sizes) - 1):
        act = activation if j < len(sizes) - 2 else output_activation
        layers += [nn.Linear(sizes[j], sizes[j + 1]), act()]
    return nn.Sequential(*layers)

class MLPQFunction(nn.Module):
    def __init__(self, obs_dim, act_size, hidden_sizes, activation):
        super().__init__()
        self.q = mlp([obs_dim] + list(hidden_sizes) + [act_size], activation)

    def forward(self, obs):
        q = self.q(obs)
        return q
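
And this is the small standalone script I used to convince myself that the ReplayBuffer behaves as its docstring says (transitions stored one at a time, sample_batch returning a dict of float32 tensors). The dimensions and the number of stored transitions here are arbitrary.

# Standalone sanity check for core.ReplayBuffer (arbitrary sizes, random data).
import numpy as np
import core

buf = core.ReplayBuffer(obs_dim=8, act_dim=1, buffer_size=100)
for _ in range(200):  # more transitions than buffer_size, so old entries get overwritten
    o = np.random.randn(8).astype(np.float32)
    o2 = np.random.randn(8).astype(np.float32)
    buf.store(o, act=0, rew=1.0, next_obs=o2, done=False)

print(buf.size, buf.max_size)           # -> 100 100
batch = buf.sample_batch(batch_size=32)
for k, v in batch.items():
    print(k, tuple(v.shape), v.dtype)   # e.g. obs (32, 8) torch.float32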