Python многопроцессорная зависает при соединении - PullRequest
0 голосов
/ 29 ноября 2018

Я читаю видеофайл, так что каждые 20 кадров я сохраняю первые кадры во входной очереди.Как только я получаю все необходимые кадры во входной очереди, я запускаю несколько процессов, чтобы выполнить некоторую операцию над этими кадрами и сохранить результаты в выходной очереди.Но код всегда зависал при соединении, я пробовал разные решения, предложенные для таких проблем, но ни одно из них не работает.

import numpy as np
import cv2
import timeit
import face_recognition
from multiprocessing import Process, Queue, Pool
import multiprocessing
import os

s = timeit.default_timer()

def alternative_process_target_func(input_queue, output_queue):

    while not output_queue.full():
        frame_no, small_frame, face_loc = input_queue.get()
        print('Frame_no: ', frame_no, 'Process ID: ', os.getpid(), '----', multiprocessing.current_process())
        #canny_frame(frame_no, small_frame, face_loc)

        #I am just storing frame no for now but will perform something else later
        output_queue.put((frame_no, frame_no)) 

        if output_queue.full():
            print('Its Full ---------------------------------------------------------------------------------------')
        else:
            print('Not Full')

    print(timeit.default_timer() - s, ' seconds.')
    print('I m not reading anymore. . .', os.getpid())


def alternative_process(file_name):
    start = timeit.default_timer()
    cap = cv2.VideoCapture(file_name)
    frame_no = 1
    fps = cap.get(cv2.CAP_PROP_FPS)
    length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    print('Frames Per Second: ', fps)
    print('Total Number of frames: ', length)
    print('Duration of file: ', int(length / fps))
    processed_frames = 1
    not_processed = 1
    frames = []
    process_this_frame = True
    frame_no = 1
    Input_Queue = Queue()
    while (cap.isOpened()):
        ret, frame = cap.read()
        if not ret:
            print('Size of input Queue: ', Input_Queue.qsize())
            print('Total no of frames read: ', frame_no)
            end1 = timeit.default_timer()
            print('Time taken to fetch useful frames: ', end1 - start)
            threadn = cv2.getNumberOfCPUs()
            Output_Queue = Queue(maxsize=Input_Queue.qsize())
            process_list = []
            #quit = multiprocessing.Event()
            #foundit = multiprocessing.Event()

            for x in range((threadn - 1)):
                # print('Process No : ', x)
                p = Process(target=alternative_process_target_func, args=(Input_Queue, Output_Queue))#, quit, foundit
                #p.daemon = True
                p.start()
                process_list.append(p)
                #p.join()

            # for proc in process_list:
            #     print('---------------------------------------------------------------', proc.p)

            i = 1
            for proc in process_list:
                print('I am hanged here')
                proc.join()
                print('I am done')
                i += 1

            end = timeit.default_timer()
            print('Time taken by face verification: ', end - start)

            break

        if process_this_frame:
            print(frame_no)
            small_frame = cv2.resize(frame, (0, 0), fx=0.25, fy=0.25)
            rgb_small_frame = small_frame[:, :, ::-1]
            face_locations = face_recognition.face_locations(rgb_small_frame)
            # frames.append((rgb_small_frame, face_locations))
            Input_Queue.put((frame_no, rgb_small_frame, face_locations))
            frame_no += 1

        if processed_frames < 5:
            processed_frames += 1
            not_processed = 1

        else:
            if not_processed < 15:
                process_this_frame = False
                not_processed += 1
            else:

                processed_frames = 1
                process_this_frame = True
                print('-----------------------------------------------------------------------------------------------')

    cap.release()
    cv2.destroyAllWindows()

alternative_process('user_verification_2.avi')

1 Ответ

0 голосов
/ 29 ноября 2018

Как сказано в документации по Process.join, зависание (или «блокировка» ) - это именно то, что ожидается:

Блокируйте вызывающий поток до тех пор, пока не завершится процесс, для которого вызывается метод join(), или пока не истечет необязательное время ожидания.

join останавливает текущий поток до завершения целевого процесса.Целевой процесс вызывает alternative_process_target_func, поэтому проблема, очевидно, заключается в этой функции.Это никогда не заканчивается.Причин может быть несколько.

Задача 1

alternative_process_target_func работает до output_queue.full().Что делать, если оно никогда не бывает полным?Это никогда не заканчивается?Действительно, лучше определить конец другим способом, например, запустить до тех пор, пока входная очередь не станет пустой.

Задача 2

input_queue.get() заблокируется, если входная очередь пуста.Как указано в документации ,

Удалить и вернуть элемент из очереди.Если необязательный блок args имеет значение true, а время ожидания равно None (по умолчанию), блокируйте его, если необходимо, до тех пор, пока элемент не станет доступен.

Вы запускаете несколько процессов, поэтому не ожидайте, что во вводе что-то есть, простопотому что output_queue.full() был Ложь минуту назад, и потому что размер ввода совпадает с размером вывода.Многое могло произойти за это время.

То, что вы хотите сделать:

try:
    input_queue.get(false)  # or input_queue.get_nowait()
except Empty:
    break  # stop when there is nothing more to read from the input

Задача 3

output_queue.put((frame_no, frame_no)) заблокируется, если в нем нет меставывод для хранения данных.

Опять же, вы предполагаете, что в выводе есть место только потому, что вы проверили output_queue.full() несколько минут назад, и потому что размер ввода равен размеру вывода.Никогда не полагайтесь на такие вещи.

Вы хотите сделать то же самое, что и для ввода:

try:
    output_queue.put((frame_no, frame_no), false)
    # or output_queue.put_nowait((frame_no, frame_no))
except Empty:
    # deal with this somehow, e.g.
    raise Exception("There is no room in the output queue to write to.")
...