gstreamer - Issues with Switching from VOD to Live RTMP Stream Using input-selector - Stack Overflow

admin — 2025-04-15 — 0 comments

I am new to GStreamer. I have a pipeline that streams a VOD file to an RTMP output. I added a live RTMP stream as a second input and used input-selector to switch between them. However, after switching inputs, I encounter the following issues:

  1. Backward DTS warnings for both video and audio in flvmux.
  2. Prolonged buffering in the player before switching to the live stream.
  3. Audio-video synchronization issues after the switch.

Script:

import sys
import time

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib

Gst.init(None)

class StreamSwitcher:
    """Stream a local VOD file to an RTMP output, then dynamically switch to
    a live RTMP input using ``input-selector`` elements.

    Pipeline layout::

        filesrc ! decodebin ──┐
                              ├─ input-selector ! x264enc  ! queue ──┐
        rtmpsrc ! decodebin ──┤                                      ├─ flvmux ! rtmpsink
          (added at runtime)  └─ input-selector ! voaacenc ! queue ──┘

    NOTE(review): the "backwards dts" flvmux warnings described in the
    question are consistent with the live branch entering the muxer with
    timestamps near 0 while the muxer has already advanced past the VOD's
    running time; the identity ``ts-offset`` below is this script's attempt
    to compensate for that.
    """

    def __init__(self):
        self.pipeline = Gst.Pipeline.new("dynamic-stream-pipeline")
        self.video_selector = None  # input-selector for the video branches
        self.audio_selector = None  # input-selector for the audio branches
        self.live_bin = None        # Gst.Bin holding the dynamically added live source
        self.start_time = None      # pipeline clock time captured when PLAYING is reached

    def create_vod_pipeline(self):
        """Build the initial VOD -> selector -> encode -> flvmux -> RTMP chain.

        Raises:
            RuntimeError: if any required GStreamer element cannot be created
                (usually a missing plugin).
        """
        # ------------------------------
        # 1. VOD Source (Initial Setup)
        # ------------------------------
        vod_src = Gst.ElementFactory.make("filesrc", "vod_src")
        vod_src.set_property("location", "Atom.mp4")
        vod_decoder = Gst.ElementFactory.make("decodebin", "vod_decoder")

        # Input selectors (one per media type) allow switching sources later.
        self.video_selector = Gst.ElementFactory.make("input-selector", "video_selector")
        self.audio_selector = Gst.ElementFactory.make("input-selector", "audio_selector")
        self.video_selector.set_property("sync-streams", True)
        self.audio_selector.set_property("sync-streams", True)
        self.video_selector.set_property("sync-mode", 0)
        self.audio_selector.set_property("sync-mode", 0)

        # Encoders and output.
        video_encoder = Gst.ElementFactory.make("x264enc", "video_encoder")
        video_encoder.set_property("tune", "zerolatency")
        # Force a keyframe at least every 2 seconds (value is nanoseconds).
        video_encoder.set_property("min-force-key-unit-interval", 2000000000)
        audio_encoder = Gst.ElementFactory.make("voaacenc", "audio_encoder")  # Using voaacenc
        flv_mux = Gst.ElementFactory.make("flvmux", "flv_mux")
        flv_mux.set_property("streamable", True)  # Ensures real-time timestamp handling
        rtmp_sink = Gst.ElementFactory.make("rtmpsink", "rtmp_sink")
        # Bug fix: the YouTube ingest hostname was missing ".com".
        rtmp_sink.set_property("location", "rtmp://a.rtmp.youtube.com/live2/<stream-key>")

        video_queue_mux = Gst.ElementFactory.make("queue", "video_queue_mux")
        audio_queue_mux = Gst.ElementFactory.make("queue", "audio_queue_mux")

        # Limit the mux-side queues by time only. 2_000_000_000 ns == 2 s;
        # the original comments claimed "5 seconds", contradicting the value.
        video_queue_mux.set_property("max-size-buffers", 0)
        video_queue_mux.set_property("max-size-bytes", 0)
        video_queue_mux.set_property("max-size-time", 2000000000)  # 2 seconds buffering
        audio_queue_mux.set_property("max-size-buffers", 0)
        audio_queue_mux.set_property("max-size-bytes", 0)
        audio_queue_mux.set_property("max-size-time", 2000000000)  # 2 seconds buffering

        # Add elements to the pipeline, failing fast if a plugin is missing.
        elements = [
            vod_src, vod_decoder,
            self.video_selector, self.audio_selector,
            video_encoder, audio_encoder, video_queue_mux,
            audio_queue_mux, flv_mux, rtmp_sink
        ]
        for element in elements:
            if element is None:
                # Bug fix: the original interpolated `element` into the
                # message, which is always None on this path.
                raise RuntimeError(
                    "Failed to create a GStreamer element; "
                    "check that the required plugins are installed")
            self.pipeline.add(element)

        # Link the VOD source; decodebin exposes its pads dynamically.
        vod_src.link(vod_decoder)
        vod_decoder.connect("pad-added", self._on_vod_pad_added)

        # Selectors -> encoders -> queues -> muxer -> sink.
        self.video_selector.link(video_encoder)
        self.audio_selector.link(audio_encoder)

        video_encoder.link(video_queue_mux)
        audio_encoder.link(audio_queue_mux)

        video_queue_mux.link(flv_mux)
        audio_queue_mux.link(flv_mux)

        flv_mux.link(rtmp_sink)

    def _on_vod_pad_added(self, element, pad):
        """decodebin 'pad-added' handler: route VOD pads into the selectors.

        Pads whose caps are not yet negotiated, or that are neither video/*
        nor audio/*, are ignored.
        """
        caps = pad.get_current_caps()
        if not caps:
            return

        structure = caps.get_structure(0)
        name = structure.get_name()

        if name.startswith("video/"):
            # Reuse sink0 if it exists, otherwise request a new sink pad.
            video_sink_pad = self.video_selector.get_static_pad("sink0")
            if not video_sink_pad:
                video_sink_pad = self.video_selector.get_request_pad("sink_%u")
            pad.link(video_sink_pad)
        elif name.startswith("audio/"):
            audio_sink_pad = self.audio_selector.get_static_pad("sink0")
            if not audio_sink_pad:
                audio_sink_pad = self.audio_selector.get_request_pad("sink_%u")
            pad.link(audio_sink_pad)

    def _create_live_bin(self):
        """Create and return the bin wrapping the live RTMP source.

        The bin exposes two ghost pads named "video_sink" and "audio_sink"
        (note: despite their names they proxy the identities' *src* pads)
        which add_live_stream() links into the selectors.
        """
        # ------------------------------
        # 2. Live Stream Bin (Added Dynamically)
        # ------------------------------
        self.live_bin = Gst.Bin.new("live_bin")

        # Live source.
        live_src = Gst.ElementFactory.make("rtmpsrc", "live_src")
        live_src.set_property("location", "rtmp://localhost:1936/live/test")
        live_decoder = Gst.ElementFactory.make("decodebin", "live_decoder")

        # identity elements re-stamp the live buffers so their timestamps
        # line up with how long the pipeline has already been playing.
        live_video_identity = Gst.ElementFactory.make("identity", "live_video_identity")
        live_audio_identity = Gst.ElementFactory.make("identity", "live_audio_identity")

        # Offset = current pipeline clock - clock value captured at PLAYING.
        pipeline_clock = self.pipeline.get_clock().get_time()
        # Robustness fix: start_time stays None until on_state_changed has
        # seen the pipeline reach PLAYING; fall back to 0 instead of letting
        # the subtraction raise a TypeError.
        start = self.start_time if self.start_time is not None else 0
        offset_ns = pipeline_clock - start

        print(f"Setting ts-offset for live stream: {offset_ns / 1e9} seconds")
        live_video_identity.set_property("ts-offset", offset_ns)
        live_audio_identity.set_property("ts-offset", offset_ns)

        live_video_identity.set_property("sync", True)
        live_audio_identity.set_property("sync", True)
        # 0x10 — presumably GST_BUFFER_FLAG_LIVE; TODO(review) confirm the
        # intended flag, dropping LIVE-flagged buffers here looks suspicious.
        live_video_identity.set_property("drop-buffer-flags", 0x00000010)
        live_audio_identity.set_property("drop-buffer-flags", 0x00000010)
        live_video_identity.set_property("single-segment", True)
        live_audio_identity.set_property("single-segment", True)

        # Add elements to the bin.
        self.live_bin.add(live_src)
        self.live_bin.add(live_decoder)
        self.live_bin.add(live_video_identity)
        self.live_bin.add(live_audio_identity)

        # Link source to decoder; decoder pads appear dynamically.
        live_src.link(live_decoder)
        live_decoder.connect("pad-added", self._on_live_pad_added, live_video_identity, live_audio_identity)

        # Ghost pads for the selector connection.
        video_sink_pad = live_video_identity.get_static_pad("src")
        video_ghost_pad = Gst.GhostPad.new("video_sink", video_sink_pad)
        self.live_bin.add_pad(video_ghost_pad)

        audio_sink_pad = live_audio_identity.get_static_pad("src")
        audio_ghost_pad = Gst.GhostPad.new("audio_sink", audio_sink_pad)
        self.live_bin.add_pad(audio_ghost_pad)

        return self.live_bin

    def _on_live_pad_added(self, element, pad, video_identity, audio_identity):
        """decodebin 'pad-added' handler for the live branch.

        Links decoded video/audio pads to their identity elements; ignores
        pads without negotiated caps or of any other media type.
        """
        caps = pad.get_current_caps()
        if not caps:
            print("No caps on pad, ignoring.")
            return

        structure = caps.get_structure(0)
        name = structure.get_name()

        if name.startswith("video/"):
            print("Live Video Pad Added. Linking to identity...")
            sink_pad = video_identity.get_static_pad("sink")
            if not pad.is_linked():
                pad.link(sink_pad)
            else:
                print("Warning: Video pad already linked!")

        elif name.startswith("audio/"):
            print("Live Audio Pad Added. Linking to identity...")
            sink_pad = audio_identity.get_static_pad("sink")
            if not pad.is_linked():
                pad.link(sink_pad)
            else:
                print("Warning: Audio pad already linked!")

        else:
            print(f"Ignoring pad with caps: {name}")

    def add_live_stream(self):
        """GLib timeout callback: attach the live bin and switch to it.

        Returns:
            False, so GLib removes the timeout source after one invocation.
        """
        print("Adding live stream dynamically...")
        # Create and add live_bin to the pipeline.
        live_bin = self._create_live_bin()
        self.pipeline.add(live_bin)

        # Request fresh selector sink pads for the live branch.
        video_sink1 = self.video_selector.get_request_pad("sink_%u")
        audio_sink1 = self.audio_selector.get_request_pad("sink_%u")
        live_bin.get_static_pad("video_sink").link(video_sink1)
        live_bin.get_static_pad("audio_sink").link(audio_sink1)

        # Bring the new bin to the pipeline's state before switching.
        live_bin.sync_state_with_parent()

        # Switch the active pads to the live branch.
        self.video_selector.set_property("active-pad", video_sink1)
        self.audio_selector.set_property("active-pad", audio_sink1)
        return False  # one-shot timeout

    def on_state_changed(self, bus, msg):
        """Bus 'message::state-changed' handler.

        Captures the pipeline clock time once the pipeline itself reaches
        PLAYING; _create_live_bin() uses it to compute the live ts-offset.
        """
        # Bug fix: every element in the pipeline posts STATE_CHANGED, so the
        # original captured start_time from whichever element hit PLAYING
        # first. Only react to the pipeline's own transition.
        if msg.src != self.pipeline:
            return False

        old_state, new_state, pending_state = msg.parse_state_changed()

        if new_state == Gst.State.PLAYING and self.start_time is None:
            self.start_time = self.pipeline.get_clock().get_time()
            print(f"Pipeline start time: {self.start_time / 1e9} seconds")
            # NOTE: return values of bus signal handlers are ignored; kept
            # for parity with the original code.
            return True

        return False

    def run(self):
        """Build the pipeline, start playback, and enter the GLib main loop."""
        self.create_vod_pipeline()
        self.pipeline.use_clock(Gst.SystemClock.obtain())  # Force system clock
        self.pipeline.set_start_time(0)  # disable automatic start-time bookkeeping
        self.pipeline.set_base_time(0)   # running time == system clock time
        self.pipeline.set_state(Gst.State.PLAYING)
        bus = self.pipeline.get_bus()
        bus.add_signal_watch()
        bus.connect("message::state-changed", self.on_state_changed)
        # Switch after 60 seconds. (The original comment said "2 minutes",
        # contradicting the value.)
        GLib.timeout_add_seconds(60, self.add_live_stream)
        GLib.MainLoop().run()

