У меня была небольшая попытка сделать это.Он основан на вашей диаграмме и использует 5-этапный конвейер и многопроцессорную обработку.Начните чтение ближе к концу в:
def main():
...
...
#!/usr/bin/env python3
import logging
import numpy as np
from time import sleep
from multiprocessing import Process, Queue
class Stage1(Process):
"""Acquire frames as fast as possible and send to next stage"""
def __init__(self, oqueue):
super().__init__()
# Pick up parameters and store in class variables
self.oqueue = oqueue # output queue
def run(self,):
# Turn on logging
logging.basicConfig(level=logging.DEBUG,
format='%(created).6f [%(levelname)s] Stage1 %(message)s',
filename='log-stage1.txt', filemode='w')
logging.info('started')
# Generate frames and send down pipeline
for f in range(NFRAMES):
logging.debug('Generating frame %d',f)
# Generate frame of random stuff
frame = np.random.randint(0,256,(480,640,3), dtype=np.uint8)
logging.debug('Forwarding frame %d',f)
self.oqueue.put(frame)
class Stage2(Process):
"""Read frames from previous stage as fast as possible, process and send to next stage"""
def __init__(self, iqueue, oqueue):
super().__init__()
# Pick up parameters and store in class variables
self.iqueue = iqueue # input queue
self.oqueue = oqueue # output queue
def run(self,):
# Turn on logging
logging.basicConfig(level=logging.DEBUG,
format='%(created).6f [%(levelname)s] Stage2 %(message)s',
filename='log-stage2.txt', filemode='w')
logging.info('started')
for f in range(NFRAMES):
# Wait for next frame
frame = self.iqueue.get()
logging.debug('Received frame %d', f)
# Process frame ...
logging.debug('Forwarding frame %d', f)
self.oqueue.put(frame)
class Stage3(Process):
"""Read frames from previous stage as fast as possible, process and send to next stage"""
def __init__(self, iqueue, oqueue):
super().__init__()
# Pick up parameters and store in class variables
self.iqueue = iqueue # input queue
self.oqueue = oqueue # output queue
def run(self,):
# Turn on logging
logging.basicConfig(level=logging.DEBUG,
format='%(created).6f [%(levelname)s] Stage3 %(message)s',
filename='log-stage3.txt', filemode='w')
logging.info('started')
for f in range(NFRAMES):
# Wait for next frame
frame = self.iqueue.get()
logging.debug('Received frame %d', f)
# Process frame ...
logging.debug('Forwarding frame %d', f)
self.oqueue.put(frame)
class Stage4(Process):
"""Read frames from previous stage as fast as possible, process and send to next stage"""
def __init__(self, iqueue, oqueue):
super().__init__()
# Pick up parameters and store in class variables
self.iqueue = iqueue # input queue
self.oqueue = oqueue # output queue
def run(self,):
# Turn on logging
logging.basicConfig(level=logging.DEBUG,
format='%(created).6f [%(levelname)s] Stage4 %(message)s',
filename='log-stage4.txt', filemode='w')
logging.info('started')
for f in range(NFRAMES):
# Wait for next frame
frame = self.iqueue.get()
logging.debug('Received frame %d', f)
# Process frame ...
logging.debug('Forwarding frame %d', f)
self.oqueue.put(frame)
class Stage5(Process):
"""Read frames from previous stage as fast as possible, and display"""
def __init__(self, iqueue):
super().__init__()
# Pick up parameters and store in class variables
self.iqueue = iqueue # input queue
def run(self,):
# Turn on logging
logging.basicConfig(level=logging.DEBUG,
format='%(created).6f [%(levelname)s] Stage5 %(message)s',
filename='log-stage5.txt', filemode='w')
logging.info('started')
for f in range(NFRAMES):
# Wait for next frame
frame = self.iqueue.get()
logging.debug('Displaying frame %d', f)
# Display frame ...
def main():
# Create Queues to send data between pipeline stages
q1_2 = Queue(5) # queue between stages 1 and 2
q2_3 = Queue(5) # queue between stages 2 and 3
q3_4 = Queue(5) # queue between stages 3 and 4
q4_5 = Queue(5) # queue between stages 4 and 5
# Create Processes for stages of pipeline
stages = []
stages.append(Stage1(q1_2))
stages.append(Stage2(q1_2,q2_3))
stages.append(Stage3(q2_3,q3_4))
stages.append(Stage4(q3_4,q4_5))
stages.append(Stage5(q4_5))
# Start the stages
for stage in stages:
stage.start()
# Wait for stages to finish
for stage in stages:
stage.join()
if __name__ == "__main__":
NFRAMES = 1000
main()
В данный момент он просто генерирует кадр случайного шума и передает его по конвейеру.Он записывает каждый процесс в отдельный файл, который он перезаписывает для каждого нового запуска программы из-за filemode='w'
.Вы можете увидеть отдельные журналы, как это:
-rw-r--r-- 1 mark staff 1097820 26 Jun 17:07 log-stage1.txt
-rw-r--r-- 1 mark staff 1077820 26 Jun 17:07 log-stage2.txt
-rw-r--r-- 1 mark staff 1077820 26 Jun 17:07 log-stage3.txt
-rw-r--r-- 1 mark staff 1077820 26 Jun 17:07 log-stage4.txt
-rw-r--r-- 1 mark staff 548930 26 Jun 17:07 log-stage5.txt
Затем вы можете увидеть, сколько раз каждый процесс получил и отправил каждый кадр:
more log-stage1.txt
1561565618.603456 [INFO] Stage1 started
1561565618.604812 [DEBUG] Stage1 Generating frame 0
1561565618.623938 [DEBUG] Stage1 Forwarding frame 0
1561565618.625659 [DEBUG] Stage1 Generating frame 1
1561565618.647139 [DEBUG] Stage1 Forwarding frame 1
1561565618.648173 [DEBUG] Stage1 Generating frame 2
1561565618.687316 [DEBUG] Stage1 Forwarding frame 2
Или проследить, чтобы «кадр 1» проходил черезЭтапы:
pi@pi3:~ $ grep "frame 1$" log*
log-stage1.txt:1561565618.625659 [DEBUG] Stage1 Generating frame 1
log-stage1.txt:1561565618.647139 [DEBUG] Stage1 Forwarding frame 1
log-stage2.txt:1561565618.671272 [DEBUG] Stage2 Received frame 1
log-stage2.txt:1561565618.672272 [DEBUG] Stage2 Forwarding frame 1
log-stage3.txt:1561565618.713618 [DEBUG] Stage3 Received frame 1
log-stage3.txt:1561565618.715468 [DEBUG] Stage3 Forwarding frame 1
log-stage4.txt:1561565618.746488 [DEBUG] Stage4 Received frame 1
log-stage4.txt:1561565618.747617 [DEBUG] Stage4 Forwarding frame 1
log-stage5.txt:1561565618.790802 [DEBUG] Stage5 Displaying frame 1
Или объединить все журналы вместе в порядке времени:
sort -g log*
1561565618.603456 [INFO] Stage1 started
1561565618.604812 [DEBUG] Stage1 Generating frame 0
1561565618.607765 [INFO] Stage2 started
1561565618.612311 [INFO] Stage3 started
1561565618.618425 [INFO] Stage4 started
1561565618.618785 [INFO] Stage5 started
1561565618.623938 [DEBUG] Stage1 Forwarding frame 0
1561565618.625659 [DEBUG] Stage1 Generating frame 1
1561565618.640585 [DEBUG] Stage2 Received frame 0
1561565618.642438 [DEBUG] Stage2 Forwarding frame 0