Как реализовать конвейеризацию в Python? - PullRequest
1 голос
/ 25 июня 2019

У меня есть программа, которая обрабатывает видео в реальном времени с некоторыми маркерами.

Он делится на:

  1. Импорт следующего изображения видео
  2. Преобразование изображения в читаемую форму
  3. Обнаружение маркеров
  4. Отслеживание маркеров
  5. Пользовательский интерфейс Draw

Это хорошо работает на моем компьютере, но оно также должно работать и на Raspberry Pi, поэтому использование всего одного ядра все время не приведет к его сокращению.

Вот почему я хочу представить конвейерную обработку. На моем курсе по компьютерной архитектуре в университете я узнал об аппаратной конвейерной обработке, поэтому мне было интересно, можно ли реализовать что-то подобное в python:

Так что вместо того, чтобы делать Импорт -> Конверсия -> Обработка -> Отслеживание -> Рисование -> ...

Я хочу сделать это так:

-1----2----3----4-----5----...
Imp--Imp--Imp--Imp---Imp---...
-----Conv-Conv-Conv--Conv--...
----------Pro--Pro---Pro---...
---------------Track-Track-...
---------------------Draw--...

Так что каждый «тактовый цикл» изображения готов, а не только каждый 5-й.

Так что я подумывал об использовании для этого многопроцессорной библиотеки python, но у меня нет опыта работы с ней, но есть несколько простых тестовых программ, поэтому я не уверен, что лучше всего подойдет для этого варианта использования, например, Queue, Pool, Manager, ...

РЕШИТЬ:

