Я пытаюсь реализовать алгоритм A3 C. Я попытался обучить набор данных для трех процессов, однако на всех трех процессах выходное вознаграждение одинаково на всех трех итерациях. Исходя из этого, я предполагаю, что обновления модели не происходят.
Ниже мой код ... может кто-нибудь, пожалуйста, поможет проверить, есть ли какие-то проблемы с моим кодом. Также моя среда не поддерживает асинхронность. Т.е. она обрабатывает 1 агент за раз, и поэтому 3 среды взаимодействуют с общим агентом. Однако функция вознаграждения во всех трех случаях точно такая же.
Может ли кто-нибудь помочь проверить, если что-то не так с моим кодом.
import os
import torch
import torch.multiprocessing as mp
from model import ActorCritic
import my_optim
from ENV import ENV
import pandas as pd
import atr_bb as ab
import numpy as np
import traceback
import torch.nn.functional as F
from torch.autograd import Variable
import torch
import torch.nn.functional as F
from ENV import ENV
import multiprocessing
state_dim = 5
action_dim = 4
PATH = "E:\\ML\\Breakout_a3c\\Code_With_Comments\\model.pth"
# Implementing a function to make sure the models share the same gradient
def ensure_shared_grads(model, shared_model):
for param, shared_param in zip(model.parameters(), shared_model.parameters()):
if shared_param.grad is not None:
shared_param._grad = param.grad
# Gathering all the parameters (that we can modify to explore)
class Params():
def __init__(self):
self.lr = 0.0001
self.gamma = 0.99
self.tau = 1.
self.seed = 1
self.num_processes = 3
self.num_steps = 20
self.max_episode_length = 10000
self.env_name = 'Breakout-v0'
def train(rank, params, shared_model, optimizer,ticker):
torch.manual_seed(params.seed + rank) # shifting the seed with rank to asynchronize each training agent
# try:
ohlcv = pd.read_csv(ticker + '.csv')
data = ohlcv.copy()
data['rsi'] = ab.RSI(data['Close'],14)
data['adx'] = ab.ADX(data,20)
data = ab.BollBnd(data,20)
data['BBr'] = data['Close']/data['BB_dn']
data['macd_signal'] = ab.MACD(data)
data['macd_r'] = data['macd_signal']/data['Close']
data['ema20'] = ab.EMA(np.asarray(data['Close']), 20)
data['ema20_r'] = data['ema20']/data['Close']
data['Close'] = data['Close']/data['Close'].max()
data = data.iloc[:,[4,7,8,13,15,17]]
data = data.dropna()
data = torch.DoubleTensor(np.asarray(data))
env = ENV(state_dim, action_dim, data)
print("env created shape:",env.num_steps)
model = ActorCritic(env.observation_space, env.action_space)
state = env.reset()
done = True
episode_length = 0
for idx in range(1):
episode_length += 1
if done:
cx = Variable(torch.zeros(1, state_dim)) # the cell states of the LSTM are reinitialized to zero
hx = Variable(torch.zeros(1, state_dim)) # the hidden states of the LSTM are reinitialized to zero
cx = Variable(cx.data)
hx = Variable(hx.data)
values = []
log_probs = []
rewards = []
entropies = []
for step in range(env.num_steps-2):
value, action_values, (hx, cx) = model((Variable(state.unsqueeze(0)), (hx, cx)))
prob = F.softmax(action_values,-1)
log_prob = F.log_softmax(action_values,-1)
entropy = -(log_prob * prob).sum(1)
action = prob.multinomial(num_samples = 1).data
log_prob = log_prob.gather(1, Variable(action))
state, reward, done = env.step(action)
print(ticker," reward ",reward)
done = (done or episode_length >= env.num_steps)
reward = max(min(reward, 1), -1) # clamping the reward between -1 and +1
if done:
episode_length = 0
state = env.reset()
if done:
R = torch.zeros(1, 1)
if not done: # if we are not done:
value, _, _ = model((Variable(state.unsqueeze(0)), (hx, cx)))
R = value.data
policy_loss = torch.zeros(1, 1)
value_loss = torch.zeros(1, 1)
R = Variable(R)
gae = torch.zeros(1, 1)
for i in reversed(range(len(rewards))):
R = params.gamma * R + rewards[i]
advantage = R - values[i]
value_loss = value_loss + 0.5 * advantage.pow(2) # computing the value loss
TD = rewards[i] + params.gamma * values[i + 1].data - values[i].data # computing the temporal difference
gae = gae * params.gamma * params.tau + TD # gae = sum_i (gamma*tau)^i * TD(i) with gae_i = gae_(i+1)*gamma*tau + (r_i + gamma*V(state_i+1) - V(state_i))
policy_loss = policy_loss - log_probs[i] * Variable(gae) - 0.01 * entropies[i] # computing the policy loss
optimizer.zero_grad() # initializing the optimizer
los = policy_loss + 0.5 * value_loss
(policy_loss + 0.5 * value_loss).backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), 40) # clamping the values
ensure_shared_grads(model, shared_model)
optimizer.step() # running the optimization
torch.save(model.state_dict(), PATH)
except Exception as e:
def worker(i):
"""worker function"""
# sys.stdout.flush()
params = Params()
shared_model = ActorCritic(state_dim, action_dim) # shared_model is the model shared by the different agents (different threads in different cores)
shared_model.share_memory() # storing the model in the shared memory of the computer, which allows the threads to have access to this shared memory even if they are in different cores
optimizer = my_optim.SharedAdam(shared_model.parameters(), lr=params.lr) # the optimizer is also shared because it acts on the shared model
tickers = pd.read_csv("tickers.csv")
indices = tickers['Symbol']
jobs = []
for ticker in indices:
processes = [] # initializing the processes with an empty list
for rank in range(0, params.num_processes): # making a loop to run all the other processes that will be trained by updating the shared model
# if __name__ == '__main__':
# p = mp.Process(target=train, args=(rank, params, shared_model, optimizer,ticker))
# p.start()
# processes.append(p)
if __name__ == '__main__':
p = multiprocessing.Process(target=train, args=(rank, params, shared_model, optimizer,ticker))
for p in jobs: # creating a pointer that will allow to kill all the threads when at least one of the threads, or main.py will be killed, allowing to stop the program safely
except Exception as e:
# if __name__ == '__main__':
# jobs = []
# for i in range(5):
# p = multiprocessing.Process(target=worker,args=(i,))
# jobs.append(p)
# p.start()