Я начинаю со списка файлов и текста, содержащего команды.
Теперь я получил рабочую функцию, которая принимает пакет файлов в виде списка в качестве первого аргумента, а dict - в качестве второго аргумента и возвращает новый dict.
Я действительно не знаю, как заставить это теперь работать в многопроцессорной обработке таким образом, чтобы рабочая функция выполнялась x раз параллельно, где x - это количество процессоров ПК, и like запускает новые процессы, как только один из них завершается, если список файлов уже не "пуст".
А также для командного запроса нужна запись с номером партии, или я должен просто передать его как третий аргумент?
Было бы без проблем получить все вместе (в списке?) Dict в самом конце, хотя было бы неплохо иметь возможность что-то с ним делать, когда процесс завершится.
Это для Linux с Python 3.5. Я пытался использовать Process Class, но мне не удалось заставить его работать (я все еще плохо знаком с программированием, поэтому могу не увидеть тривиальное решение: D), и для класса пула я не знаю, как передать диктат с изменяющимся номером партии.
fileslist[1,2,3,4,5,6,7,8,9]
batchsize = 3
commanddict = {"BatchNumberStr": {"job{:05d}/".format(batchnumber): str}, {"command1": {value, type(value)}, etc}
(рабочий создает папку и т. Д. С этим номером партии)
Я сохраняю тип значения для проверок безопасности позже в дополнительной части, я знаю, что это, вероятно, не лучший способ, но теперь он работает для меня: D
теперь я хочу иметь столько рабочих, сколько процессоров, что-то вроде этого
numjobs = len(fileslist) / batchsize
if int(numjobs) != numjobs:
numjobs = int(numjobs) +1
else:
numjobs = int(numjobs)
for i in range(numjobs):
use = []
for i in range(batchsize):
try:
use.append(fileslist.pop(0))
except IndexError:
break
commanddict.update({"BatchNumberStr": {"job{:05d}/".format(i): str}}
process = worker(deepcopy(use), deepcopy(commanddict))
process.start()
result = process.get()
process.join()
это всего лишь царапина
как мне ограничить процесс и присоединиться к новостям, если они закончены? если бы кто-нибудь мог помочь мне в этом, если бы с пулом было лучше, я был бы очень признателен:)
Обновление:
мое решение теперь будет:
def poolfeeder(allfileslist, workdict):
a = 0
numfiles = (list(x for x in workdict["NumberofFilesPerBatch"]))[0]
status = True
while status is True:
filestouse = []
for x in range(numfiles):
try:
filestouse.append(allfileslist.pop(0))
except IndexError:
status = False
break
a = a + 1
workdict.update({"BatchNumberStr": {"job{:05d}/".format(a): str}})
if len(filestouse) > 0:
yield [deepcopy(filestouse), deepcopy(workdict)]
else:
return
def worker(listinput):
filelist = listinput[0]
commanddict = listinput[1]
do stuff
return result
poolfeed = poolfeeder(allfileslist, dict)
with mp.Pool(processes=numcpus) as p:
reslist = [p.apply_async(worker, (r,)) for r in poolfeed]
for result in reslist:
print(result.get())
Это хороший способ?
Если один рабочий процесс сталкивается с RunTimeError, он сохраняется как результат или прерывает все процессы, если нет, как я могу вызвать SystemExit, если один из рабочих сталкивается с RunTimeError?