I am trying to implement the A3C algorithm. I trained on my dataset with three processes, but all three processes print exactly the same reward on every iteration, so I assume the model updates are not happening.
One note on my setup: my environment does not support asynchrony, i.e. it processes one agent at a time, so three environments interact with a single shared agent; the reward function is identical in all three cases. Could someone please help check whether there is something wrong with my code? The full script is posted below.
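Before the full script: since I only infer the lack of updates from the identical rewards, this is the kind of sanity check I have in mind, comparing a snapshot of the shared model's parameters against their values after one optimizer.step(). It is just a sketch; params_changed is an illustrative name and not part of my script:

import copy
import torch

def params_changed(model, before_state):
    # compare the current parameters against an earlier state_dict snapshot
    after_state = model.state_dict()
    return any(not torch.equal(before_state[k], after_state[k]) for k in before_state)

# usage sketch:
# before = copy.deepcopy(shared_model.state_dict())
# ... run one training update (backward() + optimizer.step()) ...
# print("shared model updated:", params_changed(shared_model, before))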
import os
import torch
import torch.multiprocessing as mp
from model import ActorCritic
import my_optim
from ENV import ENV
import pandas as pd
import atr_bb as ab
import numpy as np
import traceback
import torch.nn.functional as F
from torch.autograd import Variable
import multiprocessing
os.chdir(os.path.dirname(os.path.abspath(__file__)))
state_dim = 5
action_dim = 4
PATH = "E:\\ML\\Breakout_a3c\\Code_With_Comments\\model.pth"
# Implementing a function to make sure the models share the same gradient
def ensure_shared_grads(model, shared_model):
    for param, shared_param in zip(model.parameters(), shared_model.parameters()):
        if shared_param.grad is not None:
            return
        shared_param._grad = param.grad
# Collecting all the hyperparameters (the values we can tweak to experiment)
class Params():
    def __init__(self):
        self.lr = 0.0001
        self.gamma = 0.99
        self.tau = 1.
        self.seed = 1
        self.num_processes = 3
        self.num_steps = 20
        self.max_episode_length = 10000
        self.env_name = 'Breakout-v0'
def train(rank, params, shared_model, optimizer, ticker):
    torch.manual_seed(params.seed + rank)  # shift the seed with rank so each training agent is de-synchronized
    ohlcv = pd.read_csv(ticker + '.csv')
    data = ohlcv.copy()
    data['rsi'] = ab.RSI(data['Close'], 14)
    data['adx'] = ab.ADX(data, 20)
    data = ab.BollBnd(data, 20)
    data['BBr'] = data['Close'] / data['BB_dn']
    data['macd_signal'] = ab.MACD(data)
    data['macd_r'] = data['macd_signal'] / data['Close']
    data['ema20'] = ab.EMA(np.asarray(data['Close']), 20)
    data['ema20_r'] = data['ema20'] / data['Close']
    data['Close'] = data['Close'] / data['Close'].max()
    data = data.iloc[:, [4, 7, 8, 13, 15, 17]]
    data = data.dropna()
    data = torch.DoubleTensor(np.asarray(data))
    env = ENV(state_dim, action_dim, data)
    print("env created shape:", env.num_steps)
    model = ActorCritic(env.observation_space, env.action_space)
    state = env.reset()
    done = True
    episode_length = 0
    try:
        for idx in range(1):
            episode_length += 1
            model.load_state_dict(shared_model.state_dict())  # sync the local model with the shared model
            if done:
                cx = Variable(torch.zeros(1, state_dim))  # the cell state of the LSTM is reinitialized to zero
                hx = Variable(torch.zeros(1, state_dim))  # the hidden state of the LSTM is reinitialized to zero
            else:
                cx = Variable(cx.data)
                hx = Variable(hx.data)
            values = []
            log_probs = []
            rewards = []
            entropies = []
            for step in range(env.num_steps - 2):
                value, action_values, (hx, cx) = model((Variable(state.unsqueeze(0)), (hx, cx)))
                prob = F.softmax(action_values, -1)
                log_prob = F.log_softmax(action_values, -1)
                entropy = -(log_prob * prob).sum(1)
                entropies.append(entropy)
                action = prob.multinomial(num_samples=1).data
                log_prob = log_prob.gather(1, Variable(action))
                values.append(value)
                log_probs.append(log_prob)
                state, reward, done = env.step(action)
                print(ticker, " reward ", reward)
                done = (done or episode_length >= env.num_steps)
                reward = max(min(reward, 1), -1)  # clamping the reward between -1 and +1
                if done:
                    episode_length = 0
                    state = env.reset()
                rewards.append(reward)
                if done:
                    break
            R = torch.zeros(1, 1)
            if not done:  # if the episode is not finished, bootstrap from the value of the last state
                value, _, _ = model((Variable(state.unsqueeze(0)), (hx, cx)))
                R = value.data
            values.append(Variable(R))
            policy_loss = torch.zeros(1, 1)
            value_loss = torch.zeros(1, 1)
            R = Variable(R)
            gae = torch.zeros(1, 1)
            for i in reversed(range(len(rewards))):
                R = params.gamma * R + rewards[i]
                advantage = R - values[i]
                value_loss = value_loss + 0.5 * advantage.pow(2)  # computing the value loss
                TD = rewards[i] + params.gamma * values[i + 1].data - values[i].data  # computing the temporal difference
                gae = gae * params.gamma * params.tau + TD  # gae_i = gae_(i+1)*gamma*tau + TD_i, i.e. gae = sum_i (gamma*tau)^i * TD_i
                policy_loss = policy_loss - log_probs[i] * Variable(gae) - 0.01 * entropies[i]  # computing the policy loss with an entropy bonus
            optimizer.zero_grad()  # reset the gradients before backpropagation
            los = policy_loss + 0.5 * value_loss
            (policy_loss + 0.5 * value_loss).backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 40)  # clip the gradient norm to 40
            ensure_shared_grads(model, shared_model)
            optimizer.step()  # apply the gradients to the shared model
            torch.save(model.state_dict(), PATH)
    except Exception as e:
        print(e)
        traceback.print_exc()
def worker(i):
"""worker function"""
print('Worker:',i)
# sys.stdout.flush()
return
params = Params()
shared_model = ActorCritic(state_dim, action_dim)  # the model shared by the different agents (separate processes)
shared_model.share_memory()  # store the model in shared memory so every process can access it
optimizer = my_optim.SharedAdam(shared_model.parameters(), lr=params.lr)  # the optimizer is also shared because it acts on the shared model
optimizer.share_memory()
tickers = pd.read_csv("tickers.csv")
indices = tickers['Symbol']
jobs = []
for ticker in indices:
    try:
        processes = []  # initializing the processes with an empty list
        for rank in range(0, params.num_processes):  # launch the training processes that update the shared model
            # if __name__ == '__main__':
            #     p = mp.Process(target=train, args=(rank, params, shared_model, optimizer, ticker))
            #     p.start()
            #     processes.append(p)
            if __name__ == '__main__':
                p = multiprocessing.Process(target=train, args=(rank, params, shared_model, optimizer, ticker))
                jobs.append(p)
                p.start()
        for p in jobs:  # wait for all the processes to finish so the program can stop safely
            p.join()
    except Exception as e:
        print(e)
        traceback.print_exc()
# if __name__ == '__main__':
#     jobs = []
#     for i in range(5):
#         p = multiprocessing.Process(target=worker, args=(i,))
#         jobs.append(p)
#         p.start()