Я пытаюсь выполнить обработку изображений на всех ядрах, доступных на моей машине (которая имеет 4 ядра и 8 процессоров). Я выбрал многопроцессорность, потому что это своего рода рабочая нагрузка на процессор. Теперь, объясняя данные, у меня есть CSV-файл, в котором записаны пути к файлам (локальный путь), категория изображений (объясните, что это за изображение). CSV имеет ровно 9258 категорий. Моя идея состоит в том, чтобы сделать пакетную обработку. Назначьте 10 категорий каждому процессору и l oop по изображениям по одному, подождите, пока все процессоры завершат свою работу, и назначьте следующий пакет.
Категории сохраняются в этом формате as_batches = [[C1, C2, ..., C10], [C11, C12, C13, ..., C20], [Cn-10, Cn-9,..., Cn]]
Вот функция, которая запускает процесс.
def get_n_process(as_batches, processes, df, q):
p = []
for i in range(processes):
work = Process(target=submit_job, args=(df, as_batches[i], q, i))
p.append(work)
work.start()
as_batches = as_batches[processes:]
return p, as_batches
Вот основная l oop,
while(len(as_batches) > 0):
t = []
#dynamically check the lists
if len(as_batches) > 8:
n_process = 8
else:
n_process = len(as_batches)
print("For this it Requries {} Process".format(n_process))
process_obj_inlist, as_batches = get_n_process(as_batches, n_process, df, q)
for ind_process in process_obj_inlist:
ind_process.join()
with open("logs.txt", "a") as f:
f.write("\n")
f.write("Log Recording at: {timestamp}, Remaining N = {remaining} yet to be processed".format(
timestamp=datetime.datetime.now(),
remaining = len(as_batches)
))
f.close()
В целях ведения журнала я записываю в текстовый файл, чтобы увидеть, сколько категорий еще предстоит обработать. А вот и основная функция
def do_something(fromprocess):
time.sleep(1)
print("Operation Ended for Process:{}, process Id:{}".format(
current_process().name, os.getpid()
))
return "msg"
def submit_job(df, list_items, q, fromprocess):
a = []
for items in list_items:
oneitemdf = df[df['MinorCategory']==items]['FilePath'].values.tolist()
oneitemdf = [x for x in oneitemdf if x.endswith('.png')]
result = do_something(fromprocess)
a.append(result)
q.put(a)
Пока я просто печатаю в консоли, но в реальном коде я буду использовать алгоритм KAZE для извлечения объектов из изображений, сохранения их в списке и добавить его в очередь (общая память) со всех процессоров. Теперь сценарий выполняется в течение нескольких минут, но через некоторое время сценарий останавливается. Это не бежало дальше . Я пытался выйти, но не смог. Я думаю, что может случиться какой-то тупик, но я не уверен. Я читаю онлайн-источники, но не могу найти решение и причину, почему это происходит?
Полный код приведен в следующей ссылке: Ссылка на полный исходный код . Что я здесь не так делаю? Я новичок в многопроцессорности и многопоточности. Я хотел бы понять концепцию в глубине. Ссылки / ресурсы, связанные с этим topi c, высоко ценятся.
UPDATE - Тот же код отлично работает на ОС Ma c.