Как обработать большой список файлов, используя все процессоры? - PullRequest
0 голосов
/ 17 февраля 2019

Мне нужно конвертировать 86 000 TEX-файлов в XML с помощью библиотеки LaTeXML в командной строке.Я пытался написать скрипт Python для автоматизации этого с помощью модуля subprocess, используя все 4 ядра.

def get_outpath(tex_path):
    path_parts = pathlib.Path(tex_path).parts
    arxiv_id = path_parts[2]
    outpath = 'xml/' + arxiv_id + '.xml'
    return outpath

def convert_to_xml(inpath):
    outpath = get_outpath(inpath)

    if os.path.isfile(outpath):
        message = '{}: Already converted.'.format(inpath)
        print(message)
        return

    try:
        process = subprocess.Popen(['latexml', '--dest=' + outpath, inpath], 
                                   stderr=subprocess.PIPE, 
                                   stdout=subprocess.PIPE)
    except Exception as error:
        process.kill()
        message = "error: %s run(*%r, **%r)" % (e, args, kwargs)
        print(message)

    message = '{}: Converted!'.format(inpath)
    print(message)

def start():
    start_time = time.time()
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count(),
                               maxtasksperchild=1)
    print('Initialized {} threads'.format(multiprocessing.cpu_count()))
    print('Beginning conversion...')
    for _ in pool.imap_unordered(convert_to_xml, preprints, chunksize=5): 
        pass
    pool.close()
    pool.join()
    print("TIME: {}".format(total_time))

start()

В результате выполнения скрипта получается Too many open files и замедляется мой компьютер.Если посмотреть на Activity Monitor, кажется, что этот скрипт пытается создать 86 000 подпроцессов преобразования одновременно, и каждый процесс пытается открыть файл.Может быть, это результат pool.imap_unordered(convert_to_xml, preprints) - возможно, мне не нужно использовать map вместе с subprocess.Popen, так как у меня просто слишком много команд для вызова?Что может быть альтернативой?

Я потратил весь день, пытаясь найти правильный подход к массовой обработке.Я новичок в этой части Python, поэтому любые советы для правильного направления будут высоко оценены.Спасибо!

1 Ответ

0 голосов
/ 17 февраля 2019

В convert_to_xml операторы process = subprocess.Popen(...) порождают подпроцесс latexml.Без блокирующего вызова, такого как process.communicate(), convert_to_xml заканчивается, даже когда latexml продолжает работать в фоновом режиме.

Поскольку convert_to_xml заканчивается, пул отправляет связанному рабочему процессу другую задачу для запускаи поэтому convert_to_xml вызывается снова.Еще раз другой latexml процесс порождается в фоновом режиме.Довольно скоро вы готовы к работе в latexml процессах, и лимит ресурсов на количество открытых файлов достигнут.

Исправить несложно: добавьте process.communicate(), чтобы указать convert_to_xml дождаться, покаlatexml процесс завершен.

try:
    process = subprocess.Popen(['latexml', '--dest=' + outpath, inpath], 
                               stderr=subprocess.PIPE, 
                               stdout=subprocess.PIPE)
    process.communicate()                                   
except Exception as error:
    process.kill()
    message = "error: %s run(*%r, **%r)" % (e, args, kwargs)
    print(message)

else: # use else so that this won't run if there is an Exception
    message = '{}: Converted!'.format(inpath)
    print(message)

Относительно if __name__ == '__main__':

Как указал Мартино , в предупреждениемногопроцессорная документация о том, что код, который порождает новые процессы, не должен вызываться на верхнем уровне модуля.Вместо этого код должен содержаться внутри оператора if __name__ == '__main__'.

В Linux ничего страшного не произойдет, если вы проигнорируете это предупреждение.Но в Windows код «форк-бомбы».Точнее, код вызывает незапущенную цепочку подпроцессов, потому что в Windows fork имитируется порождение нового процесса Python, который затем импортирует вызывающий скрипт.Каждый импорт порождает новый процесс Python.Каждый процесс Python пытается импортировать вызывающий скрипт.Цикл не прерывается до тех пор, пока не будут использованы все ресурсы.

Так что, чтобы быть добрыми к нашим братьям, лишенным Windows, используйте

if __name__ == '__main__:
    start()

Иногда процессам требуется много памяти, Единственный надежный способ освободить память - завершить процесс.maxtasksperchild=1 говорит pool прекратить каждый рабочий процесс после завершения 1 задачи.Затем он порождает новый рабочий процесс для обработки другой задачи (если есть).Это освобождает ресурсы (памяти), которые могли быть выделены исходным рабочим, которые иначе не могли бы быть освобождены.

