Рекурсивная многопроцессорная обработка Python - слишком много потоков - PullRequest
0 голосов
/ 10 октября 2018

Справочная информация:

Python 3.5.1, Windows 7

У меня есть сетевой диск, который содержит большое количество файлов и каталогов.Я пытаюсь написать сценарий, чтобы как можно быстрее проанализировать все это, чтобы найти все файлы, которые соответствуют RegEx, и скопировать эти файлы на мой локальный компьютер для проверки.Существует около 3500 каталогов и подкаталогов, а также несколько миллионов файлов.Я пытаюсь сделать это как можно более универсальным (т.е. не писать код для этой точной файловой структуры), чтобы повторно использовать это для других сетевых дисков.Мой код работает при работе с небольшим сетевым диском, проблема здесь, кажется, в масштабируемости.

Я пробовал несколько вещей с помощью многопроцессорной библиотеки и не могу заставить ее работать надежно.Моя идея состояла в том, чтобы создать новую работу для разбора каждого подкаталога, чтобы работать как можно быстрее.У меня есть рекурсивная функция, которая анализирует все объекты в каталоге, затем вызывает себя для любых подкаталогов и проверяет любые файлы, которые она находит, с помощью RegEx.

Вопрос: как я могу ограничитьколичество потоков / процессов без использования пулов для достижения моей цели?

Что я пробовал:

  • Если я использую только задания процессов, я получаюошибка RuntimeError: can't start new thread после более чем нескольких сотен потоков, и он начинает сбрасывать соединения.В итоге получилось около половины найденных файлов, так как в половине каталогов произошла ошибка (см. Код ниже).
  • Чтобы ограничить общее количество потоков, я пытался использовать методы Pool, но я не могу 'передать объекты пула вызываемым методам в соответствии с этим вопросом , что делает невозможной реализацию рекурсии.
  • Чтобы это исправить, я попытался вызвать процессы внутри методов пула, но получаюошибка daemonic processes are not allowed to have children.
  • Я думаю, что если я смогу ограничить число одновременных потоков, то мое решение будет работать в соответствии с планом.

Код:

import os
import re
import shutil
from multiprocessing import Process, Manager

CheckLocations = ['network drive location 1', 'network drive location 2']
SaveLocation = 'local PC location'
FileNameRegex = re.compile('RegEx here', flags = re.IGNORECASE)


# Loop through all items in folder, and call itself for subfolders.
def ParseFolderContents(path, DebugFileList):

    FolderList = []
    jobs = []
    TempList = []

    if not os.path.exists(path):
        return

    try:

        for item in os.scandir(path):

            try:

                if item.is_dir():
                    p = Process(target=ParseFolderContents, args=(item.path, DebugFileList))
                    jobs.append(p)
                    p.start()

                elif FileNameRegex.search(item.name) != None:
                    DebugFileList.append((path, item.name))

                else:
                    pass

            except Exception as ex:
                if hasattr(ex, 'message'):
                    print(ex.message)
                else:
                    print(ex)
                    # print('Error in file:\t' + item.path)

    except Exception as ex:
        if hasattr(ex, 'message'):
            print(ex.message)
        else:
            print('Error in path:\t' + path)
            pass

        else:
            print('\tToo many threads to restart directory.')

    for job in jobs:
        job.join()


# Save list of debug files.
def SaveDebugFiles(DebugFileList):

    for file in DebugFileList:
        try:
            shutil.copyfile(file[0] + '\\' + file[1], SaveLocation + file[1])
        except PermissionError:
            continue


