Быстрая обработка видео OpenCV - PullRequest
0 голосов
/ 25 апреля 2020

Я хотел бы использовать многопроцессорность для анализа взаимосвязей между кадрами в видео. Здесь, например, я вычисляю среднеквадратичную ошибку между соседними кадрами. Код сначала временно сохраняет загруженные кадры в очередь, а затем выполняет многопроцессорную обработку кадров.

Код работает достаточно эффективно, но мне было интересно, если вы, ребята, видите более аккуратный или лучший способ сделать это это?

Хитрость в том, что вы не можете просто разделить видео на n частей на n ядер, а затем выполнить многопроцессорную обработку кадров на каждом ядре отдельно, потому что меня интересует взаимосвязь между кадрами , Если просто разделить видео, последние кадры ядра 1 не будут сравниваться с первыми кадрами ядра 2 ... Кроме того, если я хочу сравнить каждый m-й кадр, это сложнее.

Вот мой код: Вы можете проверить его, и я надеюсь, что комментариев будет достаточно, чтобы провести вас через него.

Это может быть возможным выводом среднеквадратичной ошибки между соседними кадрами: enter image description here

#!/usr/bin/env python

import cv2
from multiprocessing import Process,Queue
from skimage.metrics import mean_squared_error
import matplotlib.pyplot as plt


CHECK_SCORE_LIMIT=200

class VideoAnalysis:

    def __init__(self,video_name,queue_size,cores):

        # save video file name        
        self.vid=video_name
        # initialize the fixed sized queue of frames
        self.queue=Queue(queue_size)
        # max number of simultaneous processes 
        self.cores=cores
        # list of active processes
        self.processes=[]
        # queue of structural similarity scores
        self.scores=Queue()
        # a single variable to know when video frame reading is over.( saved as a queue inorder to be shared between processes)
        self.finish=Queue(1)
        # list of structral similarity scores. This will be used to unpack the scores queue whenever its size reaches CHECK_SCORE_LIMIT.
        self.scoreslist=[]


    def add_frames(self):
        """This function is responisble for adding video frames to the frame queue"""

        #open the video
        video=cv2.VideoCapture(self.vid)
        #get the video duration in seconds, just for displaying a progress bar. duration=(total nb of frames)/fps
        duration=video.get(cv2.CAP_PROP_FRAME_COUNT)/video.get(cv2.CAP_PROP_FPS)

        print("Processing...")

        #iterate indefinetly until return is called when video is all read
        while True:

            # read a frame, with bool to know if the video is still not over
            not_over,frame=video.read()

            # calculations for displaying the progress bar. perc is the percentage of progress, = elapsed time (CAP_PROP_POS_MSEC)/duration.
            # hasht is the number of # signs to be printed. scale taken to be 50 signs for a full progress.
            perc=0.1*video.get(cv2.CAP_PROP_POS_MSEC)/duration
            hasht=int(perc/2)


            if not_over:
                # if video is not over, i.e. a non empty frame is read, print the progress bar and put the frame in the queue.
                print("\r{:.2f}% [{}]".format(perc,"#"*(hasht)+" "*(50-hasht)),end='')
                self.queue.put(frame)

            else:
                # case video is over, print the last progress bar instance
                print("\r{:.2f}% [{}]".format(100,"#"*(50)))
                # notify the parent process that this process is over, by appending a bool to self.finish.
                self.finish.put(True)
                # close video file then return
                video.release()
                print("Shutting down all processes...")
                return


    def compare(self,frame1,frame2):
        # function to compare two frames and put the result in the scores queue
        self.scores.put(mean_squared_error(frame1,frame2))



    def analyze(self):
        #calling this method will starts teh video processing

        # start the function of adding frames as a unique process
        p=Process(target=self.add_frames)
        p.start()

        # get a frame from the frame queue
        prev_frame=self.queue.get()

        # iterate indefinetly until a break is called when all required comparison processes have been started.
        while True:

            # check if the score limit is reached in the score queue.
            #if yes, unpack them to the score list.
            #(this to avoid crashing when queue size becomes around 500. better to use a limit of 300-400 bcs Queue.qsize() is not accurate.
            if self.scores.qsize()>CHECK_SCORE_LIMIT:
                for _ in range(CHECK_SCORE_LIMIT):
                    self.scoreslist.append(self.scores.get())


            # if max number of simultaneuous active processes is reached,
            # wait for the first one to finish, then pop it out from the list of active processes.
            if len(self.processes)==self.cores:
                self.processes[0].join()
                self.processes.pop(0)


            new_frame=None

            # now the following procedure is for checking two scenarios of concern:
            # 1- finished reading frames from the video, but still not all comparison processes are issued.
            #     In this case, self.finish is not empty but the pending frames queue(self.queue) is also not empty.
            # 2- finished reading and finished calling comparisons.
            #     In this case, self.finish is not empty and self.queue (the queue of pending frames) is empty.

            # a variable to check if self.finish is empty
            a=None

            # a variable to know if a new frame has been taken from the frame.queue.
            taken=False

            try:
                # if a is empty, an exception should be issued and thus the code will enter the except block.
                # the exception is due to the version of the get function (nowait)
                a=self.finish.get_nowait()
            except:
                # do nothing, we are sure now the adding frames is not over.
                pass
            else:
                # if no exception is issued, adding frames is over
                # put the value again; to ensure the same procedure in the next iteration(if the loop didn't break)
                self.finish.put(a)

                # now to test in which scenario are we in
                try:
                    # get a new frame from the frame queue. if an exception is raised, the queue is empty and we are in scenario 2.
                    # if no exception raised, set the "taken" value to true, to prevent reading a frame another time in the same iteration.
                    new_frame=self.queue.get_nowait()
                    gotten=True
                except:
                    # break after waiting for the adding frame process to finish.(it is finished yes, but it is better to call join() to all processes)
                    p.join()
                    break


            # if the frame is already read in a block above, skip reading
            if not taken:
                new_frame=self.queue.get()

            # initialize a process to compare two frames and add the process to active processes list
            pp=Process(target=self.compare,args=(prev_frame,new_frame))                    
            self.processes.append(pp)
            pp.start()

            # save the new frame to compare it with the next one
            prev_frame=new_frame


        # finalizing the work. waiting for any active process to finish.

        for ppp in self.processes:
            ppp.join()

        # a portion of the scores may be still in the queue score, be sure to get all scores in the scorelist.
        while True:
            try:
                self.scoreslist.append(self.scores.get_nowait())
            except:
                # will break when exception is called when the queue score becomes empty.
                break


    def print_scores(self):

        count=1
        for i in self.scoreslist:
            #print(count,i) in case you want to see the values in the console.
            count+=1
        plt.plot(list(range(count-1)),self.scoreslist)
        plt.show()


if __name__=='__main__':

    vid_name='1.mp4'
    analysis=VideoAnalysis(vid_name,40,8)
    analysis.analyze()
    analysis.print_scores()
...