Бесшовная петля видео в gstreamer - PullRequest
0 голосов
/ 12 декабря 2018

Я пытаюсь зациклить воспроизведение видео, используя gstreamer и его привязки к python.Первой попыткой было перехватить EOS message и сгенерировать сообщение о поиске для конвейера:

import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst

import time

if not Gst.init_check()[0]:
    print("gstreamer initialization failed")

source0 = Gst.ElementFactory.make("filesrc", "source0")
assert source0 is not None
source0.set_property("location", "video0.mp4")

qtdemux0 = Gst.ElementFactory.make("qtdemux", "demux0")
assert qtdemux0 is not None

decoder0 = Gst.ElementFactory.make("nxvideodec", "video_decoder0")
assert decoder0 is not None

def demux0_pad_added(demux, pad):
    if pad.name == 'video_0':  # We expect exactly first one video stream
        pad.link(decoder0.get_static_pad("sink"))

qtdemux0.connect("pad-added", demux0_pad_added)

video_sink = Gst.ElementFactory.make("nxvideosink", "video_sink")
assert video_sink is not None

pipeline0 = Gst.Pipeline()
assert pipeline0 is not None
pipeline0.add(source0)
pipeline0.add(qtdemux0)
pipeline0.add(decoder0)
pipeline0.add(video_sink)

source0.link(qtdemux0)
"""qtdemux0 -> decoder0 dynamic linking"""
decoder0.link(video_sink)

######################################################

def main():
    message_bus = pipeline0.get_bus()
    pipeline0.set_state(Gst.State.PLAYING)

    while True:
        if message_bus.have_pending():  # Working without glib mainloop
            message = message_bus.pop()
            if message.type == Gst.MessageType.EOS:  # End-Of-Stream: loop the video, seek to beginning
                pipeline0.seek(1.0,
                              Gst.Format.TIME,
                              Gst.SeekFlags.FLUSH,
                              Gst.SeekType.SET, 0,
                              Gst.SeekType.NONE, 0)
            elif message.type == Gst.MessageType.ERROR:
                print("ERROR", message)
                break
        time.sleep(0.01) # Tried 0.001 - same result

if __name__ == "__main__":
    main()

И это на самом деле работает довольно хорошо, за исключением одной вещи - поиск в начале не совсем гладкий.Я вижу крошечный сбой.Поскольку видео представляет собой бесконечную анимацию, этот крошечный глюк действительно становится заметным.Моя вторая попытка состояла в том, чтобы использовать очередь для декодированных кадров и перехватить событие EOS :

import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst

import time

if not Gst.init_check()[0]:
    print("gstreamer initialization failed")

source0 = Gst.ElementFactory.make("filesrc", "source0")
assert source0 is not None
source0.set_property("location", "video0.mp4")

qtdemux0 = Gst.ElementFactory.make("qtdemux", "demux0")
assert qtdemux0 is not None

decoder0 = Gst.ElementFactory.make("nxvideodec", "video_decoder0")
assert decoder0 is not None

def demux0_pad_added(demux, pad):
    if pad.name == 'video_0':  # We expect exactly first one video stream
        pad.link(decoder0.get_static_pad("sink"))

qtdemux0.connect("pad-added", demux0_pad_added)

queue = Gst.ElementFactory.make("queue", "queue")
assert queue is not None
video_sink = Gst.ElementFactory.make("nxvideosink", "video_sink")
assert video_sink is not None

pipeline0 = Gst.Pipeline()
assert pipeline0 is not None
pipeline0.add(source0)
pipeline0.add(qtdemux0)
pipeline0.add(decoder0)
pipeline0.add(queue)
pipeline0.add(video_sink)

source0.link(qtdemux0)
"""qtdemux0 -> decoder0 dynamic linking"""
decoder0.link(queue)
queue.link(video_sink)

######################################################

def cb_event(pad, info, *user_data):
    event = info.get_event()
    if event is not None and event.type == Gst.EventType.EOS:
        decoder0.seek(1.0,
                      Gst.Format.TIME,
                      Gst.SeekFlags.FLUSH,
                      Gst.SeekType.SET, 0,
                      Gst.SeekType.NONE, 0)
        return Gst.PadProbeReturn.DROP
    return Gst.PadProbeReturn.PASS

def main():
    dec0_src_pad = decoder0.get_static_pad("src")
    dec0_src_pad.add_probe(Gst.PadProbeType.BLOCK | Gst.PadProbeType.EVENT_DOWNSTREAM, cb_event)

    message_bus = pipeline0.get_bus()
    pipeline0.set_state(Gst.State.PLAYING)

    while True:
        # do nothing
        time.sleep(1)

