Я хотел бы использовать многопроцессорность для анализа взаимосвязей между кадрами в видео. Здесь, например, я вычисляю среднеквадратичную ошибку между соседними кадрами. Код сначала временно сохраняет загруженные кадры в очередь, а затем выполняет многопроцессорную обработку кадров.
Код работает достаточно эффективно, но мне было интересно, если вы, ребята, видите более аккуратный или лучший способ сделать это это?
Хитрость в том, что вы не можете просто разделить видео на n частей на n ядер, а затем выполнить многопроцессорную обработку кадров на каждом ядре отдельно, потому что меня интересует взаимосвязь между кадрами , Если просто разделить видео, последние кадры ядра 1 не будут сравниваться с первыми кадрами ядра 2 ... Кроме того, если я хочу сравнить каждый m-й кадр, это сложнее.
Вот мой код: Вы можете проверить его, и я надеюсь, что комментариев будет достаточно, чтобы провести вас через него.
Это может быть возможным выводом среднеквадратичной ошибки между соседними кадрами: ![enter image description here](https://i.stack.imgur.com/TUJg1.png)
#!/usr/bin/env python
import cv2
from multiprocessing import Process,Queue
from skimage.metrics import mean_squared_error
import matplotlib.pyplot as plt
CHECK_SCORE_LIMIT=200
class VideoAnalysis:
def __init__(self,video_name,queue_size,cores):
# save video file name
self.vid=video_name
# initialize the fixed sized queue of frames
self.queue=Queue(queue_size)
# max number of simultaneous processes
self.cores=cores
# list of active processes
self.processes=[]
# queue of structural similarity scores
self.scores=Queue()
# a single variable to know when video frame reading is over.( saved as a queue inorder to be shared between processes)
self.finish=Queue(1)
# list of structral similarity scores. This will be used to unpack the scores queue whenever its size reaches CHECK_SCORE_LIMIT.
self.scoreslist=[]
def add_frames(self):
"""This function is responisble for adding video frames to the frame queue"""
#open the video
video=cv2.VideoCapture(self.vid)
#get the video duration in seconds, just for displaying a progress bar. duration=(total nb of frames)/fps
duration=video.get(cv2.CAP_PROP_FRAME_COUNT)/video.get(cv2.CAP_PROP_FPS)
print("Processing...")
#iterate indefinetly until return is called when video is all read
while True:
# read a frame, with bool to know if the video is still not over
not_over,frame=video.read()
# calculations for displaying the progress bar. perc is the percentage of progress, = elapsed time (CAP_PROP_POS_MSEC)/duration.
# hasht is the number of # signs to be printed. scale taken to be 50 signs for a full progress.
perc=0.1*video.get(cv2.CAP_PROP_POS_MSEC)/duration
hasht=int(perc/2)
if not_over:
# if video is not over, i.e. a non empty frame is read, print the progress bar and put the frame in the queue.
print("\r{:.2f}% [{}]".format(perc,"#"*(hasht)+" "*(50-hasht)),end='')
self.queue.put(frame)
else:
# case video is over, print the last progress bar instance
print("\r{:.2f}% [{}]".format(100,"#"*(50)))
# notify the parent process that this process is over, by appending a bool to self.finish.
self.finish.put(True)
# close video file then return
video.release()
print("Shutting down all processes...")
return
def compare(self,frame1,frame2):
# function to compare two frames and put the result in the scores queue
self.scores.put(mean_squared_error(frame1,frame2))
def analyze(self):
#calling this method will starts teh video processing
# start the function of adding frames as a unique process
p=Process(target=self.add_frames)
p.start()
# get a frame from the frame queue
prev_frame=self.queue.get()
# iterate indefinetly until a break is called when all required comparison processes have been started.
while True:
# check if the score limit is reached in the score queue.
#if yes, unpack them to the score list.
#(this to avoid crashing when queue size becomes around 500. better to use a limit of 300-400 bcs Queue.qsize() is not accurate.
if self.scores.qsize()>CHECK_SCORE_LIMIT:
for _ in range(CHECK_SCORE_LIMIT):
self.scoreslist.append(self.scores.get())
# if max number of simultaneuous active processes is reached,
# wait for the first one to finish, then pop it out from the list of active processes.
if len(self.processes)==self.cores:
self.processes[0].join()
self.processes.pop(0)
new_frame=None
# now the following procedure is for checking two scenarios of concern:
# 1- finished reading frames from the video, but still not all comparison processes are issued.
# In this case, self.finish is not empty but the pending frames queue(self.queue) is also not empty.
# 2- finished reading and finished calling comparisons.
# In this case, self.finish is not empty and self.queue (the queue of pending frames) is empty.
# a variable to check if self.finish is empty
a=None
# a variable to know if a new frame has been taken from the frame.queue.
taken=False
try:
# if a is empty, an exception should be issued and thus the code will enter the except block.
# the exception is due to the version of the get function (nowait)
a=self.finish.get_nowait()
except:
# do nothing, we are sure now the adding frames is not over.
pass
else:
# if no exception is issued, adding frames is over
# put the value again; to ensure the same procedure in the next iteration(if the loop didn't break)
self.finish.put(a)
# now to test in which scenario are we in
try:
# get a new frame from the frame queue. if an exception is raised, the queue is empty and we are in scenario 2.
# if no exception raised, set the "taken" value to true, to prevent reading a frame another time in the same iteration.
new_frame=self.queue.get_nowait()
gotten=True
except:
# break after waiting for the adding frame process to finish.(it is finished yes, but it is better to call join() to all processes)
p.join()
break
# if the frame is already read in a block above, skip reading
if not taken:
new_frame=self.queue.get()
# initialize a process to compare two frames and add the process to active processes list
pp=Process(target=self.compare,args=(prev_frame,new_frame))
self.processes.append(pp)
pp.start()
# save the new frame to compare it with the next one
prev_frame=new_frame
# finalizing the work. waiting for any active process to finish.
for ppp in self.processes:
ppp.join()
# a portion of the scores may be still in the queue score, be sure to get all scores in the scorelist.
while True:
try:
self.scoreslist.append(self.scores.get_nowait())
except:
# will break when exception is called when the queue score becomes empty.
break
def print_scores(self):
count=1
for i in self.scoreslist:
#print(count,i) in case you want to see the values in the console.
count+=1
plt.plot(list(range(count-1)),self.scoreslist)
plt.show()
if __name__=='__main__':
vid_name='1.mp4'
analysis=VideoAnalysis(vid_name,40,8)
analysis.analyze()
analysis.print_scores()