Это можно сделать с помощью mpipe классного набора инструментов для Python. [http://vmlaker.github.io/mpipe/][1]

while True:
    stage1 = mpipe.OrderedStage(conversion, 3)
    stage2 = mpipe.OrderedStage(processing, 3)
    stage3 = mpipe.OrderedStage(tracking, 3)
    stage4 = mpipe.OrderedStage(draw_squares, 3)
    stage5 = mpipe.OrderedStage(ui, 3)

    pipe = mpipe.Pipeline(stage1.link(stage2.link(stage3.link(stage4.link(stage5)))))

    images = []
    while len(images) < 3:
        ret = False
        while not ret:
            ret, image = cap.read()
        images.append(image)

    for i in images:
        t = (i, frame_counter, multi_tracker)
        pipe.put(t)

    pipe.put(None)

    for result in pipe.results():
        image, multi_tracker, frame_counter = result
        Show.show_win("video", image)

Как подсказал @r_e, в начале я прочитал несколько изображений и заполнил ими конвейер. Теперь на каждом этапе вычислений запускается несколько рабочих процессов, так что каждый может работать с отдельным изображением.

Поскольку некоторую дополнительную информацию необходимо передать помимо изображения, я просто возвращаю изображение и дополнительную информацию и снова распаковываю ее на следующем этапе.

На данный момент мне пришлось отключить отслеживание, чтобы я не смог сравнить его со старой версией. Это немного медленнее (отслеживание будет ухудшать скорость, поскольку мне не нужно обнаруживать объекты в каждом кадре, а только в каждом 30-м). Но я дам вам обновление, если я заставлю его работать.

Ответы [ 2 ]

0 голосов
/ 26 июня 2019

У меня была небольшая попытка сделать это.Он основан на вашей диаграмме и использует 5-этапный конвейер и многопроцессорную обработку.Начните чтение ближе к концу в:

def main():
    ...
    ...

#!/usr/bin/env python3

import logging
import numpy as np
from time import sleep
from multiprocessing import Process, Queue

class Stage1(Process):
    """Acquire frames as fast as possible and send to next stage"""
    def __init__(self, oqueue):
        super().__init__()
        # Pick up parameters and store in class variables
        self.oqueue = oqueue      # output queue

    def run(self,):
        # Turn on logging
        logging.basicConfig(level=logging.DEBUG,
                        format='%(created).6f [%(levelname)s] Stage1 %(message)s',
                        filename='log-stage1.txt', filemode='w')
        logging.info('started')

        # Generate frames and send down pipeline
        for f in range(NFRAMES):
            logging.debug('Generating frame %d',f)
            # Generate frame of random stuff
            frame = np.random.randint(0,256,(480,640,3), dtype=np.uint8)
            logging.debug('Forwarding frame %d',f)
            self.oqueue.put(frame)

class Stage2(Process):
    """Read frames from previous stage as fast as possible, process and send to next stage"""
    def __init__(self, iqueue, oqueue):
        super().__init__()
        # Pick up parameters and store in class variables
        self.iqueue = iqueue      # input queue
        self.oqueue = oqueue      # output queue

    def run(self,):
        # Turn on logging
        logging.basicConfig(level=logging.DEBUG,
                        format='%(created).6f [%(levelname)s] Stage2 %(message)s',
                        filename='log-stage2.txt', filemode='w')
        logging.info('started')

        for f in range(NFRAMES):
            # Wait for next frame
            frame = self.iqueue.get()
            logging.debug('Received frame %d', f)
            # Process frame ...

            logging.debug('Forwarding frame %d', f)
            self.oqueue.put(frame)

class Stage3(Process):
    """Read frames from previous stage as fast as possible, process and send to next stage"""
    def __init__(self, iqueue, oqueue):
        super().__init__()
        # Pick up parameters and store in class variables
        self.iqueue = iqueue      # input queue
        self.oqueue = oqueue      # output queue

    def run(self,):
        # Turn on logging
        logging.basicConfig(level=logging.DEBUG,
                        format='%(created).6f [%(levelname)s] Stage3 %(message)s',
                        filename='log-stage3.txt', filemode='w')
        logging.info('started')
        for f in range(NFRAMES):
            # Wait for next frame
            frame = self.iqueue.get()
            logging.debug('Received frame %d', f)
            # Process frame ...

            logging.debug('Forwarding frame %d', f)
            self.oqueue.put(frame)

class Stage4(Process):
    """Read frames from previous stage as fast as possible, process and send to next stage"""
    def __init__(self, iqueue, oqueue):
        super().__init__()
        # Pick up parameters and store in class variables
        self.iqueue = iqueue      # input queue
        self.oqueue = oqueue      # output queue

    def run(self,):
        # Turn on logging
        logging.basicConfig(level=logging.DEBUG,
                        format='%(created).6f [%(levelname)s] Stage4 %(message)s',
                        filename='log-stage4.txt', filemode='w')
        logging.info('started')

        for f in range(NFRAMES):
            # Wait for next frame
            frame = self.iqueue.get()
            logging.debug('Received frame %d', f)
            # Process frame ...

            logging.debug('Forwarding frame %d', f)
            self.oqueue.put(frame)

class Stage5(Process):
    """Read frames from previous stage as fast as possible, and display"""
    def __init__(self, iqueue):
        super().__init__()
        # Pick up parameters and store in class variables
        self.iqueue = iqueue      # input queue

    def run(self,):
        # Turn on logging
        logging.basicConfig(level=logging.DEBUG,
                        format='%(created).6f [%(levelname)s] Stage5 %(message)s',
                        filename='log-stage5.txt', filemode='w')
        logging.info('started')

        for f in range(NFRAMES):
            # Wait for next frame
            frame = self.iqueue.get()
            logging.debug('Displaying frame %d', f)
            # Display frame ...

def main():
    # Create Queues to send data between pipeline stages
    q1_2 = Queue(5)    # queue between stages 1 and 2
    q2_3 = Queue(5)    # queue between stages 2 and 3
    q3_4 = Queue(5)    # queue between stages 3 and 4
    q4_5 = Queue(5)    # queue between stages 4 and 5

    # Create Processes for stages of pipeline
    stages = []
    stages.append(Stage1(q1_2))
    stages.append(Stage2(q1_2,q2_3))
    stages.append(Stage3(q2_3,q3_4))
    stages.append(Stage4(q3_4,q4_5))
    stages.append(Stage5(q4_5))

    # Start the stages
    for stage in stages:
        stage.start()

    # Wait for stages to finish
    for stage in stages:
        stage.join()

if __name__ == "__main__":
    NFRAMES = 1000
    main()

В данный момент он просто генерирует кадр случайного шума и передает его по конвейеру.Он записывает каждый процесс в отдельный файл, который он перезаписывает для каждого нового запуска программы из-за filemode='w'.Вы можете увидеть отдельные журналы, как это:

-rw-r--r--  1 mark  staff  1097820 26 Jun 17:07 log-stage1.txt
-rw-r--r--  1 mark  staff  1077820 26 Jun 17:07 log-stage2.txt
-rw-r--r--  1 mark  staff  1077820 26 Jun 17:07 log-stage3.txt
-rw-r--r--  1 mark  staff  1077820 26 Jun 17:07 log-stage4.txt
-rw-r--r--  1 mark  staff   548930 26 Jun 17:07 log-stage5.txt

Затем вы можете увидеть, сколько раз каждый процесс получил и отправил каждый кадр:

more log-stage1.txt

1561565618.603456 [INFO] Stage1 started
1561565618.604812 [DEBUG] Stage1 Generating frame 0
1561565618.623938 [DEBUG] Stage1 Forwarding frame 0
1561565618.625659 [DEBUG] Stage1 Generating frame 1
1561565618.647139 [DEBUG] Stage1 Forwarding frame 1
1561565618.648173 [DEBUG] Stage1 Generating frame 2
1561565618.687316 [DEBUG] Stage1 Forwarding frame 2

Или проследить, чтобы «кадр 1» проходил черезЭтапы:

pi@pi3:~ $ grep "frame 1$" log*

log-stage1.txt:1561565618.625659 [DEBUG] Stage1 Generating frame 1
log-stage1.txt:1561565618.647139 [DEBUG] Stage1 Forwarding frame 1
log-stage2.txt:1561565618.671272 [DEBUG] Stage2 Received frame 1
log-stage2.txt:1561565618.672272 [DEBUG] Stage2 Forwarding frame 1
log-stage3.txt:1561565618.713618 [DEBUG] Stage3 Received frame 1
log-stage3.txt:1561565618.715468 [DEBUG] Stage3 Forwarding frame 1
log-stage4.txt:1561565618.746488 [DEBUG] Stage4 Received frame 1
log-stage4.txt:1561565618.747617 [DEBUG] Stage4 Forwarding frame 1
log-stage5.txt:1561565618.790802 [DEBUG] Stage5 Displaying frame 1

Или объединить все журналы вместе в порядке времени:

sort -g log*

1561565618.603456 [INFO] Stage1 started
1561565618.604812 [DEBUG] Stage1 Generating frame 0
1561565618.607765 [INFO] Stage2 started
1561565618.612311 [INFO] Stage3 started
1561565618.618425 [INFO] Stage4 started
1561565618.618785 [INFO] Stage5 started
1561565618.623938 [DEBUG] Stage1 Forwarding frame 0
1561565618.625659 [DEBUG] Stage1 Generating frame 1
1561565618.640585 [DEBUG] Stage2 Received frame 0
1561565618.642438 [DEBUG] Stage2 Forwarding frame 0
0 голосов
/ 25 июня 2019

Поскольку у меня нет 50 репутаций, я не могу это комментировать. Я также не имел опыта в этом, но небольшой поиск привел меня на следующий веб-сайт, где он говорит о реальном времени и обработке видео с использованием библиотеки Multiprocessing. Надеюсь, это поможет.

1) Чтение кадров; поместите их в очередь ввода с соответствующими номерами кадров для каждого:

  # Check input queue is not full
  if not input_q.full():
     # Read frame and store in input queue
     ret, frame = vs.read()
      if ret:            
        input_q.put((int(vs.get(cv2.CAP_PROP_POS_FRAMES)),frame))

2) Возьмите кадры из входной очереди и поместите на выход с соответствующими номерами кадров:

while True:
  frame = input_q.get()
frame_rgb = cv2.cvtColor(frame[1], cv2.COLOR_BGR2RGB)
  output_q.put((frame[0], detect_objects(frame_rgb, sess, detection_graph)))

3) Восстановить обработанный кадр в очереди вывода и очереди приоритета подачи, если очередь вывода не пуста

# Check output queue is not empty
if not output_q.empty():
  # Recover treated frame in output queue and feed priority queue
  output_pq.put(output_q.get())

4) Нарисуйте кадры, пока очередь вывода не станет пустой

# Check output priority queue is not empty
  if not output_pq.empty():
    prior, output_frame = output_pq.get()
    if prior > countWriteFrame:
      output_pq.put((prior, output_frame))
    else: 
      countWriteFrame = countWriteFrame + 1    
      # Draw something with your frame

5) Наконец, для остановки проверьте, пуста ли входная очередь. Если да, перерыв.

if((not ret) & input_q.empty() & 
    output_q.empty() & output_pq.empty()):
  break

Ссылку можно найти ЗДЕСЬ

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...