Метод получения нескольких RTSP - PullRequest
1 голос
/ 07 февраля 2020

Я пытаюсь закодировать проект, к которому у меня есть как минимум 20 rtsp URL видеонаблюдения, к которому я собираюсь получить доступ одновременно.

Я пытался использовать ffmpeg для достижения своей цели с помощью метода множественного ввода. Однако есть проблема.

ffmpeg -i URL_1 -i URL_2 -

Приведенная выше команда является примером, который я пробовал. Я sh могу получить доступ к двум rtsps через ffmpeg и вывести их в две разные очереди для дальнейшего использования. Если я использую эту команду и после этого читаю байты, я не могу различить guish, какие байты принадлежат какому входному rtsp.

Есть ли другой способ, которым я могу получить доступ к большему количеству rtsp одновременно?

Редактировать: Добавление кода

import ffmpeg
import numpy as np
import subprocess as sp
import threading
import queue
import time
class CCTVReader(threading.Thread):
    def __init__(self, q, in_stream, name):
        super().__init__()
        self.name = name
        self.q = q
        self.command = ["ffmpeg",
                        "-c:v", "h264",     # Tell ffmpeg that input stream codec is h264
                        "-i", in_stream,    # Read stream from file vid.264
                        "-c:v", "copy",     # Tell ffmpeg to copy the video stream as is (without decding and encoding)
                        "-an", "-sn",       # No audio an no subtites
                        "-f", "h264",       # Define pipe format to be h264
                        "-"]                # Output is a pipe

    def run(self):
        pipe = sp.Popen(self.command, stdout=sp.PIPE, bufsize=1024**3)  # Don't use shell=True (you don't need to execute the command through the shell).

        # while True:
        for i in range(1024*10):  # Read up to 100KBytes for testing
            data = pipe.stdout.read(1024)  # Read data from pip in chunks of 1024 bytes
            self.q.put(data)

            # Break loop if less than 1024 bytes read (not going to work with CCTV, but works with input file)
            if len(data) < 1024:
                break

        try:
            pipe.wait(timeout=1)  # Wait for subprocess to finish (with timeout of 1 second).
        except sp.TimeoutExpired:
            pipe.kill()           # Kill subprocess in case of a timeout (there should be a timeout because input stream still lives).

        if self.q.empty():
            print("There is a problem (queue is empty)!!!")
        else:
            # Write data from queue to file vid_from_queue.264 (for testingg)
            with open(self.name+".h264", "wb") as queue_save_file:
                while not self.q.empty():
                    queue_save_file.write(self.q.get())


# Build synthetic video, for testing begins:
################################################
# width, height = 1280, 720
# in_stream = "vid.264"
# sp.Popen("ffmpeg -y -f lavfi -i testsrc=size=1280x720:duration=5:rate=1 -c:v libx264 -crf 23 -pix_fmt yuv420p " + in_stream).wait()
################################################