if __name__ == "__main__":
    main()

После первого события EOS воспроизведение просто остановилось.Я пробовал несколько разных вещей, таких как: pass событие EOS, drop EOS и добавление смещения на панель исходного кода декодера, отправка события поиска в сам конвейер и другие.Но я не могу заставить это работать.

В попытке понять, я также попытался включить режим отладки и написать свой собственный журнал активности конвейера, используя пробники.Режим отладки был не очень полезен, журнал очень громоздкий и пропускает некоторые детали.Мой собственный журнал включает в себя восходящие / нисходящие события и информацию о синхронизации буферов.Тем не менее, я до сих пор не могу понять, что не так и как заставить его работать.

Очевидно, я не просто что-то упустил, но не понимаю какой-то фундаментальной вещи о том, как работает конвейер gstreamer.

Итак, вопрос: Что мне делать со второй версией кода, чтобы она заработала?
Дополнительный вопрос: Существуют ли какие-либо инструменты или методы, чтобы получить четкое представление о том, что происходит внутри конвейера и его элементов?

Я буду очень признателен за подробные ответы.Для меня важнее понять , что я делаю неправильно, чем просто заставить программу работать.

ps Программа работает под GNU / Linux на плате NanoPi S2.Видео хранится в контейнере MP4 (без звука) и сжимается с помощью h264.Пожалуйста, не стесняйтесь размещать примеры кода на любом языке, не обязательно Python.

Ответы [ 3 ]

0 голосов
/ 06 января 2019

Ну, ладно.Я не получил ответ , поэтому я продолжил исследование и наконец нашел решение.Ниже я покажу два разных подхода.Первый - прямой ответ на вопрос с примером рабочего кода.Второй - другой подход, который кажется более естественным для gstreamer и определенно более простым.Оба дают желаемый результат - непрерывный цикл видео.

Исправленный код (ответ, но не лучший подход)

Изменения:

  1. Добавлен запрос продолжительности видео.В каждом цикле мы должны увеличивать временное смещение для значения длительность видео .Это позволяет эмулировать бесконечный непрерывный поток.
  2. Отправка события-поиска перенесена в отдельный поток.Согласно этой записи , мы не можем генерировать событие поиска из потоковой нити.Также обратите внимание на этот файл (ссылка из упомянутого поста).
  3. Функция обратного вызова теперь отбрасывает FLUSH событий (непрерывный поток не должен иметь FLUSH событий).
  4. Видеодекодер изменен с nxvideodec на avdec_h264.Это не относится к первоначальному вопросу и выполняется по особой причине .

Код:

import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst

import time
import threading

if not Gst.init_check()[0]:
    print("gstreamer initialization failed")

source0 = Gst.ElementFactory.make("filesrc", "source0")
assert source0 is not None
source0.set_property("location", "video0.mp4")

qtdemux0 = Gst.ElementFactory.make("qtdemux", "demux0")
assert qtdemux0 is not None

decoder0 = Gst.ElementFactory.make("avdec_h264", "video_decoder0")
assert decoder0 is not None

def demux0_pad_added(demux, pad):
    if pad.name == 'video_0':  # We expect exactly first one video stream
        pad.link(decoder0.get_static_pad("sink"))

qtdemux0.connect("pad-added", demux0_pad_added)

queue = Gst.ElementFactory.make("queue", "queue")
assert queue is not None
video_sink = Gst.ElementFactory.make("nxvideosink", "video_sink")
assert video_sink is not None

pipeline0 = Gst.Pipeline()
assert pipeline0 is not None
pipeline0.add(source0)
pipeline0.add(qtdemux0)
pipeline0.add(decoder0)
pipeline0.add(queue)
pipeline0.add(video_sink)

source0.link(qtdemux0)
"""qtdemux0 -> decoder0 dynamic linking"""
decoder0.link(queue)
queue.link(video_sink)

# UPD: Get video duration
pipeline0.set_state(Gst.State.PAUSED)
assert pipeline0.get_state(Gst.CLOCK_TIME_NONE).state == Gst.State.PAUSED
duration_ok, duration = pipeline0.query_duration(Gst.Format.TIME)
assert duration_ok

######################################################

