Я пытаюсь 5 раз выполнить многопроцессорную обработку функции, используя python multiprocessing.Process library
К моему удивлению, я также получаю отпечатки, которые я добавил в 5 раз больше ... моя интуиция такова. этот main также вызывается 5 раз.
Я добавил отпечатки в функцию поезда, которые мне нужно обработать 5 раз, я вижу, что многопроцессорность происходит из отпечатков, но я не могу понять, почему main также вам звонят 5 раз.
Вот мой код .. Может кто-нибудь, пожалуйста, помогите проверить, что не так с этим кодом.
def train(rank,params, shared_model, optimizer,ticker):
try:
print("rank:",str(rank)," ",str(ticker),"\n")
f.write("rank:"+str(rank)+" "+str(ticker)+"\n")
data= pd.read_csv(ticker + '.csv')
data = data.dropna()
count = 0
max_timesteps = int(data.shape[0]*0.8)
data = torch.DoubleTensor(np.asarray(data))
env = ENV(state_dim, action_dim, data)
# init training variables
state = env.reset()
done = True
episode_length = 0
count = 0
while count<max_timesteps-1:
episode_length += 1
if done:
cx = Variable(torch.zeros(1, state_dim))
hx = Variable(torch.zeros(1, state_dim))
else:
cx = Variable(cx.data)
hx = Variable(hx.data)
values = []
log_probs = []
rewards = []
entropies = []
for step in range(max_timesteps):
value, action_values, (hx, cx) = model((Variable(state.unsqueeze(0)), (hx, cx)))
prob = F.softmax(action_values,dim = 0)
log_prob = F.log_softmax(action_values,dim = 0)
entropy = -(log_prob * prob).sum(1)
entropies.append(entropy)
action = prob.multinomial(num_samples=1).data
log_prob = log_prob.gather(1, Variable(action))
values.append(value)
log_probs.append(log_prob)
state, reward, done = env.step(action.numpy())
print(ticker," reward", reward , " rank:",rank)
count+=1
done = (done or count == max_timesteps-2)
reward = max(min(reward, 1), -1)
if done:
episode_length = 0
state = env.reset()
rewards.append(reward)
if done:
break
R = torch.zeros(1, 1)
if not done:
value, _, _ = model((Variable(state.unsqueeze(0)), (hx, cx)))
R = value.data
values.append(Variable(R))
policy_loss = 0
value_loss = 0
R = Variable(R)
gae = torch.zeros(1, 1)
for i in reversed(range(len(rewards))):
R = params.gamma * R + rewards[i]
advantage = R - values[i]
value_loss = value_loss + 0.5 * advantage.pow(2)
TD = rewards[i] + params.gamma * values[i + 1].data - values[i].data
gae = gae * params.gamma * params.tau + TD
policy_loss = policy_loss - log_probs[i] * Variable(gae) - 0.01 * entropies[i]
optimizer.zero_grad()
(policy_loss + 0.5 * value_loss).backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), 40)
optimizer.step()
f.flush()
except:
print(ticker)
traceback.print_exc()
var = traceback.format_exc()
f.write(str(ticker)+"\n")
f.write("exception:\n"+str(var))
f.flush()
# Implementing the Adam optimizer with shared states
class Params():
def __init__(self):
self.lr = 0.001
self.gamma = 0.99
self.tau = 1.
self.seed = 1
self.num_processes = 5
self.num_steps = 20
self.max_episode_length = 10000
self.env_name = 'Breakout-v0'
params = Params()
state_dim = 6
action_dim = 3
model = ActorCritic(state_dim, action_dim)
model.share_memory()
optimizer = SharedAdam(model.parameters(), lr=params.lr)
optimizer.share_memory()
# set the parameters
epochs = 1
state_dim = 6
action_dim = 3
max_action = 1
idx = 0
file_name = "%s" % ("computational__reward")
directory="./pytorch_models"
# instantiate policy
tickers = pd.read_csv("tickers.csv")
indices = tickers['Symbol']
jobs = []
#indices = ['FTK']
for ep in range(1):
print("epoch:",ep)
f.write("epoch:"+str(ep)+"\n")
for ticker in indices:
try:
for rank in range(0, params.num_processes): # making a loop to run all the other processes that will be trained by updating the shared model
if __name__ == '__main__':
p = multiprocessing.Process(target=train, args=(rank, params, model, optimizer,ticker,))
jobs.append(p)
p.start()
for p in jobs: # creating a pointer that will allow to kill all the threads when at least one of the threads, or main.py will be killed, allowing to stop the program safely
p.join()
model.save("A3C_multi"+ str(ep)+"_" + file_name, directory="./pytorch_models")
f.flush()
except Exception as e:
print(e)
traceback.print_exc()
var = traceback.format_exc()
f.write(str(ticker)+"\n")
f.write("exception:\n"+str(var))
model.save("A3C"+ str(ep)+"_" + file_name, directory="./pytorch_models")
Я получаю 6 раз это сообщение:
epoch: 0
epoch: 0
epoch: 0
epoch: 0
epoch: 0
epoch: 0