if __name__ == '__main__':

    with Manager() as manager:

        # Iterate through all directories to make a list of all desired files.
        DebugFileList = manager.list()
        jobs = []

        for path in CheckLocations:
            p = Process(target=ParseFolderContents, args=(path, DebugFileList))
            jobs.append(p)
            p.start()
        for job in jobs:
            job.join()

        print('\n' + str(len(DebugFileList)) + ' files found.\n')
        if len(DebugFileList) == 0:
            quit()

        # Iterate through all debug files and copy them to local PC.
        n = 25 # Number of files to grab for each parallel path.
        TempList = [DebugFileList[i:i + n] for i in range(0, len(DebugFileList), n)] # Split list into small chunks.
        jobs = []

        for item in TempList:
            p = Process(target=SaveDebugFiles, args=(item, ))
            jobs.append(p)
            p.start()

        for job in jobs:
            job.join()

Ответы [ 2 ]

0 голосов
/ 25 октября 2018

Мне не удалось заставить это работать именно так, как я хотел.os.walk был медленным, и любой другой метод, о котором я думал, был либо похожей скоростью, либо зависал из-за слишком большого количества потоков.

В итоге я использовал похожий метод, который я опубликовал выше, но вместо запуска рекурсиив каталоге верхнего уровня он будет идти вниз на один или два уровня, пока не будет несколько каталогов.Затем он запускает рекурсию в каждом из этих каталогов последовательно, что ограничивает количество потоков, достаточное для успешного завершения.Время выполнения аналогично os.walk, что, вероятно, сделает более простую и удобочитаемую реализацию.

0 голосов
/ 10 октября 2018

Не пренебрегайте полезностью пулов, особенно если вы хотите контролировать количество создаваемых процессов.Они также заботятся о том, чтобы управлять вашими работниками (создавать / запускать / объединять / распределять куски работы) и помогать вам собирать потенциальные результаты.

Как вы сами осознали, вы создаете слишком много процессов, вплоть до определенного момента.где вы, кажется, исчерпали столько системных ресурсов, что не можете создавать больше процессов.

Кроме того, создание новых процессов в вашем коде контролируется внешними факторами, то есть количеством папок в ваших файловых деревьях, что очень затрудняет ограничение количества процессов.Кроме того, создание нового процесса сопряжено с некоторыми накладными расходами в ОС, и вы можете даже потратить эти накладные расходы на пустые каталоги.Кроме того, переключение контекста между процессами является довольно дорогостоящим.

Учитывая количество создаваемых вами процессов и указанное вами количество папок, ваши процессы в основном будут просто сидеть и простаивать большую часть времени, пока они ждутдоля процессорного времени, чтобы фактически сделать некоторую работу.За указанное процессорное время будет много споров, если только в вашем распоряжении не будет суперкомпьютер с тысячами ядер.И даже когда процесс получает некоторое время процессора, он, скорее всего, потратит немало времени на ожидание операций ввода-вывода.

При этом вы, вероятно, захотите изучить использование потоков для такой задачи.И вы могли бы сделать некоторую оптимизацию в вашем коде.Из вашего примера я не вижу причин, по которым вы бы разделили идентифицирующие файлы для копирования и фактически копировали их в разные задачи.Почему бы не позволить вашим работникам сразу скопировать каждый найденный файл, соответствующий RE?

Я бы создал список файлов в рассматриваемых каталогах, используя os.walk (который я считаю разумнымбыстро) из основного потока, а затем выгрузить этот список в пул рабочих, который проверяет эти файлы на наличие совпадений и сразу же копирует их:

import os
import re
from multiprocessing.pool import ThreadPool

search_dirs = ["dir 1", "dir2"]
ptn = re.compile(r"your regex")
# your target dir definition

file_list = []

for topdir in search_dirs:
    for root, dirs, files in os.walk(topdir):
        for file in files:
            file_list.append(os.path.join(root, file))

def copier(path):
    if ptn.match(path):
        # do your shutil.copyfile with the try-except right here
        # obviously I did not want to start mindlessly copying around files on my box :)
        return path

with ThreadPool(processes=10) as pool:
    results = pool.map(copier, file_list)

# print all the processed files. For those that did not match, None is returned
print("\n".join([r for r in results if r]))

С другой стороны: не объединяйте пути вручную (file[0] + "\\" + file[1]), для этого лучше использовать os.path.join.

...