Как реализовать многопроцессорный пул Python для конвертации тысяч файлов - PullRequest
0 голосов
/ 19 февраля 2020

У меня есть сценарий Python, который читает в каталоге более 10000 файлов DBF, чтобы их можно было преобразовать в CSV. Я хотел бы parellelize эту задачу, а не преобразовывать каждый файл в отдельности. Я прочитал о многопроцессорном модуле Python, хотя у меня возникли небольшие проблемы с его реализацией для этой задачи. В частности, я хотел использовать класс Pool для распределения рабочей нагрузки между ядрами ЦП.

Вот мой код на данный момент:

import os
from dbfread import DBF
import pandas as pd
import multiprocessing

directory = 'C:\\Path_to_DBF_Files' #define file directory 

files_in = os.listdir(directory) #store files in directory to list 

def convert():

    for file in files_in:

        if file.startswith('D') and file.endswith('.DBF'): #define parameters of filenames to convert
            file_path = os.path.join(files_in, file)
            print(f'\nReading in {file}...')
            dbf = DBF(file_path) #create DBF object 
            dbf.encoding = 'utf-8' #set encoding attribute to utf-8 instead of acsii 
            dbf.char_decode_errors = 'ignore' #set decoding errors attribute to ignore any errors and read in DBF file as is 
            print('\nConverting to DataFrame...')
            df = pd.DataFrame(iter(dbf)) #convert to Pandas dataframe 
            df.columns.astype(str) #convert column datatypes to string
            print(df)
            print('\nWriting to CSV...')
            dest_directory = 'C:\\Path_to_output_directory\\%s.csv' % ('D' + file.strip('.DBF')) #define destination directory and names for output files 
            df.to_csv(dest_directory, index = False)
            print(f'\nConverted {file} to CSV. Moving to next file...')


        elif file.startswith('B') and file.endswith('.DBF'): #define parameters for unnecessary files 
            print('\nB file not needed.')
            continue

        elif file.endswith('.FPT'): #skip FPT files 
            print('Skipping FPT file.')
            continue

        elif file.startswith('ID') and file.endswith('.DB~'): #stop iteration when this file is reached in the directory 
            print('All files converted to CSV.')
            break

        else:
            print('\nFile not found or error.')
            print(f'Last file read in was {file}.')

pool = multiprocessing.Pool(processes = len(in_files)) #create Pool to run across the length of the input directory
result = pool.map(convert, files_in) #reference convert function and list of DBF files to be passed through
print(result) 

Я прочитал несколько ответов здесь на StackOverflow это несколько похоже на мой вопрос; однако я не видел ничего, что относится к моей конкретной задаче c. Как я могу улучшить свой код, чтобы вместо чтения и преобразования только одного файла за раз скрипт обрабатывал несколько файлов одновременно?

Спасибо за предоставленную помощь.

Ответы [ 2 ]

2 голосов
/ 19 февраля 2020

Некоторые общие рекомендации:

  1. Вы создаете пул. Размер пула должен зависеть от машины, а не от размера вашей работы. Например, вам нужно 4 процесса в пуле вместо 10000 процессов, даже если у вас есть 10000 файлов для обработки

  2. Задание, которое нужно запустить для каждого процесса, должно быть простым, но параметризованным. В вашем случае создайте функцию, которая будет принимать имя файла в качестве входных данных и выполнять преобразование. Затем сопоставьте входные файлы в него. Фильтрация должна быть выполнена до вызова map.

Поэтому я бы преобразовал ваш код во что-то вроде следующего:

import os
from dbfread import DBF
import pandas as pd
import multiprocessing

directory = 'C:\\Path_to_DBF_Files' #define file directory 

files_in = os.listdir(directory) #store files in directory to list 

def convert(file):
    file_path = os.path.join(files_in, file)
    print(f'\nReading in {file}...')
    dbf = DBF(file_path) #create DBF object 
    dbf.encoding = 'utf-8' #set encoding attribute to utf-8 instead of acsii 
    dbf.char_decode_errors = 'ignore' #set decoding errors attribute to ignore any errors and read in DBF file as is 
    print('\nConverting to DataFrame...')
    df = pd.DataFrame(iter(dbf)) #convert to Pandas dataframe 
    df.columns.astype(str) #convert column datatypes to string
    print(df)
    print('\nWriting to CSV...')
    dest_directory = 'C:\\Path_to_output_directory\\%s.csv' % ('D' + file.strip('.DBF')) #define destination directory and names for output files 
    df.to_csv(dest_directory, index = False)
    print(f'\nConverted {file} to CSV. Moving to next file...')

pool = multiprocessing.Pool(processes = 4)
pool.map(convert, [file for file in files_in if file.startswith('D') and file.endswith('.DBF')])
0 голосов
/ 19 февраля 2020

Я бы посоветовал взглянуть на ThreadPoolExecutor или ProcessPoolExecutor: https://docs.python.org/3/library/concurrent.futures.html

Вы можете создать пул с произвольным количеством работников, представить работу собирая фьючерсы, а затем ожидая их завершения, пока они не будут завершены.

Выполните рефакторинг вашей функции преобразования для обработки одного файла (то есть извлеките for-l oop из функции), затем l oop над элементами при отправке работы.

Пример кода

from concurrent.futures import ProcessPoolExecutor


executor = ProcessPoolExecutor(max_workers=10)

futures = []
for f in files:
    futures.append(executor.submit(convert, f)

for future in futures:
    future.result()

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...