Python: многопроцессорная Queue.put () в модуле ничего не отправит родительскому процессу - PullRequest
0 голосов
/ 27 октября 2019

Я пытаюсь заставить 2 процесса взаимодействовать друг с другом, используя пакет многопроцессорности в Python, а точнее класс Queue (). Из родительского процесса я хочу получать обновленное значение дочернего процесса каждые 5 секунд. Этот дочерний процесс является функцией класса. Я сделал игрушечный пример, где все работает отлично.

Однако, когда я пытаюсь реализовать это решение в моем проекте, кажется, что метод Queue.put () дочернего процесса в субмодуле не будет отправлять что-либо родительскому процессу, потому чтородительский процесс не напечатает нужное значение, и код никогда не останавливается. На самом деле, родительский процесс только печатает значение, отправленное дочернему процессу, которое здесь True, но, как я уже сказал, никогда не останавливается.

Итак, мои вопросы:

1) Есть ли какая-либо ошибка в моем игрушечном примере?

2) Как мне изменить свой проект, чтобы он работал так же, как мой игрушечный пример?

Любая помощь будет высоко оценена, если учесть, что она блокирует заключительную фазу моего проекта.

Пример игрушки: работает

mainмодуль

from multiprocessing import Process, Event, Lock, Queue, Pipe
import time 
import test_mod as test

def loop(output):
    stop_event = Event()
    q = Queue()
    child_process = Process(target=test.child.sub, args=(q,))
    child_process.start()
    i = 0
    print("started at {} ".format(time.time()))

    while not stop_event.is_set():
        i+=1
        time.sleep(5)
        q.put(True)
        print(q.get())
        if i == 5:
            child_process.terminate()
            stop_event.set()

    output.put("main process looped")

if __name__ == '__main__':
    stop_event, output = Event(), Queue()
    k = 0
    while k < 5:
        loop_process = Process(target=loop, args=(output,))
        loop_process.start()
        print(output.get())
        loop_process.join()
        k+=1

субмодуль

from multiprocessing import Process, Event, Lock, Queue, Pipe
import time


class child(object):
    def __init__(self):
        pass

    def sub(q):
        i = 0
        while i < 2000:
            latest_value = time.time()
            accord = q.get()
            if accord == True:
                q.put(latest_value)
            accord = False
            time.sleep(0.0000000005)
            i+=1

Код проекта: не работает

основной модуль

import neat #package in which the submodule is 
import *some other stuff*

def run(config_file):

    config = neat.Config(some configuration)

    p = neat.Population(config)

    **WHERE MY PROBLEM IS**

    stop_event = Event()
    q = Queue()
    pe = neat.ParallelEvaluator(**args)

    child_process = Process(target=p.run, args=(pe.evaluate, q, other args))
    child_process.start()

    i = 0
    while not stop_event.is_set():

        q.put(True)
        print(q.get())
        time.sleep(5)
        i += 1
        if i == 5:
            child_process.terminate()
            stop_event.set()

if __name__ == '__main__':
    run(config_file)

субмодуль

class Population(object):
    def __init__():
      *initialization*

    def run(self, q, other args):

        while n is None or k < n:
            *some stuff*
            accord = add_2.get()
            if accord == True:
                add_2.put(self.best_genome.fitness)
            accord = False

        return self.best_genome

Примечание: 1) я не привык к многопроцессорной обработке

2) IЯ попытался дать наиболее важные части моего проекта, учитывая, что весь код был бы слишком длинным, но если что-то неясно, пожалуйста, сообщите мне, и я обновлю свой вопрос.

3) У меня также естьРассматривается использование Pipe (), однако эта опция тоже не работает.

1 Ответ

1 голос
/ 28 октября 2019

Если я правильно вижу, ваш подмодуль - это класс Population. Однако вы запускаете свой процесс с параметром типа ParallelEvaluator. Далее я не вижу, чтобы вы поставили свою очередь q для подпроцесса. Вот что я вижу из предоставленного кода:

stop_event = Event()
q = Queue()
pe = neat.ParallelEvaluator(**args)

child_process = Process(target=p.run, args=(pe.evaluate, **args)
child_process.start()

Более того, следующие строки создают условие гонки:

q.put(True)
print(q.get())

Команда get похожа на pop. Таким образом, он берет элемент и удаляет его из очереди. Если ваш подпроцесс не имеет доступа к очереди между этими двумя строками (потому что он занят), True никогда не попадет в дочерний процесс. Следовательно, лучше использовать две очереди. По одному на каждое направление. Что-то вроде:

stop_event = Event()
q_in = Queue()
q_out = Queue()
pe = neat.ParallelEvaluator(**args)

child_process = Process(target=p.run, args=(pe.evaluate, **args))
child_process.start()

i = 0
while not stop_event.is_set():

     q_in.put(True)
     print(q_out.get())
     time.sleep(5)
     i += 1
     if i == 5:
         child_process.terminate()
         stop_event.set()

Это ваш подмодуль

class Population(object):
    def __init__():
      *initialization*

    def run(self, **args):

        while n is None or k < n:
            *some stuff*
            accord = add_2.get()           # add_2 = q_in
            if accord == True:
                add_3.put(self.best_genome.fitness)  #add_3 = q_out
            accord = False

        return self.best_genome
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...