Мне трудно заставить мой код правильно работать параллельно. Мне нужно следующее:
- У меня есть 4 дисковых кода, которые мне нужно развивать в течение определенного времени, с дискретными временными шагами
- Я хочу развить каждый из этих Параллельные коды
Из-за природы кодов и данных, которые мне нужно хранить, после попытки пулов процессов я решил, что мой лучший способ go - это использовать очередь , Я пробовал два разных кода. В этом примере disk_codes
- это просто числа, но в моем реальном коде они являются отдельными экземплярами другого кода для решения уравнений движения на диске.
Код 1:
import multiprocessing
try:
import queue
except:
import Queue as queue
def evolve_single_disk(queue, dt):
print "Empty queue? ", queue.empty()
code = queue.get()
print "Evolving disk {0} in {1}".format(code, multiprocessing.current_process().name)
queue.task_done()
if __name__ == '__main__':
ncores = 4
code_queue = queue.Queue()
disk_codes = range(ncores)
for disk in disk_codes:
code_queue.put(disk)
dt = 1
t_end = 10
t = 0
# Evolve!
while t < t_end:
print "t=", t
processes = multiprocessing.Process(target=evolve_single_disk, args=(code_queue, dt, ))
processes.start()
processes.join()
disk_codes = code_queue.get()
print "disk codes: ", disk_codes
t += dt
Это приводит к:
t= 0
Empty queue? False
Evolving disk 0 in Process-1
disk codes: 0
t= 1
Empty queue? False
Evolving disk 1 in Process-2
disk codes: 1
t= 2
Empty queue? False
Evolving disk 2 in Process-3
disk codes: 2
t= 3
Empty queue? False
Evolving disk 3 in Process-4
disk codes: 3
t= 4
Empty queue? True
Таким образом, на каждом временном шаге один из дисков «эволюционирует». Это не то, что я хочу, так как я хочу, чтобы все четыре диска развивались параллельно за один и тот же временной шаг.
Затем я попробовал это:
Код 2:
import multiprocessing
try:
import queue
except:
import Queue as queue
def evolve_single_disk(queue, dt):
print "Empty queue? ", queue.empty()
code = queue.get()
print "Evolving disk {0} in {1}".format(code, multiprocessing.current_process().name)
queue.task_done()
if __name__ == '__main__':
ncores = 4
code_queue = queue.Queue()
disk_codes = range(ncores)
for disk in disk_codes:
code_queue.put(disk)
dt = 1
t_end = 10
t = 0
# Evolve!
while t < t_end:
print ""
print "t=", t
processes = [multiprocessing.Process(target=evolve_single_disk, args=(code_queue, dt, )) for x in range(ncores)]
for p in processes:
p.start()
p.join()
disk_codes = [code_queue.get() for p in processes]
print "disk codes: ", disk_codes
t += dt
Что приводит к:
t= 0
Empty queue? False
Evolving disk 0 in Process-1
Empty queue? False
Evolving disk 0 in Process-2
Empty queue? False
Evolving disk 0 in Process-3
Empty queue? False
Evolving disk 0 in Process-4
disk codes: [0, 1, 2, 3]
t= 1
Empty queue? True
... и тогда код просто зависает. Итак, здесь я запускаю 4 процесса на каждом временном шаге, но каждый процесс получает один и тот же диск.
Как я могу сделать это правильно, чтобы на каждом временном шаге было 4 процесса и каждый из них развивается 1 единственный диск? Я прочитал документацию и множество учебных пособий и ТАК вопросы / ответы, но я все еще в замешательстве.
Редактировать:
Я пытался использовать multiprocessing
очереди, но затем я получаю ошибку TypeError, когда Я пытаюсь поместить дисковые коды в очередь. К сожалению, коды дисков также не могут быть засолены. Трассировка при использовании очереди multiprocessing
с кодами дисков:
t= 0
<multiprocessing.queues.Queue object at 0x7fee4a2d3950>
Traceback (most recent call last):
File "/home/fran/anaconda2/lib/python2.7/multiprocessing/queues.py", line 268, in _feed
send(obj)
TypeError: expected string or Unicode object, NoneType found
Traceback (most recent call last):
File "/home/fran/anaconda2/lib/python2.7/multiprocessing/queues.py", line 268, in _feed
send(obj)
TypeError: expected string or Unicode object, NoneType found
Traceback (most recent call last):
File "/home/fran/anaconda2/lib/python2.7/multiprocessing/queues.py", line 268, in _feed
send(obj)
TypeError: expected string or Unicode object, NoneType found
Traceback (most recent call last):
File "/home/fran/anaconda2/lib/python2.7/multiprocessing/queues.py", line 268, in _feed
send(obj)
TypeError: expected string or Unicode object, NoneType found
Empty queue? True