python - многопроцессорная обработка прекращается после некоторой пакетной операции - PullRequest
2 голосов
/ 26 января 2020

Я пытаюсь выполнить обработку изображений на всех ядрах, доступных на моей машине (которая имеет 4 ядра и 8 процессоров). Я выбрал многопроцессорность, потому что это своего рода рабочая нагрузка на процессор. Теперь, объясняя данные, у меня есть CSV-файл, в котором записаны пути к файлам (локальный путь), категория изображений (объясните, что это за изображение). CSV имеет ровно 9258 категорий. Моя идея состоит в том, чтобы сделать пакетную обработку. Назначьте 10 категорий каждому процессору и l oop по изображениям по одному, подождите, пока все процессоры завершат свою работу, и назначьте следующий пакет.

Категории сохраняются в этом формате as_batches = [[C1, C2, ..., C10], [C11, C12, C13, ..., C20], [Cn-10, Cn-9,..., Cn]]

Вот функция, которая запускает процесс.

def  get_n_process(as_batches, processes, df, q):
    p = []
    for i in range(processes):
        work = Process(target=submit_job, args=(df, as_batches[i], q, i))
        p.append(work)
        work.start()
    as_batches = as_batches[processes:]
    return p, as_batches

Вот основная l oop,

while(len(as_batches) > 0):
        t = []

        #dynamically check the lists
        if len(as_batches) > 8:
            n_process = 8
        else:
            n_process = len(as_batches)

        print("For this it Requries {} Process".format(n_process))

        process_obj_inlist, as_batches = get_n_process(as_batches, n_process, df, q)

        for ind_process in process_obj_inlist:
            ind_process.join()

        with open("logs.txt", "a") as f:
            f.write("\n")
            f.write("Log Recording at: {timestamp},  Remaining N = {remaining} yet to be processed".format(
                timestamp=datetime.datetime.now(),
                remaining = len(as_batches)
            ))
            f.close()

В целях ведения журнала я записываю в текстовый файл, чтобы увидеть, сколько категорий еще предстоит обработать. А вот и основная функция

def do_something(fromprocess):
    time.sleep(1)
    print("Operation Ended for Process:{}, process Id:{}".format(
        current_process().name, os.getpid()
    ))

    return "msg"

def submit_job(df, list_items, q, fromprocess):
    a = []
    for items in list_items:
        oneitemdf = df[df['MinorCategory']==items]['FilePath'].values.tolist()
        oneitemdf = [x for x in oneitemdf if x.endswith('.png')]
        result = do_something(fromprocess)
        a.append(result)
    q.put(a) 

Пока я просто печатаю в консоли, но в реальном коде я буду использовать алгоритм KAZE для извлечения объектов из изображений, сохранения их в списке и добавить его в очередь (общая память) со всех процессоров. Теперь сценарий выполняется в течение нескольких минут, но через некоторое время сценарий останавливается. Это не бежало дальше . Я пытался выйти, но не смог. Я думаю, что может случиться какой-то тупик, но я не уверен. Я читаю онлайн-источники, но не могу найти решение и причину, почему это происходит?

Полный код приведен в следующей ссылке: Ссылка на полный исходный код . Что я здесь не так делаю? Я новичок в многопроцессорности и многопоточности. Я хотел бы понять концепцию в глубине. Ссылки / ресурсы, связанные с этим topi c, высоко ценятся.

UPDATE - Тот же код отлично работает на ОС Ma c.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...