seek_requested = threading.Event()
# UPD: Seek thread. Wait for seek request from callback and generate seek event
def seek_thread_func(queue_sink_pad):
    cumulative_offset = 0
    while True:
        seek_requested.wait()
        seek_requested.clear()
        decoder0.seek(1.0,
                      Gst.Format.TIME,
                      Gst.SeekFlags.FLUSH,
                      Gst.SeekType.SET, 0,
                      Gst.SeekType.NONE, 0)
        # Add offset. It is important step to ensure that downstream elements will 'see' infinite contiguous stream
        cumulative_offset += duration
        queue_sink_pad.set_offset(cumulative_offset)

def cb_event(pad, info):
    event = info.get_event()
    if event is not None:
        if event.type == Gst.EventType.EOS:  # UPD: Set 'seek_requested' flag
            seek_requested.set()
            return Gst.PadProbeReturn.DROP
        elif event.type == Gst.EventType.FLUSH_START or event.type == Gst.EventType.FLUSH_STOP:  # UPD: Drop FLUSH
            return Gst.PadProbeReturn.DROP
    return Gst.PadProbeReturn.OK

def main():
    queue_sink_pad = queue.get_static_pad("sink")

    # UPD: Create separate 'seek thread'
    threading.Thread(target=seek_thread_func, daemon=True, args=(queue_sink_pad,)).start()

    dec0_src_pad = decoder0.get_static_pad("src")
    dec0_src_pad.add_probe(Gst.PadProbeType.EVENT_DOWNSTREAM | Gst.PadProbeType.EVENT_FLUSH,
                           cb_event)

    pipeline0.set_state(Gst.State.PLAYING)

    while True:
        # do nothing
        time.sleep(1)

if __name__ == "__main__":
    main()

Этот код работает.Поиск эффективно выполняется, когда буферы из очереди все еще воспроизводятся.Тем не менее, я считаю, что он может содержать некоторые недостатки или даже ошибки.Например, SEGMENT события передаются вниз по потоку с флагом RESET;это не кажется правильным.Гораздо более понятным (и, возможно, более правильным / надежным) способом реализации этого подхода является создание плагина gstreamer.Плагин будет управлять событиями и настраивать метки времени событий и буфера.

Но есть более простое и нативное решение:

Использование поиска по сегментам и SEGMENT_DONE сообщение

Согласно документации :

Поиск сегмента (с использованием GST_SEEK_FLAG_SEGMENT) не выдаст EOS в конце сегмента воспроизведения, но отправит сообщение SEGMENT_DONE на шину.Это сообщение публикуется элементом, управляющим воспроизведением в конвейере, обычно демультиплексором.После получения сообщения приложение может повторно подключить конвейер или выполнить другие события поиска в конвейере. Поскольку сообщение отправляется в конвейер как можно раньше, у приложения есть время для нового запроса, чтобы сделать переход плавным. Обычно допустимая задержка определяется также размерами буфера приемников.как размер любых очередей в конвейере.

Сообщение SEGMENT_DONE действительно отправляется раньше, чем очередь становится пустой.Это дает более чем достаточно времени для следующего поиска.Так что все, что нам нужно сделать, это запустить поиск сегментов в самом начале воспроизведения.Затем дождитесь сообщения SEGMENT_DONE и отправьте следующее незаполненное событие поиска.Вот рабочий пример:

import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst

import time

if not Gst.init_check()[0]:
    print("gstreamer initialization failed")

source0 = Gst.ElementFactory.make("filesrc", "source0")
assert source0 is not None
source0.set_property("location", "video0.mp4")

qtdemux0 = Gst.ElementFactory.make("qtdemux", "demux0")
assert qtdemux0 is not None

decoder0 = Gst.ElementFactory.make("nxvideodec", "video_decoder0")
assert decoder0 is not None

def demux0_pad_added(demux, pad):
    if pad.name == 'video_0':  # We expect exactly first one video stream
        pad.link(decoder0.get_static_pad("sink"))

qtdemux0.connect("pad-added", demux0_pad_added)

queue = Gst.ElementFactory.make("queue", "queue")
assert queue is not None
video_sink = Gst.ElementFactory.make("nxvideosink", "video_sink")
assert video_sink is not None

pipeline0 = Gst.Pipeline()
assert pipeline0 is not None
pipeline0.add(source0)
pipeline0.add(qtdemux0)
pipeline0.add(decoder0)
pipeline0.add(queue)
pipeline0.add(video_sink)

source0.link(qtdemux0)
"""qtdemux0 -> decoder0 dynamic linking"""
decoder0.link(queue)
queue.link(video_sink)

######################################################

