Эффективно обрабатывать операции блокировки в Python - PullRequest
0 голосов
/ 06 февраля 2019

Я использую Python и OpenCV для получения видео из потока RTSP.Я получаю отдельные кадры из потока и сохраняю их в файловую систему.

Я написал StreamingWorker, который обрабатывает получение и сохранение кадра.Кроме того, есть StreamPool, который имеет все потоковые объекты.Я думал, что, поскольку StreamingWorker всегда будет работать, на ядро ​​должен быть только один, чтобы получить как можно больше.Тогда StreamPool предоставит VideoCapture объекты доступным StreamingWorker.

Проблема в том, что большую часть времени, когда выполняется скрипт, блокируется:

import os
import time
import threading
import cv2 as cv

class StreamingWorker(object):

    def __init__(self, stream_pool):
        self.stream_pool = stream_pool
        self.start_loop()

    def start_loop(self):
        while True:
            try:
                # getting a stream from the read_strategy
                stream_object = self.stream_pool.next()

                # getting an image from the stream
                _, frame = stream_object['stream'].read()

                # saving image to file system
                cv.imwrite(os.path.join('result', stream_object['feed'], '{}.jpg'.format(time.time())))

            except ValueError as e:
                print('[error] {}'.format(e))

class StreamPool(object):

    def __init__(self, streams):
        self.streams = [{'feed': stream, 'stream': cv.VideoCapture(stream)} for stream in streams]
        self.current_stream = 0
        self.lock = threading.RLock()

    def next(self):
        self.lock.acquire()
        if(self.current_stream + 1 >= len(self.streams)):
            self.current_stream = 0
        else:
            self.current_stream += 1
        result = self.streams[self.current_stream]
        self.lock.release()
        return result

def get_cores():
    # This function returns the number of available cores
    import multiprocessing
    return multiprocessing.cpu_count()


def start(stream_pool):
    StreamingWorker(stream_pool)

def divide_list(input_list, amount):
    # This function divides the whole list into list of lists
    result = [[] for _ in range(amount)]
    for i in range(len(input_list)):
        result[i % len(result)].append(input_list[i])
    return result

if __name__ == '__main__':

    stream_list = ['rtsp://some/stream1', 'rtsp://some/stream2', 'rtsp://some/stream3']

    num_cores = get_cores()
    divided_streams = divide_list(stream_list, num_cores)
    for streams in divided_streams:
        stream_pool = StreamPool(streams)
        thread = threading.Thread(target=start, args=(stream_pool))
        thread.start()

Когда я думал об этом, я не учел, что большинство операцийбудут блокировать такие операции, как:

# Getting a frame blocks
_, frame = stream_object['stream'].read()

# Writing to the file system blocks
cv.imwrite(os.path.join('result', stream_object['feed'], '{}.jpg'.format(time.time())))

Проблема, связанная с потерей слишком большого количества времени на блокировку, заключается в том, что большая часть вычислительной мощности теряется.Я думал об использовании фьючерсов с ThreadPoolExecutor, но не могу достичь своей цели - использовать максимально возможное количество процессорных ядер.Может быть, я не устанавливаю смеяться темы.

Существует ли стандартный способ обработки операций блокировки, чтобы наилучшим образом использовать вычислительную мощность ядер?У меня все хорошо, и я не имею языкового ответа.

1 Ответ

0 голосов
/ 06 февраля 2019

В итоге я использовал ThreadPoolExecutor, используя функцию add_done_callback(fn).

class StreamingWorker(object):

    def __init__(self, stream_pool):
        self.stream_pool = stream_pool
        self.thread_pool = ThreadPoolExecutor(10)
        self.start_loop()

    def start_loop(self):
        def done(fn):
            print('[info] future done')

        def save_image(stream):
            # getting an image from the stream
            _, frame = stream['stream'].read()

            # saving image to file system
            cv.imwrite(os.path.join('result', stream['feed'], '{}.jpg'.format(time.time())))

        while True:
            try:
                # getting a stream from the read_strategy
                stream_object = self.stream_pool.next()

                # Scheduling the process to the thread pool
                self.thread_pool.submit(save_image, (stream_object)).add_done_callback(done)
            except ValueError as e:
                print('[error] {}'.format(e))

На самом деле я не хотел ничего делать после того, как будущее закончилось, но если бы я использовал result(), то while True остановился бы, что также уничтожило бы всю цель использования пула потоков.

Примечание: Мне пришлось добавить threading.Rlock() при вызове self.stream_pool.next(), потому что, очевидно, opencv не может обрабатывать вызовы из нескольких потоков.

...