Многопоточный Python FS Crawler - PullRequest
4 голосов
/ 26 января 2011

Я написал функцию python, которая просматривает файловую систему, используя предоставленный шаблон каталога, с необязательными «действиями» для выполнения на каждом уровне.Затем я попытался использовать многопоточность, поскольку некоторые тома находятся в общих сетевых ресурсах, и я хочу минимизировать блокировку ввода-вывода.Я начал с использования класса мультипроцессорного пула, так как это было наиболее удобно ... (серьезно, нет класса пула для многопоточности?) Моя функция максимально распределяет предоставленный шаблон FS и отправляет вновь возвращаемые пути в пул, пока не появятся новые путивозвращаются.Я получил это, чтобы отлично работать при непосредственном использовании функции и класса, но теперь я пытаюсь использовать эту функцию из другого класса, и моя программа, кажется, зависает.Для упрощения я переписал функцию, используя Threads вместо Processes, и даже написал простой класс ThreadPool ... та же проблема.Вот очень упрощенная версия кода, которая по-прежнему имеет те же проблемы:

file test1.py:
------------------------------------------------

import os
import glob
from multiprocessing import Pool

def mapGlob(pool,paths,pattern):
    results = []
    paths = [os.path.join(p,pattern) for p in paths]
    for result in pool.map(glob.glob,paths):
        results += result
    return results

def findAllMyPaths():
    pool = Pool(10)
    paths = ['/Volumes']
    follow = ['**','ptid_*','expid_*','slkid_*']
    for pattern in follow:
        paths = mapGlob(pool,paths,pattern)
    return paths


file test2.py:
----------------------------------------------------------------------------

from test1 import findAllMyPaths

allmypaths = findAllMyPaths()

Теперь, если я позвоню

>>>from test1 import findAllMyPaths
>>>findAllMyPaths()
>>>...long list of all the paths

, это работает нормально, но если попытаться:

>>>from test2 import allmypaths

Питон висит навсегда.Функции действий вызываются (в этом примере glob), но они никогда не возвращаются ... Мне нужна помощь, пожалуйста ... распараллеленная версия работает намного быстрее, когда она работает должным образом (в 6-20 раз быстрее, в зависимости от того, какие "действия" являютсяотображается в каждой точке дерева FS), поэтому я хотел бы иметь возможность использовать его.

также, если я изменю функцию отображения на непараллельную версию:

def mapGlob(pool,paths,pattern):
    results = []
    paths = [os.path.join(p,pattern) for p in paths]
    for path in paths:
        results += glob.glob(path)
    return results

все работает нормально.

Редактировать:

Я включил отладку в многопроцессорной обработке, чтобы посмотреть, может ли это помочь мне в дальнейшем.В случае, если это работает, я получаю:

