(ОБНОВЛЕНО) Я создаю модуль для распределения моделей на основе агентов, идея состоит в том, чтобы разделить модель на несколько процессов, а затем, когда агенты достигают границы, они передаются процессору, обрабатывающему эту область.Я могу настроить процессы и работать без связи, но не могу заставить данные проходить через каналы и обновлять сегмент модели на другом процессоре.
Я испробовал решения для stackoverflow и построил простую версию модели.Как только я помещаю объект модели в канал, модель зависает (она работает со стандартными типами данных Python).Простая версия просто передает агентов туда и обратно.
from pathos.multiprocessing import ProcessPool
from pathos.helpers import mp
import copy
class TestAgent:
"Agent Class-- Schedule iterates through each agent and \
executes step function"
def __init__(self, unique_id, model):
self.unique_id = unique_id
self.model = model
self.type = "agent"
def step(self):
pass
#print (' ', self.unique_id, "I have stepped")
class TestModel:
"Model Class iterates through schedule and executes step function for \
each agent"
def __init__(self):
self.schedule = []
self.pipe = None
self.process = None
for i in range(1000):
a = TestAgent(i, self)
self.schedule.append(a)
def step(self):
for a in self.schedule:
a.step()
if __name__ == '__main__':
pool = ProcessPool(nodes=2)
#create instance of model
test_model = TestModel()
#create copies of model to be run on 2 processors
test1 = copy.deepcopy(test_model)
#clear schedule
test1.schedule = []
#Put in only half the schedule
for i in range(0,500):
test1.schedule.append(test_model.schedule[i])
#Give process tracker number
test1.process = 1
#repeat for other processor
test2= copy.deepcopy(test_model)
test2.schedule = []
for i in range(500,1000):
test2.schedule.append(test_model.schedule[i])
test2.process = 2
#create pipe
end1, end2 = mp.Pipe()
#Main run function for each process
def run(model, pipe):
for i in range(5):
print (model.process)#, [a.unique_id for a in model.schedule])
model.step() # IT HANGS AFTER INITIAL STEP
print ("send")
pipe.send(model.schedule)
print ("closed")
sched = pipe.recv()
print ("received")
model.schedule = sched
pool.map(run, [test1, test2], [end1,end2])
Агенты должны переключать процессоры и выполнять свои функции печати.(Моей следующей проблемой будет синхронизация процессоров, чтобы они оставались на каждом шаге, но по одной за раз.)