Я использую платформу Pytorch, и мне нужно создать собственную процедуру синхронизации.Лучшее, что я могу придумать, - это создать поток, который ждет, когда узлы отправителя отправят свои соответствующие значения тензора.Но проблема в том, что отправляющие узлы застряли в команде dist.send()
.
Вот урезанный код, который я использую для тестирования, который представляет операцию, выполняемую нашим фактическим кодом:
def run(rank_local, rank, world_size):
print("I WAS SPAWNED ", rank_local, " OF ", rank)
tensor_1 = torch.zeros(1)
tensor_1 += 1
while True:
print("I am spawn of: ", rank, "and my tensor value before receive: ", tensor_1[0])
nt = dist.recv(tensor_1)
print("I am spawn of: ", rank, "and my tensor value after receive from", nt, " is: ", tensor_1[0])
def communication(tensor, rank):
if rank != 0:
tensor += (100 + rank)
dist.send(tensor, dst=0)
else:
tensor -= 1000
dist.send(tensor, dst=1)
print("I AM DONE WITH MY SENDS NOW WHAT?: ", rank)
if __name__ == '__main__':
# Initialize Process Group
dist.init_process_group(backend="mpi", group_name="main")
# get current process information
world_size = dist.get_world_size()
rank = dist.get_rank()
#torch.cuda.set_device(rank%2)
# Establish Local Rank and set device on this node
p = ThreadWithReturnValue(target=run, args=(0, rank, world_size)) #mp.Process(target=run, args=(0, rank, world_size))
p.start()
tensor = torch.zeros(1)
communication(tensor, rank)
p.join()
Понятия не имею, как решить эту проблему.Обратите внимание, что код прекрасно работает, когда я удаляю строку torch.cuda.set_device(rank%2)
, но я хочу запустить свою модель на своих графических процессорах.Есть идеи?