Кажется, что если я вызываю g_signal_connect
в функции обратного вызова, которая зарегистрирована в потоке Qt, то функция обратного вызова, зарегистрированная этим g_signal_connect
, не вызывается. Например, используя первый незакомментированный main
в этом коде, udp_source_timeout_callback
не вызывается, но вызывается, если я вместо этого использую закомментированный main
. Обратные вызовы, зарегистрированные в методе run
, вызываются в обоих случаях. Почему это?
РЕДАКТИРОВАТЬ: Это плохой подход? Не следует ли регистрировать функции обратного вызова внутри функций обратного вызова? Я не уверен в другом способе сделать это.
acquisitiontype.h
#ifndef ACQUISITIONTYPE_H
#define ACQUISITIONTYPE_H
#include <QObject>
class AcquisitionType : public QObject
{
Q_OBJECT
public:
explicit AcquisitionType(QObject *parent = nullptr);
public slots:
void run();
};
#endif // ACQUISITIONTYPE_H
main. cpp
#include <gst/gst.h>
#include <gst/app/gstappsink.h>
#include <cstdlib>
#include <cstdio>
#include <QApplication>
#include <QThread>
#include "acquisitiontype.h"
struct gstreamer_data {
GstElement* pipeline;
GstElement* udp_source;
GstElement* rtp_decoder;
GstElement* video_decoder;
GstElement* video_converter;
GstElement* app_sink;
gulong signal_handler_id;
GMainLoop* main_loop;
};
AcquisitionType::AcquisitionType(QObject *parent) : QObject(parent) {
}
GstPadProbeReturn udp_source_buffer_pad_probe_callback(GstPad* pad, GstPadProbeInfo *info, gstreamer_data* user_data);
void udp_source_timeout_callback(GstBus* bus, GstMessage* message, gstreamer_data* data);
GstFlowReturn new_sample_callback(GstAppSink* appsink, gstreamer_data* user_data) {
(void) user_data;
GstSample* sample;
GstCaps* caps;
GstBuffer* buffer;
GstMapInfo map;
sample = gst_app_sink_pull_sample(appsink);
if (sample) {
caps = gst_sample_get_caps(sample);
buffer = gst_sample_get_buffer(sample);
if (gst_buffer_map(buffer, &map, GST_MAP_READ)) {
printf("have sample\n");
gst_buffer_unmap(buffer, &map);
}
gst_sample_unref(sample);
}
return GST_FLOW_OK;
}
GstPadProbeReturn udp_source_buffer_pad_probe_callback(GstPad* pad, GstPadProbeInfo *info, gstreamer_data* user_data) {
(void) pad;
(void) info;
GstBus* bus;
printf("have data\n");
bus = gst_element_get_bus(user_data->pipeline);
user_data->signal_handler_id = g_signal_connect(G_OBJECT(bus), "message::element", (GCallback) udp_source_timeout_callback, user_data);
gst_object_unref(bus);
return GST_PAD_PROBE_REMOVE;
}
void udp_source_timeout_callback(GstBus* bus, GstMessage* message, gstreamer_data* data) {
const GstStructure* st = gst_message_get_structure(message);
GstPad* pad;
if (GST_MESSAGE_TYPE(message) == GST_MESSAGE_ELEMENT) {
if (gst_structure_has_name(st, "GstUDPSrcTimeout")) {
printf("no data\n");
g_signal_handler_disconnect(G_OBJECT(bus), data->signal_handler_id);
pad = gst_element_get_static_pad(data->udp_source, "src");
gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BUFFER, (GstPadProbeCallback) udp_source_buffer_pad_probe_callback, data, NULL);
gst_object_unref(pad);
}
}
}
void bus_error_callback(GstBus* bus, GstMessage* message, gstreamer_data* data) {
(void) bus;
GError* err;
gchar* debug_info;
gst_message_parse_error(message, &err, &debug_info);
g_printerr("Error received from element %s: %s\n", GST_OBJECT_NAME(message->src), err->message);
g_printerr("Debugging information: %s\n", debug_info ? debug_info : "none");
g_clear_error(&err);
g_free(debug_info);
g_main_loop_quit(data->main_loop);
}
void AcquisitionType::run() {
gstreamer_data data;
GstStateChangeReturn ret;
GstBus* bus;
GstPad* pad;
gst_init(NULL, NULL);
data.udp_source = gst_element_factory_make("udpsrc", "udp_source");
g_object_set(G_OBJECT(data.udp_source),
"port", 5000,
"caps", gst_caps_new_empty_simple("application/x-rtp"),
"timeout", 1000000000,
NULL);
data.rtp_decoder = gst_element_factory_make("rtph264depay", "rtp_decoder");
data.video_decoder = gst_element_factory_make("avdec_h264", "video_decoder");
data.video_converter = gst_element_factory_make("videoconvert", "video_converter");
data.app_sink = gst_element_factory_make("appsink", "app_sink");
g_object_set(G_OBJECT(data.app_sink),
"emit-signals", true,
NULL);
g_signal_connect(data.app_sink, "new-sample", (GCallback) new_sample_callback, &data);
data.pipeline = gst_pipeline_new("pipeline");
if (
!data.pipeline ||
!data.udp_source ||
!data.rtp_decoder ||
!data.video_decoder ||
!data.video_converter ||
!data.app_sink
)
{
g_printerr("Not all elements could be created.\n");
exit(-1);
}
gst_bin_add_many(
GST_BIN(data.pipeline),
data.udp_source,
data.rtp_decoder,
data.video_decoder,
data.video_converter,
data.app_sink,
NULL);
if (gst_element_link_many(
data.udp_source,
data.rtp_decoder,
data.video_decoder,
data.video_converter,
data.app_sink,
NULL) != TRUE)
{
g_printerr("Elements could not be linked.\n");
gst_object_unref(data.pipeline);
exit(-1);
}
pad = gst_element_get_static_pad(data.udp_source, "src");
gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BUFFER, (GstPadProbeCallback) udp_source_buffer_pad_probe_callback, &data, NULL);
gst_object_unref(pad);
bus = gst_element_get_bus(data.pipeline);
gst_bus_add_signal_watch(bus);
g_signal_connect(G_OBJECT(bus), "message::error", (GCallback) bus_error_callback, &data);
gst_object_unref(bus);
ret = gst_element_set_state(data.pipeline, GST_STATE_PLAYING);
if (ret == GST_STATE_CHANGE_FAILURE) {
g_printerr("Unable to set the pipeline to the playing state.\n");
gst_object_unref(data.pipeline);
exit(-1);
}
data.main_loop = g_main_loop_new(NULL, FALSE);
g_main_loop_run(data.main_loop);
}
int main(int argc, char *argv[]) {
QApplication app(argc, argv);
QThread* thread;
AcquisitionType* worker;
thread = new QThread;
worker = new AcquisitionType(nullptr);
worker->moveToThread(thread);
QObject::connect(thread, SIGNAL(started()), worker, SLOT(run()));
thread->start();
return app.exec();
}
/*
int main() {
AcquisitionType t;
t.run();
return 0;
}
*/