def main():
    message_bus = pipeline0.get_bus()
    pipeline0.set_state(Gst.State.PLAYING)
    pipeline0.get_state(Gst.CLOCK_TIME_NONE)
    pipeline0.seek(1.0,
                   Gst.Format.TIME,
                   Gst.SeekFlags.SEGMENT,
                   Gst.SeekType.SET, 0,
                   Gst.SeekType.NONE, 0)

    while True:
        if message_bus.have_pending():  # Working without glib mainloop
            message = message_bus.pop()
            if message.type == Gst.MessageType.SEGMENT_DONE:
                pipeline0.seek(1.0,
                              Gst.Format.TIME,
                              Gst.SeekFlags.SEGMENT,
                              Gst.SeekType.SET, 0,
                              Gst.SeekType.NONE, 0)
            elif message.type == Gst.MessageType.ERROR:
                print("bus ERROR", message)
                break
        time.sleep(0.01)

if __name__ == "__main__":
    main()

При конфигурации очереди по умолчанию сообщение SEGMENT_DONE отправляется примерно на 1 секунду раньше, чем воспроизводится последний видеокадр.Поиск без очистки гарантирует, что ни один из кадров не будет потерян.Вместе это дает отличный результат - по-настоящему плавный цикл видео.

Примечание: я переключаю конвейер в состояние PLAYING, а затем выполняю начальный поиск без сброса.В качестве альтернативы мы можем переключить конвейер в состояние PAUSED, выполнить сбрасывание поиск сегмента и затем переключить конвейер в состояние PLAYING.

Примечание 2: Различные источники предлагают немного другое решение.См. Ссылку ниже.


Связанные темы и источники:

  1. http://gstreamer-devel.966125.n4.nabble.com/Flushing-the-data-in-partial-pipeline-tp4681893p4681899.html
  2. http://gstreamer -devel.966125.n4.nabble.com / Loop-a-file-using-playbin-без-artefacts-td4671952.html
0 голосов
/ 21 марта 2019

Я использую метод SEGMENT_DONE:

import sys
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
Gst.init(None)

pipeline = Gst.parse_launch('uridecodebin uri=file://%s name=d d. ! autovideosink d. ! autoaudiosink' %sys.argv[1])
bus = pipeline.get_bus()
bus.add_signal_watch()

def on_segment_done(bus, msg):
    pipeline.seek(1.0,
        Gst.Format.TIME,
        Gst.SeekFlags.SEGMENT,
        Gst.SeekType.SET, 0,
        Gst.SeekType.NONE, 0)
    return True
bus.connect('message::segment-done', on_segment_done)

pipeline.set_state(Gst.State.PLAYING)
pipeline.get_state(Gst.CLOCK_TIME_NONE)
pipeline.seek(1.0,
    Gst.Format.TIME,
    Gst.SeekFlags.SEGMENT,
    Gst.SeekType.SET, 0,
    Gst.SeekType.NONE, 0)

GLib.MainLoop().run()
0 голосов
/ 12 декабря 2018

Я рекомендую взглянуть на приложение gst-play-1.0, а также на элемент playbin в GStreamer.

См. Здесь: https://github.com/GStreamer/gst-plugins-base/blob/master/tools/gst-play.c

Это поддерживает параметр --gaplessиграть много файлов без пробелов.Он использует сигнал about-to-finish элемента playbin.

Это конкретное приложение делает это с несколькими файлами вместо одного, но я думаю, вы можете попробовать дать один и тот же файл несколько раз для тестаесли он действительно беспроблемный или имеет ту же проблему с вашим подходом 1).

По сути, я думаю, что EOS слишком поздно, чтобы снова подготовить первый кадр вовремя из-за deinit / init / processingдекодер и промывка трубопровода.Кроме того, сброс будет как бы сбрасывать ваш поток, конвейер снова переходит в режим предварительной синхронизации и синхронизируется с новыми часами.Это действительно не непрерывный поток изнутри.

В качестве альтернативы, возможно, GStreamer Editing Services может сделать это тоже.Но это, вероятно, работает с несколькими дорожками, что означает, что он может попытаться создать несколько экземпляров декодера одновременно, чтобы выполнить параллельную обработку - что может быть проблемой на вашей плате.

В крайнем случае может быть демультиплексировать MP4 внеобработанный bistream, поместите этот непрерывный поток битов в сокет и декодируйте его.Тогда он будет отображаться как бесконечный поток битов, который воспроизводится.

Редактировать: Возможно, также стоит попробовать multifilesrc со своим свойством loop, чтобы посмотреть, работает ли он без зазоров или имееттакже выполнить сброс между файлами.

...