[DEBUG/MainProcess] created semlock with handle 5
[DEBUG/MainProcess] created semlock with handle 6
[DEBUG/MainProcess] created semlock with handle 9
[DEBUG/MainProcess] created semlock with handle 10
[INFO/PoolWorker-1] child process calling self.run()
[INFO/PoolWorker-2] child process calling self.run()
[INFO/PoolWorker-3] child process calling self.run()
[INFO/PoolWorker-5] child process calling self.run()
[INFO/PoolWorker-4] child process calling self.run()
[INFO/PoolWorker-6] child process calling self.run()
[INFO/PoolWorker-7] child process calling self.run()
[INFO/PoolWorker-9] child process calling self.run()
[INFO/PoolWorker-8] child process calling self.run()
[INFO/PoolWorker-10] child process calling self.run()
[DEBUG/MainProcess] closing pool
[SUBDEBUG/MainProcess] finalizer calling <bound method type._terminate_pool of <class 'multiprocessing.pool.Pool'>> with args (<Queue.Queue instance at 0x34af918>, <multiprocessing.queues.SimpleQueue object at 0x3494950>, <multiprocessing.queues.SimpleQueue object at 0x34a61b0>, [<Process(PoolWorker-1, started daemon)>, <Process(PoolWorker-2, started daemon)>, <Process(PoolWorker-3, started daemon)>, <Process(PoolWorker-4, started daemon)>, <Process(PoolWorker-5, started daemon)>, <Process(PoolWorker-6, started daemon)>, <Process(PoolWorker-7, started daemon)>, <Process(PoolWorker-8, started daemon)>, <Process(PoolWorker-9, started daemon)>, <Process(PoolWorker-10, started daemon)>], <Thread(Thread-1, started daemon -1341648896)>, <Thread(Thread-2, started daemon -1341116416)>, {}) and kwargs {}
[DEBUG/MainProcess] finalizing pool
[DEBUG/MainProcess] helping task handler/workers to finish
[DEBUG/MainProcess] removing tasks from inqueue until task handler finished
[DEBUG/MainProcess] task handler got sentinel
[DEBUG/MainProcess] task handler sending sentinel to result handler
[DEBUG/MainProcess] task handler sending sentinel to workers
[DEBUG/MainProcess] task handler exiting
[DEBUG/MainProcess] result handler got sentinel
[DEBUG/MainProcess] ensuring that outqueue is not full
[DEBUG/MainProcess] result handler exiting: len(cache)=0, thread._state=0
[DEBUG/PoolWorker-2] worker got sentinel -- exiting
[DEBUG/PoolWorker-1] worker got sentinel -- exiting
[INFO/PoolWorker-2] process shutting down
[DEBUG/PoolWorker-7] worker got sentinel -- exiting
[INFO/PoolWorker-1] process shutting down
[INFO/PoolWorker-7] process shutting down
[DEBUG/PoolWorker-7] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-1] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-7] running the remaining "atexit" finalizers
[INFO/PoolWorker-7] process exiting with exitcode 0
[DEBUG/PoolWorker-1] running the remaining "atexit" finalizers
[INFO/PoolWorker-1] process exiting with exitcode 0
[DEBUG/PoolWorker-5] worker got sentinel -- exiting
[DEBUG/PoolWorker-2] running all "atexit" finalizers with priority >= 0
[INFO/PoolWorker-5] process shutting down
[DEBUG/PoolWorker-5] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-2] running the remaining "atexit" finalizers
[DEBUG/PoolWorker-5] running the remaining "atexit" finalizers
[INFO/PoolWorker-2] process exiting with exitcode 0
[INFO/PoolWorker-5] process exiting with exitcode 0
[DEBUG/PoolWorker-6] worker got sentinel -- exiting
[INFO/PoolWorker-6] process shutting down
[DEBUG/PoolWorker-6] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-6] running the remaining "atexit" finalizers
[INFO/PoolWorker-6] process exiting with exitcode 0
[DEBUG/PoolWorker-4] worker got sentinel -- exiting
[DEBUG/PoolWorker-9] worker got sentinel -- exiting
[INFO/PoolWorker-9] process shutting down
[DEBUG/PoolWorker-9] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-9] running the remaining "atexit" finalizers
[INFO/PoolWorker-9] process exiting with exitcode 0
[INFO/PoolWorker-4] process shutting down
[DEBUG/PoolWorker-4] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-4] running the remaining "atexit" finalizers
[INFO/PoolWorker-4] process exiting with exitcode 0
[DEBUG/PoolWorker-10] worker got sentinel -- exiting
[INFO/PoolWorker-10] process shutting down
[DEBUG/PoolWorker-10] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-10] running the remaining "atexit" finalizers
[INFO/PoolWorker-10] process exiting with exitcode 0
[DEBUG/PoolWorker-8] worker got sentinel -- exiting
[INFO/PoolWorker-8] process shutting down
[DEBUG/PoolWorker-8] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-8] running the remaining "atexit" finalizers
[INFO/PoolWorker-8] process exiting with exitcode 0
[DEBUG/PoolWorker-3] worker got sentinel -- exiting
[INFO/PoolWorker-3] process shutting down
[DEBUG/PoolWorker-3] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-3] running the remaining "atexit" finalizers
[INFO/PoolWorker-3] process exiting with exitcode 0
[DEBUG/MainProcess] terminating workers
[DEBUG/MainProcess] joining task handler
[DEBUG/MainProcess] joining result handler
[DEBUG/MainProcess] joining pool workers

, а когда это не все, я получаю:

[DEBUG/MainProcess] created semlock with handle 6
[DEBUG/MainProcess] created semlock with handle 7
[DEBUG/MainProcess] created semlock with handle 10
[DEBUG/MainProcess] created semlock with handle 11
[INFO/PoolWorker-1] child process calling self.run()
[INFO/PoolWorker-2] child process calling self.run()
[INFO/PoolWorker-3] child process calling self.run()
[INFO/PoolWorker-8] child process calling self.run()
[INFO/PoolWorker-5] child process calling self.run()
[INFO/PoolWorker-4] child process calling self.run()
[INFO/PoolWorker-9] child process calling self.run()
[INFO/PoolWorker-6] child process calling self.run()
[INFO/PoolWorker-7] child process calling self.run()
[INFO/PoolWorker-10] child process calling self.run()

Ответы [ 2 ]

1 голос
/ 27 января 2011

Не полное решение, но я нашел способ заставить код работать в любом виде: из интерпретатора или в виде кода в работающем скрипте. Я думаю, что проблема связана со следующим примечанием в многопроцессорной документации:

Функциональность в этом пакете требует, чтобы метод main был импортирован детьми. Это описано в Руководстве по программированию, но на это стоит обратить внимание. Это означает, что некоторые примеры, например, multiprocessing.Pool, не будут работать в интерактивном интерпретаторе.

Я не уверен, почему существует это ограничение, и почему я все еще могу иногда использовать пул из интерактивного интерпретатора, а иногда нет, но, ну, хорошо ...

Чтобы обойти это, я делаю следующее в любом модуле, который может использовать многопроцессорность:

import __main__
__SHOULD_MULTITHREAD__ = False
if hasattr(__main__,'__file__'):
    __SHOULD_MULTITHREAD__ = True

Остальная часть кода в этом модуле может затем проверить этот флаг, чтобы увидеть, следует ли ему использовать пул или просто выполнить без распараллеливания. Делая это, я все еще могу использовать и тестировать параллельные функции в модулях из интерактивного интерпретатора, они просто работают намного медленнее.

0 голосов
/ 26 января 2011

Если я не ошибаюсь, должен ли test2.py выглядеть так

from test1 import findAllMyPaths
allmypaths = findAllMyPaths

, а затем

from test2 import allmypaths  
allmypaths()
...