Вызов g_signal_connect в обратном вызове в потоке Qt - PullRequest
1 голос
/ 02 апреля 2020

Кажется, что если я вызываю g_signal_connect в функции обратного вызова, которая зарегистрирована в потоке Qt, то функция обратного вызова, зарегистрированная этим g_signal_connect, не вызывается. Например, используя первый незакомментированный main в этом коде, udp_source_timeout_callback не вызывается, но вызывается, если я вместо этого использую закомментированный main. Обратные вызовы, зарегистрированные в методе run, вызываются в обоих случаях. Почему это?

РЕДАКТИРОВАТЬ: Это плохой подход? Не следует ли регистрировать функции обратного вызова внутри функций обратного вызова? Я не уверен в другом способе сделать это.

acquisitiontype.h

#ifndef ACQUISITIONTYPE_H
#define ACQUISITIONTYPE_H

#include <QObject>


class AcquisitionType : public QObject
{
    Q_OBJECT

public:
    explicit AcquisitionType(QObject *parent = nullptr);


public slots:
    void run();
};

#endif // ACQUISITIONTYPE_H

main. cpp

#include <gst/gst.h>
#include <gst/app/gstappsink.h>

#include <cstdlib>
#include <cstdio>

#include <QApplication>
#include <QThread>

#include "acquisitiontype.h"


struct gstreamer_data {
    GstElement* pipeline;
    GstElement* udp_source;
    GstElement* rtp_decoder;
    GstElement* video_decoder;
    GstElement* video_converter;
    GstElement* app_sink;
    gulong signal_handler_id;
    GMainLoop* main_loop;
};


AcquisitionType::AcquisitionType(QObject *parent) : QObject(parent) {
}


GstPadProbeReturn udp_source_buffer_pad_probe_callback(GstPad* pad, GstPadProbeInfo *info, gstreamer_data* user_data);
void udp_source_timeout_callback(GstBus* bus, GstMessage* message, gstreamer_data* data);

GstFlowReturn new_sample_callback(GstAppSink* appsink, gstreamer_data* user_data) {
    (void) user_data;
    GstSample* sample;
    GstCaps* caps;
    GstBuffer* buffer;
    GstMapInfo map;


    sample = gst_app_sink_pull_sample(appsink);
    if (sample) {
        caps = gst_sample_get_caps(sample);

        buffer = gst_sample_get_buffer(sample);
        if (gst_buffer_map(buffer, &map, GST_MAP_READ)) {
            printf("have sample\n");

            gst_buffer_unmap(buffer, &map);
        }

        gst_sample_unref(sample);
    }

    return GST_FLOW_OK;
}


GstPadProbeReturn udp_source_buffer_pad_probe_callback(GstPad* pad, GstPadProbeInfo *info, gstreamer_data* user_data) {
    (void) pad;
    (void) info;
    GstBus* bus;


    printf("have data\n");

    bus = gst_element_get_bus(user_data->pipeline);
    user_data->signal_handler_id = g_signal_connect(G_OBJECT(bus), "message::element", (GCallback) udp_source_timeout_callback, user_data);
    gst_object_unref(bus);

    return GST_PAD_PROBE_REMOVE;
}


void udp_source_timeout_callback(GstBus* bus, GstMessage* message, gstreamer_data* data) {
    const GstStructure* st = gst_message_get_structure(message);
    GstPad* pad;


    if (GST_MESSAGE_TYPE(message) == GST_MESSAGE_ELEMENT) {
        if (gst_structure_has_name(st, "GstUDPSrcTimeout")) {

            printf("no data\n");

            g_signal_handler_disconnect(G_OBJECT(bus), data->signal_handler_id);

            pad = gst_element_get_static_pad(data->udp_source, "src");
            gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BUFFER, (GstPadProbeCallback) udp_source_buffer_pad_probe_callback, data, NULL);
            gst_object_unref(pad);
        }
    }
}


void bus_error_callback(GstBus* bus, GstMessage* message, gstreamer_data* data) {
    (void) bus;
    GError* err;
    gchar* debug_info;


    gst_message_parse_error(message, &err, &debug_info);
    g_printerr("Error received from element %s: %s\n", GST_OBJECT_NAME(message->src), err->message);
    g_printerr("Debugging information: %s\n", debug_info ? debug_info : "none");
    g_clear_error(&err);
    g_free(debug_info);

    g_main_loop_quit(data->main_loop);
}


void AcquisitionType::run() {
    gstreamer_data data;
    GstStateChangeReturn ret;
    GstBus* bus;
    GstPad* pad;


    gst_init(NULL, NULL);


    data.udp_source = gst_element_factory_make("udpsrc", "udp_source");
    g_object_set(G_OBJECT(data.udp_source),
        "port", 5000,
        "caps", gst_caps_new_empty_simple("application/x-rtp"),
        "timeout", 1000000000,
        NULL);

    data.rtp_decoder = gst_element_factory_make("rtph264depay", "rtp_decoder");

    data.video_decoder = gst_element_factory_make("avdec_h264", "video_decoder");

    data.video_converter = gst_element_factory_make("videoconvert", "video_converter");

    data.app_sink = gst_element_factory_make("appsink", "app_sink");
    g_object_set(G_OBJECT(data.app_sink),
        "emit-signals", true,
        NULL);

    g_signal_connect(data.app_sink, "new-sample", (GCallback) new_sample_callback, &data);


    data.pipeline = gst_pipeline_new("pipeline");


    if (
        !data.pipeline ||
        !data.udp_source ||
        !data.rtp_decoder ||
        !data.video_decoder ||
        !data.video_converter ||
        !data.app_sink
        )
        {
            g_printerr("Not all elements could be created.\n");
            exit(-1);
        }


    gst_bin_add_many(
        GST_BIN(data.pipeline),
        data.udp_source,
        data.rtp_decoder,
        data.video_decoder,
        data.video_converter,
        data.app_sink,
        NULL);


    if (gst_element_link_many(
        data.udp_source,
        data.rtp_decoder,
        data.video_decoder,
        data.video_converter,
        data.app_sink,
        NULL) != TRUE)
        {
            g_printerr("Elements could not be linked.\n");
            gst_object_unref(data.pipeline);
            exit(-1);
        }


    pad = gst_element_get_static_pad(data.udp_source, "src");
    gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BUFFER, (GstPadProbeCallback) udp_source_buffer_pad_probe_callback, &data, NULL);
    gst_object_unref(pad);

    bus = gst_element_get_bus(data.pipeline);
    gst_bus_add_signal_watch(bus);
    g_signal_connect(G_OBJECT(bus), "message::error", (GCallback) bus_error_callback, &data);
    gst_object_unref(bus);


    ret = gst_element_set_state(data.pipeline, GST_STATE_PLAYING);
    if (ret == GST_STATE_CHANGE_FAILURE) {
        g_printerr("Unable to set the pipeline to the playing state.\n");
        gst_object_unref(data.pipeline);
        exit(-1);
    }

    data.main_loop = g_main_loop_new(NULL, FALSE);
    g_main_loop_run(data.main_loop);
}

int main(int argc, char *argv[]) {
    QApplication app(argc, argv);

    QThread* thread;
    AcquisitionType* worker;


    thread = new QThread;
    worker = new AcquisitionType(nullptr);

    worker->moveToThread(thread);

    QObject::connect(thread, SIGNAL(started()), worker, SLOT(run()));

    thread->start();

    return app.exec();
}

/*
int main() {
    AcquisitionType t;


    t.run();

    return 0;
}
*/
...