Многопроцессорная очередь файлов в папках - PullRequest
0 голосов
/ 24 сентября 2019

Вопрос в том, как правильно обрабатывать файлы в многопроцессорной обработке Python 3.7 в случае, когда я рекурсивно сканирую каталоги.

Мой код выглядит следующим образом:

def f(directoryout,directoryoutfailed,datafile,filelist_failed,imagefile,rootpath,extension,debug):

[...]некоторая обработка

if __name__ == '__main__':
import csv
import os
debug = 0
timeout = 20

if debug == 0:
    folder              = '/home/debian/Desktop/environments/dedpul/files/fp/'
    datafile            = 'fpdata.csv' # file with results
    directoryout        = 'fp_out' # out directory for debugging
    directoryoutfailed  = 'fp_out_failed' # out directory for wrongly processed for debuggin mode
    filelist            = 'filelist.csv' # list of processed files
    filelist_failed     = 'filelist_failed.csv' # list of wrongly processed files

counter = 0

pool = Pool(processes=4)
for root, subFolders, files in os.walk(folder):
    for imagefile in files:
        rootpath = root+'/'
        fullpath = root+'/'+imagefile
        extension = os.path.splitext(imagefile)[1]
        imagefilesplit = imagefile.split('.')[0]
        counter += 1

        print('\033[93m ## ',counter,' ## \033[0m',rootpath)

        fileexist = 0
        with open(filelist) as csv_file:
            csv_reader = csv.reader(csv_file, delimiter=',')
            for row in csv_reader:
                if row[0] == fullpath:
                    fileexist = 1
        if fileexist == 1:
            print('    File was processed, skipping...')
            continue

        with open(filelist, mode='a') as csv_file:
            writer = csv.writer(csv_file, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
            writer.writerow([fullpath])

        # print(directoryout,directoryoutfailed,datafile,filelist_failed,imagefile,rootpath,extension,debug)
        res = pool.apply(f, (directoryout,directoryoutfailed,datafile,filelist_failed,imagefile,rootpath,extension,debug)) 
pool.close()
pool.join()

1-й, когда я использую pool.apply_async, он использует все ядра, однако он не обрабатывает функцию f () правильно.С pool.apply () он работает в однопоточном режиме.

2-й, как вы видите, я рекурсивно сканирую список файлов в папках в цикле.Если файл был найден обработанным, этот цикл должен продолжаться.Должен ли я сделать это в функции __ main __, или она должна быть перемещена в функцию f () ?Если да, как обмениваться тем, что происходит во время обработки, которая занимает несколько секунд на файл?

3-й, функция f () независима, поэтому, если она будет обрабатывать файл изображения, а затемдобавит результаты в файл fpdata.csv (или добавит имя не очень хорошо обработанного файла в filelist_failed.csv ) и просто закроет обработку без каких-либо проблем, поэтому никакого реального вывода не требуется.Мне нужно просто запустить эту функцию в многопроцессорном режиме.

Что я делаю не так?Должен ли я использовать оператор

with Pool(processes=4) as pool:

?Прежде чем задавать этот запрос, я просмотрел тонны ответов, но, по-видимому, было очень сложно найти такую ​​обработку файлов, в том числе и в руководстве по Python.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...