#Use public RTSP Streaming for testing
readers = {}
queues = {}
dict = {
        "name1":{"ip":"rtsp://xxx.xxx.xxx.xxx/"},
        "name2":{"ip":"rtsp://xxx.xxx.xxx.xxx/"},
        "name3":{"ip":"rtsp://xxx.xxx.xxx.xxx/"},
        "name4":{"ip":"rtsp://xxx.xxx.xxx.xxx/"},
        "name5":{"ip":"rtsp://xxx.xxx.xxx.xxx/"},
        "name6":{"ip":"rtsp://xxx.xxx.xxx.xxx/"},
        "name7":{"ip":"rtsp://xxx.xxx.xxx.xxx/"},
        "name8":{"ip":"rtsp://xxx.xxx.xxx.xxx/",
        "name9":{"ip":"rtsp://xxx.xxx.xxx.xxx/"},
        "name10":{"ip":"rtsp://xxx.xxx.xxx.xxx/"},
        "name11":{"ip":"rtsp://xxx.xxx.xxx.xxx/"},
        "name12":{"ip":"rtsp://xxx.xxx.xxx.xxx/"},
        "name13":{"ip":"rtsp://xxx.xxx.xxx.xxx/"},
        "name14":{"ip":"rtsp://xxx.xxx.xxx.xxx/"},
        "name15":{"ip":"rtsp://xxx.xxx.xxx.xxx/"},
        }

for key in dict:
    ip = dict[key]["ip"]
    name = key
    q = queue.Queue()
    queues[name] = q
    cctv_reader = CCTVReader(q, ip, name)
    readers[name] = cctv_reader
    cctv_reader.start()
    cctv_reader.join()

1 Ответ

1 голос
/ 08 февраля 2020

У вас уже есть вся инфраструктура в вашем предыдущем вопросе .

Все, что вам нужно сделать, это создать несколько объектов вашего класса CCTVReader.

Вот пример рабочего кода с чтением двух потоков:

import numpy as np
import subprocess as sp
import threading
import queue
import time

class CCTVReader(threading.Thread):
    def __init__(self, q, in_stream, chunk_size):
        super().__init__()
        self.q = q
        self.chunk_size = chunk_size
        self.command = ["ffmpeg",
                        "-c:v", "h264",     # Tell FFmpeg that input stream codec is h264
                        "-i", in_stream,    # Read stream from file vid.264
                        "-c:v", "copy",     # Tell FFmpeg to copy the video stream as is (without decoding and encoding)
                        "-an", "-sn",       # No audio an no subtitles
                        "-f", "h264",       # Define pipe format to be h264
                        "-"]                # Output is a pipe

    def run(self):
        pipe = sp.Popen(self.command, stdout=sp.PIPE, bufsize=1024**3)  # Don't use shell=True (you don't need to execute the command through the shell).

        # while True:
        for i in range(100):  # Read up to 100KBytes for testing
            data = pipe.stdout.read(self.chunk_size)  # Read data from pip in chunks of self.chunk_size bytes
            self.q.put(data)

            # Break loop if less than self.chunk_size bytes read (not going to work with CCTV, but works with input file)
            if len(data) < self.chunk_size:
                break

        try:
            pipe.wait(timeout=1)  # Wait for subprocess to finish (with timeout of 1 second).
        except sp.TimeoutExpired:
            pipe.kill()           # Kill subprocess in case of a timeout (there should be a timeout because input stream still lives).



#in_stream = "rtsp://xxx.xxx.xxx.xxx:xxx/Streaming/Channels/101?transportmode=multicast",

#Use public RTSP Streaming for testing
in_stream1 = "rtsp://wowzaec2demo.streamlock.net/vod/mp4:BigBuckBunny_115k.mov"

in_stream2 = "rtsp://wowzaec2demo.streamlock.net/vod/mp4:BigBuckBunny_115k.mov"


q1 = queue.Queue()
q2 = queue.Queue()

cctv_reader1 = CCTVReader(q1, in_stream1, 1024)  # First stream 
cctv_reader2 = CCTVReader(q2, in_stream2, 2048)  # Second stream

cctv_reader1.start()
time.sleep(5) # Wait 5 seconds (for testing).
cctv_reader2.start()

cctv_reader1.join()
cctv_reader2.join()

if q1.empty():
    print("There is a problem (q1 is empty)!!!")
else:
    # Write data from queue to file vid_from_queue1.264 (for testing)
    with open("vid_from_q1.264", "wb") as queue_save_file:
        while not q1.empty():
            queue_save_file.write(q1.get())

if q2.empty():
    print("There is a problem (q2 is empty)!!!")
else:
    # Write data from queue to file vid_from_queue2.264 (for testing)
    with open("vid_from_q2.264", "wb") as queue_save_file:
        while not q2.empty():
            queue_save_file.write(q2.get())

Обновление:

Не думаю, что вы можете используйте синтаксис, такой как ffmpeg -i URL_1 -i URL_2 - ...

Код, который вы опубликовали, имеет несколько проблем:

  1. cctv_reader.join() должен быть в секунде l oop, потому что он ожидает завершения потока и блокирует выполнение.
  2. Сохранять данные в файлы следует после завершения всех потоков (это только для тестирования).
    Если вы хотите записать данные, попробуйте сохранить каждый блок сразу после его захвата.
  3. Уменьшение размера bufsize=1024**3, try bufsize=1024**2*100.
    В случае, если ОС фактически выделяет размер буфера 1 ГБ на процесс, возможно, вам не хватит памяти.

Примечание: Python производительность многопоточности не так хороша, проверьте загрузку процессора.

Вот пример кода (чтение из файлов):

import numpy as np
import subprocess as sp
import threading
import queue

class CCTVReader(threading.Thread):
    def __init__(self, q, in_stream, chunk_size):
        super().__init__()
        self.q = q
        self.chunk_size = chunk_size
        self.command = ["ffmpeg",
                        "-c:v", "h264",     # Tell FFmpeg that input stream codec is h264
                        "-i", in_stream,    # Read stream from file vid.264
                        "-c:v", "copy",     # Tell FFmpeg to copy the video stream as is (without decoding and encoding)
                        "-an", "-sn",       # No audio an no subtitles
                        "-f", "h264",       # Define pipe format to be h264
                        "-"]                # Output is a pipe

    def run(self):
        pipe = sp.Popen(self.command, stdout=sp.PIPE, bufsize=1024**2*100)

        # while True:
        for i in range(100):  # Read up to 100KBytes for testing
            data = pipe.stdout.read(self.chunk_size)  # Read data from pip in chunks of self.chunk_size bytes
            self.q.put(data)

            # Break loop if less than self.chunk_size bytes read (not going to work with CCTV, but works with input file)
            if len(data) < self.chunk_size:
                break

        try:
            pipe.wait(timeout=1)  # Wait for subprocess to finish (with timeout of 1 second).
        except sp.TimeoutExpired:
            pipe.kill()           # Kill subprocess in case of a timeout (there should be a timeout because input stream still lives).


    def save_q_to_file(self, vid_file_name):
        # Write data from queue to file (for testing)
        if self.q.empty():
            print("There is a problem (q is empty)!!!")
        else:            
            with open(vid_file_name, "wb") as queue_save_file:
                while not self.q.empty():
                    queue_save_file.write(self.q.get())

#in_stream = "rtsp://xxx.xxx.xxx.xxx:xxx/Streaming/Channels/101?transportmode=multicast",

#Use public RTSP Streaming for testing
#in_stream = "rtsp://wowzaec2demo.streamlock.net/vod/mp4:BigBuckBunny_115k.mov"

#Use public RTSP Streaming for testing
readers = {}
queues = {}

# Read from file (for tesing)
dict = {
        "name1":{ "ip":"vid1.264",  "fname":"vid_from_q1.264"},
        "name2":{ "ip":"vid2.264",  "fname":"vid_from_q2.264"},
        "name3":{ "ip":"vid3.264",  "fname":"vid_from_q3.264"},
        "name4":{ "ip":"vid4.264",  "fname":"vid_from_q4.264"},
        "name5":{ "ip":"vid5.264",  "fname":"vid_from_q5.264"},
        "name6":{ "ip":"vid6.264",  "fname":"vid_from_q6.264"},
        "name7":{ "ip":"vid7.264",  "fname":"vid_from_q7.264"},
        "name8":{ "ip":"vid8.264",  "fname":"vid_from_q8.264"},
        "name9":{ "ip":"vid9.264",  "fname":"vid_from_q9.264"},
        "name10":{"ip":"vid10.264", "fname":"vid_from_q10.264"},
        "name11":{"ip":"vid11.264", "fname":"vid_from_q11.264"},
        "name12":{"ip":"vid12.264", "fname":"vid_from_q12.264"},
        "name13":{"ip":"vid13.264", "fname":"vid_from_q13.264"},
        "name14":{"ip":"vid14.264", "fname":"vid_from_q14.264"},
        "name15":{"ip":"vid15.264", "fname":"vid_from_q15.264"}
        }

for key in dict:
    ip = dict[key]["ip"]
    name = key
    q = queue.Queue()
    queues[name] = q
    cctv_reader = CCTVReader(q, ip, 8192)
    readers[name] = cctv_reader
    cctv_reader.start()

# Wait for all threads to end
for key in readers:
    readers[key].join()

# Save data for testing
for key in readers:
    fine_name = dict[key]["fname"]
    readers[key].save_q_to_file(fine_name)
...