# ------------------------------
# Error Handling
# ------------------------------
def on_error(bus, msg):
    """Bus 'message::error' handler: report the error and terminate.

    Args:
        bus: the Gst.Bus that emitted the signal (unused).
        msg: the ERROR Gst.Message to report.
    """
    err, debug = msg.parse_error()
    print(f"Error: {err.message}")
    # Fix: the original discarded the debug string, which identifies the
    # element (and source location) that raised the error.
    if debug:
        print(f"Debug info: {debug}")
    # sys.exit instead of the interactive-only `exit` builtin.
    sys.exit(1)

if __name__ == "__main__":
    # Wire up error reporting before starting playback, then block in run().
    app = StreamSwitcher()
    message_bus = app.pipeline.get_bus()
    message_bus.add_signal_watch()
    message_bus.connect("message::error", on_error)
    app.run()

FLVMUX logs:

0:01:00.857546474 4002505 0x561487ffd9e0 WARN                  flvmux gstflvmux.c:1247:gst_flv_mux_buffer_to_tag_internal:<flv_mux:sink_1> Got backwards dts! (0:00:00.883000000 < 0:00:49.203000000)

0:01:00.861864099 4002505 0x561487ffd9e0 WARN                  flvmux gstflvmux.c:1247:gst_flv_mux_buffer_to_tag_internal:<flv_mux:sink_0> Got backwards dts! (0:00:00.667000000 < 0:00:49.203000000)
0:01:00.902488229 4002505 0x561487ffd9e0 WARN                  flvmux gstflvmux.c:1247:gst_flv_mux_buffer_to_tag_internal:<flv_mux:sink_1> Got backwards dts! (0:00:00.906000000 < 0:00:49.203000000)

0:01:00.921592496 4002505 0x561487ffd9e0 WARN                  flvmux gstflvmux.c:1247:gst_flv_mux_buffer_to_tag_internal:<flv_mux:sink_0> Got backwards dts! (0:00:00.707000000 < 0:00:49.203000000)

I start the pipeline with a VOD input and add a live input after 60 seconds before switching to it. After the switch, flvmux gives backward DTS warnings, and the player buffers until the flvmux logs stop.

I have already tried using ts-offset, but it didn’t resolve the issue. Any insights on how to fix these problems would be greatly appreciated.

Thank you!

转载请注明原文地址:http://www.anycun.com/QandA/1744714044a86598.html