Python - параллельная + пакетная обработка файлов в каталоге - PullRequest
0 голосов
/ 12 января 2019

У меня проблема, аналогичная той, которая упоминается в Python - Как параллельно использовать и работать с файлами в каталоге .

Задача : У меня есть 100k + файлов в каталоге. В моем случае process_file () принимает текстовый файл, выполняет некоторую обработку и выводит файл xml.

В отличие от вышеупомянутой темы, я хочу запустить карту пула с пакетами файлов.

Причина запуска в пакетах : Обработка каждого файла занимает в среднем минуту. Таким образом, обработка всего списка файлов займет несколько дней. Но поскольку файлы обрабатываются, я хочу начать использовать обработанные файлы для другой программы. Для этого я хочу убедиться, что у меня есть, скажем, 100 готовых файлов, а затем следующие 100 и т. Д.

Я сделал следующее:

  1. Сортировка файлов в каталоге. inputFileArr - список файлов.
  2. Запуск программы партиями:

    for i in range(int(math.ceil(len(inputFileArr) * 1.0 / batch_size))):
    
     start_index = i * batch_size
     end_index = (i + 1) * batch_size
     print("Batch #{0}: {1}".format(i, inputFileArr[start_index:end_index]))
    
     p = Pool(n_process)
     p.map(process_file, inputFileArr[start_index:end_index])
     print("Batch #{0} completed".format(i))
    

документация python для pool.map упоминает

Блокируется, пока результат не будет готов.

Я предполагал, что это означает, что только после того, как обработка файлов завершится, пакет #i будет запущен, чем пакет № (i + 1).

Но, похоже, это не так. Когда я вижу временную метку сгенерированных файлов XML, это показывает, что упорядочение пакета не поддерживается. Я вижу, что некоторые файлы пакета обрабатываются раньше, чем файлы предыдущего пакета. Чтобы быть уверенным, что я напечатал имена файлов каждой партии.

process_file ()

  1. Вызывает скрипт на python с использованием subprocess.Popen ().

    subprocess.Popen (команда)

    команда содержит что-то вроде python script.py input_args

  2. И этот скрипт Python вызывает Java-программу, используя subprocess.Popen ()

Вот код внутри скрипта Python, который вызывается моим кодом Python:

        m_process = subprocess.Popen(command, stdout=subprocess.PIPE)
        while m_process.poll() is None:
            stdout = str(m_process.stdout.readline())
            if 'ERROR' in stdout:
                m_process.terminate()
                error = stdout.rstrip()
        output = str(output_file.read())

Что я должен сделать, чтобы моя программа работала в пакетном режиме?

Среда : Python 2.7

Ответы [ 2 ]

0 голосов
/ 14 января 2019

Устранена проблема путем замены subprocess.Popen (команда) на subprocess.call (команда) .

Спасибо @Barak Itkin за помощь и указание на использование wait. Следовал решению (с использованием subprocess.call), предоставленному в Python popen command. Дождитесь окончания команды

Упомянутое решение здесь в случае, если любой другой пользователь сталкивается с подобной проблемой.

0 голосов
/ 12 января 2019

РЕДАКТИРОВАТЬ: старый ответ ниже, новый ответ вверху

Немного неэффективно ждать, пока будут выполнены первые 100 файлов, и только потом делать следующие (потому что вы могли бы начать обрабатывать следующие, пока у вас есть свободные рабочие, когда запущены последние файлы в пакете).

Тем не менее, если вы действительно хотите, чтобы обработка продолжалась до следующих 100 только после завершения первых 100, просто вызовите map для пакета из 100 файлов за раз.

files = sorted(...)
for i in range(0, len(files), 100):
    pool.map(files[i:i+100])

В зависимости от того, сколько у вас работников, я предлагаю увеличить размер партии более чем на 100, чтобы сократить время, в течение которого вы работаете без дела (как описано выше).


Предполагая, что вам просто нужны группы из 100 последовательных файлов, но не обязательно с самого начала, вы можете попробовать следующее.

По предложенной математике я бы сказал, что вы можете разделить файлы на группы по 100, а затем обработать каждую группу в отдельном работнике. последовательные файлы обрабатываются).

files = sorted(...)
file_groups = [[files[i + j] for j in range(min(100, len(files) - i))]
               for i in range(0, len(files), 100]

def process_batch(batch):
    group_index, group_files = batch
    for f in group_files:
        process_file(f)
    print('Group %d is done' % group_index)

pool.map(process_batch, enumerate(file_groups))

Предполагая, что вы просто хотите группы из 100 последовательных файлов, но не обязательно с самого начала, вы можете попробовать следующее.

В соответствии с предложенной математикой я бы сказал, что вы можете разделить файлы на группы по 100, а затем обработать каждую группу в отдельном работнике. последовательные файлы обрабатываются).

files = sorted(...)
file_groups = [[files[i + j] for j in range(min(100, len(files) - i))]
               for i in range(0, len(files), 100]

def process_batch(batch):
    group_index, group_files = batch
    for f in group_files:
        process_file(f)
    print('Group %d is done' % group_index)

pool.map(process_batch, enumerate(file_groups))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...