В вашей ситуации не похоже, что рабочему процессу потребуется много памяти, так что вы, вероятно, неТ maxtasksperchild=1convert_to_xml операторы process = subprocess.Popen(...) порождают подпроцесс latexml.Без блокирующего вызова, такого как process.communicate(), convert_to_xml завершается, даже когда latexml продолжает работать в фоновом режиме.

Поскольку convert_to_xml заканчивается, пул отправляет связанному рабочему процессу другую задачу для запускаи поэтому convert_to_xml вызывается снова.Еще раз latexml процесс порождается в фоновом режиме.Довольно скоро вы готовы к рассмотрению в процессах latexml и достигается ограничение на количество открытых файлов.

Исправить несложно: добавьте process.communicate(), чтобы указать convert_to_xml дождатьсяlatexml процесс завершен.

try:
    process = subprocess.Popen(['latexml', '--dest=' + outpath, inpath], 
                               stderr=subprocess.PIPE, 
                               stdout=subprocess.PIPE)
    process.communicate()                                   
except Exception as error:
    process.kill()
    message = "error: %s run(*%r, **%r)" % (e, args, kwargs)
    print(message)

else: # use else so that this won't run if there is an Exception
    message = '{}: Converted!'.format(inpath)
    print(message)

chunksize влияет на количество задач, выполняемых работником перед отправкой результата обратно в основной процесс. Иногда это может повлиять на производительность, особенно если межпроцессное взаимодействие является существенной частью общего времени выполнения.

В вашей ситуации convert_to_xml занимает относительно много времени (при условии, что мы ждем до latexmlзаканчивается) и просто возвращает None.Таким образом, межпроцессное взаимодействие, вероятно, не является значительной частью общего времени выполнения.Поэтому я не ожидаю, что вы обнаружите существенное изменение производительности в этом случае (хотя экспериментировать никогда не повредит!).


В простом Python map не следует использовать только длявызывать функцию несколько раз.

По аналогичной стилистической причине я бы зарезервировал использование методов pool.*map* для ситуаций, когда меня заботили возвращаемые значения.

Так что вместо

for _ in pool.imap_unordered(convert_to_xml, preprints, chunksize=5): 
    pass

Вы могли бы рассмотреть возможность использования

for preprint in preprints:
    pool.apply_async(convert_to_xml, args=(preprint, ))

.


Итерация, передаваемая любой из pool.*map* функций, равна , потребляемой немедленно .Не имеет значения, является ли итерируемый итератор.Здесь нет особого преимущества памяти при использовании итератора.imap_unordered возвращает итератор, но он не обрабатывает свой ввод каким-либо особенно дружественным к итератору способом.

Независимо от того, какой тип итерируемого вы передаете, при вызове функции pool.*map* итерируемое используется и превращается в задачи, которые помещаются в очередь задач.

Вот код, который подтверждает это утверждение:

version1.py:

import multiprocessing as mp
import time

def foo(x):
    time.sleep(0.1)
    return x * x


def gen():
    for x in range(1000):
        if x % 100 == 0:
            print('Got here')
        yield x


def start():
    pool = mp.Pool()
    for item in pool.imap_unordered(foo, gen()):
        pass

    pool.close()
    pool.join()

if __name__ == '__main__':
    start()

version2.py:

import multiprocessing as mp
import time
def foo(x):
    time.sleep(0.1)
    return x * x


def gen():
    for x in range(1000):
        if x % 100 == 0:
            print('Got here')
        yield x


def start():
    pool = mp.Pool()

    for item in gen():
        result = pool.apply_async(foo, args=(item, ))

    pool.close()
    pool.join()

if __name__ == '__main__':
    start()

Запуск version1.py и version2.py оба дают одинаковый результат.

Got here
Got here
Got here
Got here
Got here
Got here
Got here
Got here
Got here
Got here

Важно отметить, что Got here печатается очень быстро 10 раз в начале цикла, а затем перед окончанием программы наступает длинная пауза (пока выполняется расчет).

Если генератор gen() как-то медленно расходуется к pool.imap_unordered, следует ожидать, что Got here будет печататься также медленно.Поскольку Got here печатается 10 раз и быстро, мы видим, что итеративный gen() полностью расходуется задолго до того, как задачи завершены.

Запуск этих программ, надеюсь, даст вам уверенность в том, что pool.imap_unordered иpool.apply_async ставит задачи в очередь по существу таким же образом: сразу после вызова.

...