python многопроцессорная очередь неправильно распараллеливается - PullRequest
0 голосов
/ 06 февраля 2020

Мне трудно заставить мой код правильно работать параллельно. Мне нужно следующее:

  • У меня есть 4 дисковых кода, которые мне нужно развивать в течение определенного времени, с дискретными временными шагами
  • Я хочу развить каждый из этих Параллельные коды

Из-за природы кодов и данных, которые мне нужно хранить, после попытки пулов процессов я решил, что мой лучший способ go - это использовать очередь , Я пробовал два разных кода. В этом примере disk_codes - это просто числа, но в моем реальном коде они являются отдельными экземплярами другого кода для решения уравнений движения на диске.

Код 1:

import multiprocessing

try:
    import queue
except:
    import Queue as queue

def evolve_single_disk(queue, dt):
    print "Empty queue? ", queue.empty()
    code = queue.get()
    print "Evolving disk {0} in {1}".format(code, multiprocessing.current_process().name)
    queue.task_done()

if __name__ == '__main__':
    ncores = 4
    code_queue = queue.Queue()
    disk_codes = range(ncores)

    for disk in disk_codes:
        code_queue.put(disk)

    dt = 1
    t_end = 10
    t = 0

    # Evolve!
    while t < t_end:
        print "t=", t

        processes = multiprocessing.Process(target=evolve_single_disk, args=(code_queue, dt, ))

        processes.start()
        processes.join()

        disk_codes = code_queue.get()
        print "disk codes: ", disk_codes


        t += dt

Это приводит к:

t= 0
Empty queue?  False
Evolving disk 0 in Process-1
disk codes:  0

t= 1
Empty queue?  False
Evolving disk 1 in Process-2
disk codes:  1

t= 2
Empty queue?  False
Evolving disk 2 in Process-3
disk codes:  2

t= 3
Empty queue?  False
Evolving disk 3 in Process-4
disk codes:  3

t= 4
Empty queue?  True

Таким образом, на каждом временном шаге один из дисков «эволюционирует». Это не то, что я хочу, так как я хочу, чтобы все четыре диска развивались параллельно за один и тот же временной шаг.

Затем я попробовал это:

Код 2:

import multiprocessing

try:
    import queue
except:
    import Queue as queue


def evolve_single_disk(queue, dt):
    print "Empty queue? ", queue.empty()
    code = queue.get()  
    print "Evolving disk {0} in {1}".format(code, multiprocessing.current_process().name)
    queue.task_done()


if __name__ == '__main__':
    ncores = 4
    code_queue = queue.Queue()
    disk_codes = range(ncores)

    for disk in disk_codes:
        code_queue.put(disk)

    dt = 1
    t_end = 10
    t = 0

    # Evolve!
    while t < t_end:
        print ""
        print "t=", t

        processes = [multiprocessing.Process(target=evolve_single_disk, args=(code_queue, dt, )) for x in range(ncores)]

        for p in processes:
            p.start()
            p.join()

        disk_codes = [code_queue.get() for p in processes]
        print "disk codes: ", disk_codes

        t += dt

Что приводит к:

t= 0
Empty queue?  False
Evolving disk 0 in Process-1
Empty queue?  False
Evolving disk 0 in Process-2
Empty queue?  False
Evolving disk 0 in Process-3
Empty queue?  False
Evolving disk 0 in Process-4
disk codes:  [0, 1, 2, 3]

t= 1
Empty queue?  True

... и тогда код просто зависает. Итак, здесь я запускаю 4 процесса на каждом временном шаге, но каждый процесс получает один и тот же диск.

Как я могу сделать это правильно, чтобы на каждом временном шаге было 4 процесса и каждый из них развивается 1 единственный диск? Я прочитал документацию и множество учебных пособий и ТАК вопросы / ответы, но я все еще в замешательстве.

Редактировать:

Я пытался использовать multiprocessing очереди, но затем я получаю ошибку TypeError, когда Я пытаюсь поместить дисковые коды в очередь. К сожалению, коды дисков также не могут быть засолены. Трассировка при использовании очереди multiprocessing с кодами дисков:

t= 0
<multiprocessing.queues.Queue object at 0x7fee4a2d3950>
Traceback (most recent call last):
  File "/home/fran/anaconda2/lib/python2.7/multiprocessing/queues.py", line 268, in _feed
    send(obj)
TypeError: expected string or Unicode object, NoneType found
Traceback (most recent call last):
  File "/home/fran/anaconda2/lib/python2.7/multiprocessing/queues.py", line 268, in _feed
    send(obj)
TypeError: expected string or Unicode object, NoneType found
Traceback (most recent call last):
  File "/home/fran/anaconda2/lib/python2.7/multiprocessing/queues.py", line 268, in _feed
    send(obj)
TypeError: expected string or Unicode object, NoneType found
Traceback (most recent call last):
  File "/home/fran/anaconda2/lib/python2.7/multiprocessing/queues.py", line 268, in _feed
    send(obj)
TypeError: expected string or Unicode object, NoneType found
Empty queue?  True

1 Ответ

1 голос
/ 06 февраля 2020

Переместите это внутрь, пока l oop:

    for disk in disk_codes:
        code_queue.put(disk)

Вот полный код:

import multiprocessing

def evolve_single_disk(queue, result, dt):
    print "Empty queue? ", queue.empty()
    code = queue.get()
    print "Evolving disk {0} in {1}".format(code, multiprocessing.current_process().name)
    result.put(code)


if __name__ == '__main__':
    ncores = 4
    code_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()
    disk_codes = range(ncores)

    dt = 1
    t_end = 10
    t = 0

    # Evolve!
    while t < t_end:
        for disk in disk_codes:
            code_queue.put(disk)

        print ""
        print "t=", t

        process_list = list()
        for x in range(ncores):
            process = multiprocessing.Process(target=evolve_single_disk, args=(code_queue, result_queue, dt))
            process_list.append(process)

        for p in process_list:
            p.start()
            p.join()

        disk_codes = [result_queue.get() for p in process_list]
        print "disk codes: ", disk_codes

        t += dt

Выход


t= 0
Empty queue?  False
Evolving disk 0 in Process-1
Empty queue?  False
Evolving disk 1 in Process-2
Empty queue?  False
Evolving disk 2 in Process-3
Empty queue?  False
Evolving disk 3 in Process-4
disk codes:  [0, 1, 2, 3]

t= 1
Empty queue?  False
Evolving disk 0 in Process-5
Empty queue?  False
Evolving disk 1 in Process-6
Empty queue?  False
Evolving disk 2 in Process-7
Empty queue?  False
Evolving disk 3 in Process-8
disk codes:  [0, 1, 2, 3]

...

t= 9
Empty queue?  False
Evolving disk 0 in Process-37
Empty queue?  False
Evolving disk 1 in Process-38
Empty queue?  False
Evolving disk 2 in Process-39
Empty queue?  False
Evolving disk 3 in Process-40
disk codes:  [0, 1, 2, 3]
...