Python присоединяется к процессу, не блокируя родительский процесс - PullRequest
19 голосов
/ 06 марта 2011

Я пишу программу, которая будет отслеживать конкретный каталог на наличие новых файлов, содержащих URL-адреса для загрузки.Как только новый файл обнаружен, он создаст новый процесс для фактической загрузки, в то время как родитель продолжает следить за каталогом.Я использую интерфейс Process от multiprocessing.У меня проблема в том, что, если я не вызову process.join (), дочерний процесс все еще выполняется, но process.join () - это блокирующая функция, которая лишает цели создания дочернего процесса для фактической загрузки.

Мой вопрос: есть ли способ присоединиться к дочернему процессу неблокирующим образом, который позволит родителю продолжать делать свое дело?

Неполный код:

def main(argv):
  # parse command line args
  ...
  # set up variables
  ...
  watch_dir(watch_dir, download_dir)


def watch_dir(wDir, dDir):
  # Grab the current watch directory listing
  before = dict([(f, None) for f in os.listdir (wDir)])

  # Loop FOREVER
  while 1:
    # sleep for 10 secs
    time.sleep(10)

    # Grab the current dir listing
    after = dict([(f, None) for f in os.listdir (wDir)])

    # Get the list of new files
    added = [f for f in after if not f in before]
    # Get the list of deleted files
    removed = [f for f in before if not f in after]

    if added:
      # We have new files, do your stuff
      print "Added: ", ", ".join(added)

      # Call the new process for downloading
      p = Process(target=child, args=(added, wDir, dDir))
      p.start()
      p.join()

    if removed:
      # tell the user the file was deleted
      print "Removed: ", ", ".join(removed)

    # Set before to the current
    before = after

def child(filename, wDir, dDir):
  # Open filename and extract the url
  ...
  # Download the file and to the dDir directory
  ...
  # Delete filename from the watch directory
  ...
  # exit cleanly
  os._exit(0)

parent ожидает, пока ребенок завершит выполнение, прежде чем продолжить после p.join(), что (насколько я могу судить) является правильным.Но это побеждает всю цель создания ребенка.Если я остановлюсь p.join(), тогда ребенок останется активным, и ps ax | grep питон даст мне 'python '.

Я бы хотел, чтобы ребенок закончил то, что делал, и ушел, не удерживаядо родителя.Есть ли способ сделать это?

Ответы [ 4 ]

14 голосов
/ 06 марта 2011

Вы можете создать отдельный поток, который будет выполнять присоединение. Пусть он прослушивает очередь , в которую вы помещаете дескрипторы подпроцесса:

class Joiner(Thread):
    def __init__(self, q):
        self.__q = q
    def run(self):
        while True:
            child = self.__q.get()
            if child == None:
                return
            child.join()

Затем вместо p.join() выполните joinq.put(p) и joinq.put(None), чтобы дать сигнал потоку остановиться. Убедитесь, что вы используете очередь FIFO.

6 голосов
/ 26 февраля 2013

В вашем цикле while вызовите

multiprocessing.active_children()

Возвращение списка всех живых потомков текущего процесса.Вызов этого имеет побочный эффект «присоединения» к любым процессам, которые уже завершены.

3 голосов
/ 08 августа 2014

Вместо того, чтобы пытаться заставить multiprocessing.Process() работать на вас, возможно, вам следует использовать другой инструмент, такой как apply_async() с многопроцессорной обработкой.Pool ():

def main(argv):
    # parse command line args
    ...
    # set up variables
    ...

    # set up multiprocessing Pool
    pool = multiprocessing.Pool()

    try:
        watch_dir(watch_dir, download_dir, pool)

    # catch whatever kind of exception you expect to end your infinite loop
    # you can omit this try/except if you really think your script will 
    # run "forever" and you're okay with zombies should it crash
    except KeyboardInterrupt:
        pool.close()
        pool.join()

def watch_dir(wDir, dDir, pool):
    # Grab the current watch directory listing
    before = dict([(f, None) for f in os.listdir (wDir)])

    # Loop FOREVER
    while 1:
        # sleep for 10 secs
        time.sleep(10)

        # Grab the current dir listing
        after = dict([(f, None) for f in os.listdir (wDir)])

        # Get the list of new files
        added = [f for f in after if not f in before]
        # Get the list of deleted files
        removed = [f for f in before if not f in after]

        if added:
            # We have new files, do your stuff
            print "Added: ", ", ".join(added)

            # launch the function in a subprocess - this is NON-BLOCKING
            pool.apply_async(child, (added, wDir, dDir))

        if removed:
            # tell the user the file was deleted
            print "Removed: ", ", ".join(removed)

        # Set before to the current
        before = after

def child(filename, wDir, dDir):
    # Open filename and extract the url
    ...
    # Download the file and to the dDir directory
    ...
    # Delete filename from the watch directory
    ...
    # simply return to "exit cleanly"
    return

multiprocessing.Pool() - это пул рабочих подпроцессов, в которые вы можете отправлять «задания». Вызов функции pool.apply_async() приводит к тому, что один из подпроцессов запускает вашу функцию с предоставленными аргументами асинхронно и не требует объединения, пока ваш скрипт не выполнит всю свою работу и не закроет весь пул. Библиотека управляет деталями для вас.

Я думаю, что это послужит вам лучше, чем текущий принятый ответ по следующим причинам:
1. Это устраняет ненужную сложность запуска дополнительных потоков и очередей только для управления подпроцессами.
2. Он использует библиотечные подпрограммы, которые созданы специально для этой цели , так что вы получите выгоду от будущих улучшений библиотеки.
3. ИМХО, это гораздо удобнее в обслуживании.
4. Это более гибкий. Если вы однажды решите, что хотите увидеть возвращаемое значение из своих подпроцессов, вы можете сохранить возвращаемое значение из вызова apply_async() (объект результата ) и проверить его в любое время. Вы можете хранить их в списке и обрабатывать как пакеты, когда ваш список становится больше определенного размера. Вы можете переместить создание пула в функцию watch_dir() и покончить с попыткой /, за исключением случаев, когда вас действительно не волнует, что произойдет, если «бесконечный» цикл прерывается. Если вы поместили какое-то условие разрыва в (в настоящее время) бесконечный цикл, вы можете просто добавить pool.close() и pool.join() после цикла, и все будет очищено.

2 голосов
/ 07 марта 2011

Если вас не волнует, когда и когда ребенок прекратит свое существование, и вы просто хотите, чтобы ребенок не превратился в процесс зомби, тогда вы можете сделать двойную вилку, чтобы внук стал ребенком.init.В коде:

def child(*args):
  p = Process(target=grandchild, args=args)
  p.start()
  os._exit(0)

def grandchild(filename, wDir, dDir):
  # Open filename and extract the url
  ...
  # Download the file and to the dDir directory
  ...
  # Delete filename from the watch directory
  ...
  # exit cleanly
  os._exit(0)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...