Как мультипроцессировать список файлов в пакетах с командами, упакованными в dict - PullRequest
0 голосов
/ 02 апреля 2019

Я начинаю со списка файлов и текста, содержащего команды.

Теперь я получил рабочую функцию, которая принимает пакет файлов в виде списка в качестве первого аргумента, а dict - в качестве второго аргумента и возвращает новый dict.

Я действительно не знаю, как заставить это теперь работать в многопроцессорной обработке таким образом, чтобы рабочая функция выполнялась x раз параллельно, где x - это количество процессоров ПК, и like запускает новые процессы, как только один из них завершается, если список файлов уже не "пуст".

А также для командного запроса нужна запись с номером партии, или я должен просто передать его как третий аргумент?

Было бы без проблем получить все вместе (в списке?) Dict в самом конце, хотя было бы неплохо иметь возможность что-то с ним делать, когда процесс завершится.

Это для Linux с Python 3.5. Я пытался использовать Process Class, но мне не удалось заставить его работать (я все еще плохо знаком с программированием, поэтому могу не увидеть тривиальное решение: D), и для класса пула я не знаю, как передать диктат с изменяющимся номером партии.

fileslist[1,2,3,4,5,6,7,8,9]
batchsize = 3
commanddict = {"BatchNumberStr": {"job{:05d}/".format(batchnumber): str}, {"command1": {value, type(value)}, etc}

(рабочий создает папку и т. Д. С этим номером партии) Я сохраняю тип значения для проверок безопасности позже в дополнительной части, я знаю, что это, вероятно, не лучший способ, но теперь он работает для меня: D

теперь я хочу иметь столько рабочих, сколько процессоров, что-то вроде этого

numjobs = len(fileslist) / batchsize
if int(numjobs) != numjobs:
    numjobs = int(numjobs) +1 
else:
    numjobs = int(numjobs)
for i in range(numjobs):
    use = []
    for i in range(batchsize):
        try:
            use.append(fileslist.pop(0))
        except IndexError:
            break
    commanddict.update({"BatchNumberStr": {"job{:05d}/".format(i): str}}
    process = worker(deepcopy(use), deepcopy(commanddict))
    process.start()
    result = process.get()

process.join()

это всего лишь царапина как мне ограничить процесс и присоединиться к новостям, если они закончены? если бы кто-нибудь мог помочь мне в этом, если бы с пулом было лучше, я был бы очень признателен:)

Обновление: мое решение теперь будет:

def poolfeeder(allfileslist, workdict):
    a = 0
    numfiles = (list(x for x in workdict["NumberofFilesPerBatch"]))[0]
    status = True
    while status is True:
        filestouse = []
        for x in range(numfiles):
            try:
                filestouse.append(allfileslist.pop(0))
            except IndexError:
                status = False
                break
        a = a + 1
        workdict.update({"BatchNumberStr": {"job{:05d}/".format(a): str}})
        if len(filestouse) > 0:
            yield [deepcopy(filestouse), deepcopy(workdict)]
        else:
            return

def worker(listinput):
    filelist = listinput[0]
    commanddict = listinput[1]
    do stuff
    return result


poolfeed = poolfeeder(allfileslist, dict)
with mp.Pool(processes=numcpus) as p:
    reslist = [p.apply_async(worker, (r,)) for r in poolfeed]
    for result in reslist:
        print(result.get())

Это хороший способ?

Если один рабочий процесс сталкивается с RunTimeError, он сохраняется как результат или прерывает все процессы, если нет, как я могу вызвать SystemExit, если один из рабочих сталкивается с RunTimeError?

...