Многопроцессорная обработка функции из основного разветвления, а также в python - PullRequest
0 голосов
/ 06 августа 2020

Я пытаюсь 5 раз выполнить многопроцессорную обработку функции, используя python multiprocessing.Process library

К моему удивлению, я также получаю отпечатки, которые я добавил в 5 раз больше ... моя интуиция такова. этот main также вызывается 5 раз.

Я добавил отпечатки в функцию поезда, которые мне нужно обработать 5 раз, я вижу, что многопроцессорность происходит из отпечатков, но я не могу понять, почему main также вам звонят 5 раз.

Вот мой код .. Может кто-нибудь, пожалуйста, помогите проверить, что не так с этим кодом.

def train(rank,params, shared_model, optimizer,ticker):
    try:
        print("rank:",str(rank)," ",str(ticker),"\n")
        f.write("rank:"+str(rank)+" "+str(ticker)+"\n")
        data= pd.read_csv(ticker + '.csv')
        data = data.dropna()
        count = 0
        max_timesteps = int(data.shape[0]*0.8)
        
        data = torch.DoubleTensor(np.asarray(data))

        env = ENV(state_dim, action_dim, data)
    
        # init training variables
    
        state = env.reset()
        done = True
        episode_length = 0
        count = 0
        while count<max_timesteps-1:
            episode_length += 1
            if done:
                cx = Variable(torch.zeros(1, state_dim))
                hx = Variable(torch.zeros(1, state_dim))
            else:
                cx = Variable(cx.data)
                hx = Variable(hx.data)
            values = []
            log_probs = []
            rewards = []
            entropies = []
            for step in range(max_timesteps):
                value, action_values, (hx, cx) = model((Variable(state.unsqueeze(0)), (hx, cx)))
                prob = F.softmax(action_values,dim = 0)
                log_prob = F.log_softmax(action_values,dim = 0)
                entropy = -(log_prob * prob).sum(1)
                entropies.append(entropy)
                action = prob.multinomial(num_samples=1).data
                log_prob = log_prob.gather(1, Variable(action))
                values.append(value)
                log_probs.append(log_prob)
                state, reward, done = env.step(action.numpy())
                print(ticker," reward", reward , " rank:",rank)
                count+=1
                done = (done or count == max_timesteps-2)
                reward = max(min(reward, 1), -1)
                if done:
                    episode_length = 0
                    state = env.reset()
                
                rewards.append(reward)
                if done:
                    break
            R = torch.zeros(1, 1)
            if not done:
                value, _, _ = model((Variable(state.unsqueeze(0)), (hx, cx)))
                R = value.data
            values.append(Variable(R))
            policy_loss = 0
            value_loss = 0
            R = Variable(R)
            gae = torch.zeros(1, 1)
            for i in reversed(range(len(rewards))):
                R = params.gamma * R + rewards[i]
                advantage = R - values[i]
                value_loss = value_loss + 0.5 * advantage.pow(2)
                TD = rewards[i] + params.gamma * values[i + 1].data - values[i].data
                gae = gae * params.gamma * params.tau + TD
                policy_loss = policy_loss - log_probs[i] * Variable(gae) - 0.01 * entropies[i]
                
            optimizer.zero_grad()
            (policy_loss + 0.5 * value_loss).backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 40)
            optimizer.step()
            f.flush()

    except:
        print(ticker)
        traceback.print_exc()
        var = traceback.format_exc()
        f.write(str(ticker)+"\n")
        f.write("exception:\n"+str(var))
        f.flush()


# Implementing the Adam optimizer with shared states


class Params():
    def __init__(self):
        self.lr = 0.001
        self.gamma = 0.99
        self.tau = 1.
        self.seed = 1
        self.num_processes = 5
        self.num_steps = 20
        self.max_episode_length = 10000
        self.env_name = 'Breakout-v0'


params = Params()

state_dim = 6
action_dim = 3

model = ActorCritic(state_dim, action_dim)
model.share_memory()
optimizer = SharedAdam(model.parameters(), lr=params.lr)
optimizer.share_memory()


# set the parameters

epochs = 1
state_dim = 6
action_dim = 3
max_action = 1
idx = 0
file_name = "%s" % ("computational__reward")
directory="./pytorch_models"

# instantiate policy

tickers = pd.read_csv("tickers.csv")
indices = tickers['Symbol']
jobs = []

#indices = ['FTK']
for ep in range(1):
    print("epoch:",ep)
    f.write("epoch:"+str(ep)+"\n")
    for ticker in indices:
        try:
            for rank in range(0, params.num_processes): # making a loop to run all the other processes that will be trained by updating the shared model
                if __name__ == '__main__':
                    p = multiprocessing.Process(target=train, args=(rank, params, model, optimizer,ticker,))
                    jobs.append(p)
                    p.start()
            for p in jobs: # creating a pointer that will allow to kill all the threads when at least one of the threads, or main.py will be killed, allowing to stop the program safely
                p.join()
                

            model.save("A3C_multi"+ str(ep)+"_" + file_name, directory="./pytorch_models")
            f.flush()
        except Exception as e:
            print(e)
            traceback.print_exc()
            var = traceback.format_exc()
            f.write(str(ticker)+"\n")
            f.write("exception:\n"+str(var))
model.save("A3C"+ str(ep)+"_" + file_name, directory="./pytorch_models")

Я получаю 6 раз это сообщение:

epoch: 0
epoch: 0
epoch: 0
epoch: 0
epoch: 0
epoch: 0

1 Ответ

0 голосов
/ 06 августа 2020

Короткий ответ

Поскольку вы запускаете это на windows, весь код , который не является функцией или определениями классов, должен находиться внутри блока if __name__ is "__main__" .

Длинный ответ

В операционных системах POSIX модуль multiprocessing реализуется с помощью системного вызова fork(), который создает копию процесса. Это очень удобно, потому что второй процесс полностью инициализируется из коробки.

Microsoft windows не имеет этого системного вызова. Итак, Python пытается имитировать это, запустив новый интерпретатор Python и импортировав вашу программу как модуль. Чтобы это работало, импорт вашей программы не должен иметь побочных эффектов . Лучший способ добиться этого - поместить все, что не является определением класса или функции, внутрь блока if __name__ is "__main__".

Поскольку часть вашего кода находится за пределами основного блока, он будет выполняться, когда ваша программа импортированы во вновь созданные Python процессы. Вот почему вы видите несколько отпечатков "эпох".

...