Трубы застревают - другого решения по переполнению стека нет - PullRequest
2 голосов
/ 17 апреля 2019

(ОБНОВЛЕНО) Я создаю модуль для распределения моделей на основе агентов, идея состоит в том, чтобы разделить модель на несколько процессов, а затем, когда агенты достигают границы, они передаются процессору, обрабатывающему эту область.Я могу настроить процессы и работать без связи, но не могу заставить данные проходить через каналы и обновлять сегмент модели на другом процессоре.

Я испробовал решения для stackoverflow и построил простую версию модели.Как только я помещаю объект модели в канал, модель зависает (она работает со стандартными типами данных Python).Простая версия просто передает агентов туда и обратно.

 from pathos.multiprocessing import ProcessPool
 from pathos.helpers import mp
 import copy



 class TestAgent:

 "Agent Class-- Schedule iterates through each agent and \
  executes step function"

 def __init__(self, unique_id, model):
      self.unique_id = unique_id
      self.model = model
      self.type = "agent"

 def step(self):

       pass 
       #print ('     ', self.unique_id, "I have stepped")

class TestModel:

   "Model Class iterates through schedule and executes step function for \
   each agent"

   def __init__(self):
       self.schedule = []
       self.pipe = None
       self.process = None

       for i in range(1000):
           a = TestAgent(i, self)
           self.schedule.append(a)


   def step(self):


       for a in self.schedule:
           a.step()

if __name__ == '__main__':

   pool = ProcessPool(nodes=2)

   #create instance of model
   test_model = TestModel()
   #create copies of model to be run on 2 processors
   test1 = copy.deepcopy(test_model)
   #clear schedule
   test1.schedule = []
   #Put in only half the schedule
   for i in range(0,500):
       test1.schedule.append(test_model.schedule[i])  
   #Give process tracker number
   test1.process = 1
   #repeat for other processor
   test2= copy.deepcopy(test_model)
   test2.schedule = []
   for i in range(500,1000):
       test2.schedule.append(test_model.schedule[i])
   test2.process = 2

   #create pipe
   end1, end2 = mp.Pipe()

   #Main run function for each process
   def run(model, pipe):

      for i in range(5):
          print (model.process)#, [a.unique_id for a in model.schedule])
          model.step() # IT HANGS AFTER INITIAL STEP
          print ("send")
          pipe.send(model.schedule)
          print ("closed")
          sched = pipe.recv()
          print ("received")
          model.schedule = sched



   pool.map(run, [test1, test2], [end1,end2])

Агенты должны переключать процессоры и выполнять свои функции печати.(Моей следующей проблемой будет синхронизация процессоров, чтобы они оставались на каждом шаге, но по одной за раз.)

enter image description here

1 Ответ

0 голосов
/ 18 апреля 2019

Я получил его на работу.Я превышал предел буфера в Python (8192).Это особенно верно, если агент содержит копию модели в качестве атрибута.Ниже приведена рабочая версия приведенного выше кода, которая передает агентам по одному.Он использует Pympler, чтобы получить размер всех агентов.

from pathos.multiprocessing import ProcessPool
from pathos.helpers import mp
import copy

# do a blocking map on the chosen function

class TestAgent:

"Agent Class-- Schedule iterates through each agent and \
executes step function"

   def __init__(self, unique_id, model):
       self.unique_id = unique_id
       self.type = "agent"

   def step(self):
       pass 


class TestModel:

   "Model Class iterates through schedule and executes step function for \
   each agent"

   def __init__(self):
       from pympler import asizeof 

       self.schedule = []
       self.pipe = None
       self.process = None
       self.size = asizeof.asizeof


       for i in range(1000):
           a = TestAgent(i, self)
           self.schedule.append(a)


   def step(self):


       for a in self.schedule:
           a.step()

if __name__ == '__main__':

   pool = ProcessPool(nodes=2)

   #create instance of model
   test_model = TestModel()
   #create copies of model to be run on 2 processors
   test1 = copy.deepcopy(test_model)
   #clear schedule
   test1.schedule = []
   #Put in only half the schedule
   for i in range(0,500):
       test1.schedule.append(test_model.schedule[i])  
   #Give process tracker number
   test1.process = 1
   #repeat for other processor
   test2= copy.deepcopy(test_model)
   test2.schedule = []
   for i in range(500,1000):
       test2.schedule.append(test_model.schedule[i])
   test2.process = 2

   #create pipe
   end1, end2 = mp.Pipe()

   #Main run function for each process
   def run(model, pipe):

      for i in range(5):
        agents = []
        print (model.process, model.size(model.schedule) ) 
        model.step() # IT HANGS AFTER INITIAL STEP
        #agent_num = list(model.schedule._agents.keys())
        for agent in model.schedule[:]:
            model.schedule.remove(agent)
            pipe.send(agent)
            agent = pipe.recv()
            agents.append(agent)
        print (model.process, "all agents received")
        for agent in agents: 
            model.schedule.append(agent)

        print (model.process, len(model.schedule))



   pool.map(run, [test1, test2], [end1,end2])

Майк Маккернс и Томас Моро - Спасибо за помощь, вы направили меня на правильный путь.

...