Weights are not updating with A3C
0 votes
3 August 2020

I am trying to implement the A3C algorithm. I trained on a dataset with three processes, but all three processes print exactly the same reward on each of the three iterations, so I assume the model weights are never actually updated.

My code is below. Note that my environment does not support asynchrony: it handles one agent at a time, so three environments interact with a single shared agent. The reward function, however, is exactly the same in all three cases.

Could someone please check whether something is wrong with my code?
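One way to confirm whether the shared weights really change between updates is to compare a simple parameter checksum before and after optimizer.step(); a minimal sketch (the param_checksum helper is only an illustration, it is not part of the script below):

import torch

def param_checksum(model):
    # Sum over all parameter values; it changes whenever any weight is updated.
    with torch.no_grad():
        return sum(p.sum().item() for p in model.parameters())

# before = param_checksum(shared_model)
# optimizer.step()
# after = param_checksum(shared_model)
# print("weights changed:", before != after)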

import os
import traceback
import multiprocessing

import numpy as np
import pandas as pd
import torch
import torch.multiprocessing as mp
import torch.nn.functional as F
from torch.autograd import Variable

import atr_bb as ab
import my_optim
from ENV import ENV
from model import ActorCritic


os.chdir(os.path.dirname(os.path.abspath(__file__)))
state_dim = 5
action_dim = 4


PATH = "E:\\ML\\Breakout_a3c\\Code_With_Comments\\model.pth"
# Making sure the local model and the shared model end up with the same gradients:
# copy each local gradient onto the corresponding shared parameter so the shared
# optimizer can apply it; bail out early if the shared model already holds gradients.
def ensure_shared_grads(model, shared_model):
    for param, shared_param in zip(model.parameters(), shared_model.parameters()):
        if shared_param.grad is not None:
            return
        shared_param._grad = param.grad
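# (Illustrative check) after ensure_shared_grads(model, shared_model) has run
# following a backward pass on the local model, every shared parameter is
# expected to carry a gradient:
# assert all(p.grad is not None for p in shared_model.parameters())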
        
# Gathering all the parameters (that we can modify to explore)
class Params():
    def __init__(self):
        self.lr = 0.0001
        self.gamma = 0.99
        self.tau = 1.
        self.seed = 1
        self.num_processes = 3
        self.num_steps = 20
        self.max_episode_length = 10000
        self.env_name = 'Breakout-v0'
        
def train(rank, params, shared_model, optimizer,ticker):
    torch.manual_seed(params.seed + rank) # shifting the seed with rank to asynchronize each training agent
    ohlcv = pd.read_csv(ticker + '.csv')
    data = ohlcv.copy()
    data['rsi'] = ab.RSI(data['Close'],14)
    data['adx'] = ab.ADX(data,20)
    data = ab.BollBnd(data,20)
    data['BBr'] = data['Close']/data['BB_dn']
    data['macd_signal'] = ab.MACD(data)
    data['macd_r'] = data['macd_signal']/data['Close']
    data['ema20'] = ab.EMA(np.asarray(data['Close']), 20)
    data['ema20_r'] = data['ema20']/data['Close']
    data['Close'] = data['Close']/data['Close'].max()
    data = data.iloc[:,[4,7,8,13,15,17]]
    data = data.dropna()
    data = torch.DoubleTensor(np.asarray(data))
    env = ENV(state_dim, action_dim, data)
    print("env created shape:",env.num_steps)
    model = ActorCritic(env.observation_space, env.action_space)
    state = env.reset()
    done = True
    episode_length = 0
    try:
        for idx in range(1): # note: only a single outer update iteration runs per worker
            
            episode_length += 1
            model.load_state_dict(shared_model.state_dict())
            if done:
                cx = Variable(torch.zeros(1, state_dim)) # the cell states of the LSTM are reinitialized to zero
                hx = Variable(torch.zeros(1, state_dim)) # the hidden states of the LSTM are reinitialized to zero
            else:
                cx = Variable(cx.data)
                hx = Variable(hx.data)
            values = []
            log_probs = []
            rewards = []
            entropies = []
            for step in range(env.num_steps-2):
                value, action_values, (hx, cx) = model((Variable(state.unsqueeze(0)), (hx, cx)))
                prob = F.softmax(action_values,-1)
                log_prob = F.log_softmax(action_values,-1)
                entropy = -(log_prob * prob).sum(1)
                entropies.append(entropy)
                action = prob.multinomial(num_samples = 1).data
                log_prob = log_prob.gather(1, Variable(action))
                values.append(value)
                log_probs.append(log_prob)
                
                state, reward, done = env.step(action)
                print(ticker," reward ",reward)
                done = (done or episode_length >= env.num_steps)
                reward = max(min(reward, 1), -1) # clamping the reward between -1 and +1
                if done:
                    episode_length = 0
                    state = env.reset()
                rewards.append(reward)
                if done:
                    break
            R = torch.zeros(1, 1)
            if not done: # if we are not done:
                value, _, _ = model((Variable(state.unsqueeze(0)), (hx, cx)))
                R = value.data
            values.append(Variable(R))
            policy_loss = torch.zeros(1, 1)
            value_loss = torch.zeros(1, 1) 
            R = Variable(R)
            gae = torch.zeros(1, 1)
    
            for i in reversed(range(len(rewards))):
                R = params.gamma * R + rewards[i]
                advantage = R - values[i]
                value_loss = value_loss + 0.5 * advantage.pow(2) # computing the value loss
                TD = rewards[i] + params.gamma * values[i + 1].data - values[i].data # computing the temporal difference
                gae = gae * params.gamma * params.tau + TD # gae = sum_i (gamma*tau)^i * TD(i) with gae_i = gae_(i+1)*gamma*tau + (r_i + gamma*V(state_i+1) - V(state_i))
                policy_loss = policy_loss - log_probs[i] * Variable(gae) - 0.01 * entropies[i] # computing the policy loss            
            optimizer.zero_grad() # clearing the gradients held by the shared optimizer
            total_loss = policy_loss + 0.5 * value_loss
            total_loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 40) # clipping the gradient norm
            ensure_shared_grads(model, shared_model) 
            optimizer.step() # running the optimization 
        torch.save(model.state_dict(), PATH)
    except Exception as e:
        print(e)
        traceback.print_exc()
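# (Hypothetical) single-process debug call, bypassing multiprocessing entirely,
# to check whether a lone worker changes the shared weights; "AAPL" is just a
# placeholder ticker name:
# train(0, Params(), shared_model, optimizer, "AAPL")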


def worker(i):
    """worker function"""
    print('Worker:',i)
    # sys.stdout.flush()
    return

params = Params()
shared_model = ActorCritic(state_dim, action_dim) # shared_model is the model shared by the different agents (different threads in different cores)
shared_model.share_memory() # storing the model in the shared memory of the computer, which allows the threads to have access to this shared memory even if they are in different cores
optimizer = my_optim.SharedAdam(shared_model.parameters(), lr=params.lr) # the optimizer is also shared because it acts on the shared model
optimizer.share_memory()
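# Sanity check (illustrative, not required): share_memory() moves the parameter
# tensors into shared memory, so every worker process sees the same underlying storage.
# print(all(p.is_shared() for p in shared_model.parameters()))  # expected: True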

tickers = pd.read_csv("tickers.csv")
indices = tickers['Symbol']
jobs = []
for ticker in indices:
    
    try:
        
        processes = [] # initializing the processes with an empty list
        for rank in range(0, params.num_processes): # making a loop to run all the other processes that will be trained by updating the shared model
            # if __name__ == '__main__':    
            #     p = mp.Process(target=train, args=(rank, params, shared_model, optimizer,ticker))
            #     p.start()
            #     processes.append(p)
            
            if __name__ == '__main__':
                p = multiprocessing.Process(target=train, args=(rank, params, shared_model, optimizer,ticker))
                jobs.append(p)
                p.start()
                for p in jobs: # waiting for every started process to finish before continuing
                    p.join()
            
    except Exception as e:
        print(e)
        traceback.print_exc()
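# An alternative launch sketch (illustration only, not what runs above): start all
# worker processes for a ticker first and join them only after all have started,
# so the params.num_processes workers run concurrently instead of one after another.
# if __name__ == '__main__':
#     workers = [multiprocessing.Process(target=train,
#                                        args=(rank, params, shared_model, optimizer, ticker))
#                for rank in range(params.num_processes)]
#     for p in workers:
#         p.start()
#     for p in workers:
#         p.join()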


# if __name__ == '__main__':
#     jobs = []
#     for i in range(5):
#         p = multiprocessing.Process(target=worker,args=(i,))
#         jobs.append(p)
#         p.start()
...