From 953db14b1fb97ac1865b95222a2688717a88867d Mon Sep 17 00:00:00 2001 From: Melissa Montero Date: Tue, 27 Oct 2020 12:05:52 -0600 Subject: [PATCH 1/2] Forward query allocation and aggregate the results This will forward the query allocation when received between the connected listeners and aggregates the results of each listeners. This helps to indicate the upstream elements which metas are supported. For example it avoids v4l2src buffer copy when the downstream elements doesn't report that video meta is supported and the pitch is different than width. --- gst/interpipe/gstinterpipeilistener.c | 15 ++ gst/interpipe/gstinterpipeilistener.h | 14 ++ gst/interpipe/gstinterpipesink.c | 227 +++++++++++++++++++++++++- gst/interpipe/gstinterpipesrc.c | 31 ++++ 4 files changed, 286 insertions(+), 1 deletion(-) diff --git a/gst/interpipe/gstinterpipeilistener.c b/gst/interpipe/gstinterpipeilistener.c index 6908c8b..6266cb9 100644 --- a/gst/interpipe/gstinterpipeilistener.c +++ b/gst/interpipe/gstinterpipeilistener.c @@ -142,6 +142,21 @@ gst_inter_pipe_ilistener_push_event (GstInterPipeIListener * self, return iface->push_event (self, event, basetime); } +gboolean +gst_inter_pipe_ilistener_query (GstInterPipeIListener * self, GstQuery * query) +{ + GstInterPipeIListenerInterface *iface; + + g_return_val_if_fail (GST_INTER_PIPE_IS_ILISTENER (self), FALSE); + g_return_val_if_fail (query, FALSE); + + iface = GST_INTER_PIPE_ILISTENER_GET_IFACE (self); + g_return_val_if_fail (iface->query != NULL, FALSE); + + return iface->query (self, query); +} + + gboolean gst_inter_pipe_ilistener_send_eos (GstInterPipeIListener * self) { diff --git a/gst/interpipe/gstinterpipeilistener.h b/gst/interpipe/gstinterpipeilistener.h index 577cfb8..4c210a3 100644 --- a/gst/interpipe/gstinterpipeilistener.h +++ b/gst/interpipe/gstinterpipeilistener.h @@ -94,6 +94,7 @@ struct _GstInterPipeIListenerInterface gboolean (* node_removed) (GstInterPipeIListener *iface, const gchar *node_removed); gboolean (* push_buffer) (GstInterPipeIListener *iface, GstBuffer *buffer, guint64 basetime); gboolean (* push_event) (GstInterPipeIListener *iface, GstEvent *event, guint64 basetime); + gboolean (* query) (GstInterPipeIListener *iface, GstQuery *query); gboolean (* send_eos) (GstInterPipeIListener *iface); }; @@ -193,6 +194,19 @@ gboolean gst_inter_pipe_ilistener_push_buffer (GstInterPipeIListener *iface, gboolean gst_inter_pipe_ilistener_push_event (GstInterPipeIListener *iface, GstEvent *event, guint64 basetime); + +/** + * gst_inter_pipe_ilistener_query: + * @iface: (transfer none)(not nullable): The object that should query the #GstQuery downstream. + * @event: (transfer full)(not nullable): The #GstQuery to be pushed downstream. + * + * Ask @query to the downstream element. + * + * Return: True if the query was successful, False otherwise. + */ +gboolean gst_inter_pipe_ilistener_query (GstInterPipeIListener *iface, + GstQuery *query); + /** * gst_inter_pipe_ilistener_send_eos: * @iface: (transfer none)(not nullable): The object that should push the #GstEvent downstream. diff --git a/gst/interpipe/gstinterpipesink.c b/gst/interpipe/gstinterpipesink.c index b757a4b..9524c6f 100644 --- a/gst/interpipe/gstinterpipesink.c +++ b/gst/interpipe/gstinterpipesink.c @@ -82,6 +82,8 @@ static gboolean gst_inter_pipe_sink_set_caps (GstBaseSink * base, GstCaps * filter); static gboolean gst_inter_pipe_sink_event (GstBaseSink * base, GstEvent * event); +static gboolean gst_inter_pipe_sink_propose_allocation (GstBaseSink * base, + GstQuery * query); static gboolean gst_inter_pipe_sink_are_caps_compatible (GstInterPipeSink * sink, GstCaps * listener_caps, GstCaps * sinkcaps); static GstCaps *gst_inter_pipe_sink_caps_intersect (GstCaps * caps1, @@ -174,7 +176,8 @@ gst_inter_pipe_sink_class_init (GstInterPipeSinkClass * klass) basesink_class->get_caps = GST_DEBUG_FUNCPTR (gst_inter_pipe_sink_get_caps); basesink_class->set_caps = GST_DEBUG_FUNCPTR (gst_inter_pipe_sink_set_caps); basesink_class->event = GST_DEBUG_FUNCPTR (gst_inter_pipe_sink_event); - + basesink_class->propose_allocation = + GST_DEBUG_FUNCPTR (gst_inter_pipe_sink_propose_allocation); } static void @@ -598,6 +601,228 @@ gst_inter_pipe_sink_event (GstBaseSink * base, GstEvent * event) event); } + +struct AllocQueryCtx +{ + GstInterPipeSink *sink; + GstQuery *query; + GstAllocationParams params; + guint size; + guint min_buffers; + gboolean first_query; + guint num_listeners; +}; + + +static gboolean +gst_inter_pipe_sink_forward_query_allocation (gpointer key, gpointer data, + gpointer user_data) +{ + struct AllocQueryCtx *ctx; + GstInterPipeIListener *listener; + gchar *listener_name; + GstInterPipeSink *sink; + GstQuery *query; + GstCaps *caps; + gboolean ret = TRUE; + guint count, i, size, min; + + listener = GST_INTER_PIPE_ILISTENER (data); + listener_name = (gchar *) key; + ctx = user_data; + sink = ctx->sink; + + GST_DEBUG_OBJECT (sink, "Aggregating allocation from listener %s", + listener_name); + + gst_query_parse_allocation (ctx->query, &caps, NULL); + + query = gst_query_new_allocation (caps, FALSE); + if (!gst_inter_pipe_ilistener_query (listener, query)) { + GST_DEBUG_OBJECT (sink, + "Allocation query failed on listener %s, ignoring allocation", + listener_name); + ret = FALSE; + goto out; + } + + /* Allocation Filter, extract of code from tee element */ + + /* Allocation Params: + * store the maximum alignment, prefix and padding, but ignore the + * allocators and the flags which are tied to downstream allocation*/ + count = gst_query_get_n_allocation_params (query); + for (i = 0; i < count; i++) { + GstAllocationParams params = { 0, }; + + gst_query_parse_nth_allocation_param (query, i, NULL, ¶ms); + + GST_DEBUG_OBJECT (sink, "Aggregating AllocationParams align=%" + G_GSIZE_FORMAT " prefix=%" G_GSIZE_FORMAT " padding=%" + G_GSIZE_FORMAT, params.align, params.prefix, params.padding); + + if (ctx->params.align < params.align) + ctx->params.align = params.align; + + if (ctx->params.prefix < params.prefix) + ctx->params.prefix = params.prefix; + + if (ctx->params.padding < params.padding) + ctx->params.padding = params.padding; + } + + /* Allocation Pool: + * We want to keep the biggest size and biggest minimum number of buffers to + * make sure downstream requirement can be satisfied. We don't really care + * about the maximum, as this is a parameter of the downstream provided + * pool. We only read the first allocation pool as the minimum number of + * buffers is normally constant regardless of the pool being used. */ + if (gst_query_get_n_allocation_pools (query) > 0) { + gst_query_parse_nth_allocation_pool (query, 0, NULL, &size, &min, NULL); + + GST_DEBUG_OBJECT (sink, + "Aggregating allocation pool size=%u min_buffers=%u", size, min); + + if (ctx->size < size) + ctx->size = size; + + if (ctx->min_buffers < min) + ctx->min_buffers = min; + } + + /* Allocation Meta: + * For allocation meta, we'll need to aggregate the argument using the new + * GstMetaInfo::agggregate_func */ + count = gst_query_get_n_allocation_metas (query); + for (i = 0; i < count; i++) { + guint ctx_index; + GType api; + const GstStructure *param; + + api = gst_query_parse_nth_allocation_meta (query, i, ¶m); + + /* For the first query, copy all metas */ + if (ctx->first_query) { + gst_query_add_allocation_meta (ctx->query, api, param); + continue; + } + + /* Afterward, aggregate the common params */ + if (gst_query_find_allocation_meta (ctx->query, api, &ctx_index)) { + const GstStructure *ctx_param; + + gst_query_parse_nth_allocation_meta (ctx->query, ctx_index, &ctx_param); + + /* Keep meta which has no params */ + if (ctx_param == NULL && param == NULL) + continue; + + GST_DEBUG_OBJECT (sink, "Dropping allocation meta %s", g_type_name (api)); + gst_query_remove_nth_allocation_meta (ctx->query, ctx_index); + } + } + + /* Finally, cleanup metas from the stored query that aren't support on this + * listener. */ + count = gst_query_get_n_allocation_metas (ctx->query); + for (i = 0; i < count;) { + GType api = gst_query_parse_nth_allocation_meta (ctx->query, i, NULL); + + if (!gst_query_find_allocation_meta (query, api, NULL)) { + GST_DEBUG_OBJECT (sink, "Dropping allocation meta %s", g_type_name (api)); + gst_query_remove_nth_allocation_meta (ctx->query, i); + count--; + continue; + } + + i++; + } + + ctx->first_query = FALSE; + ctx->num_listeners++; + +out: + gst_query_unref (query); + return ret; +} + +static gboolean +gst_inter_pipe_sink_propose_allocation (GstBaseSink * base, GstQuery * query) +{ + struct AllocQueryCtx ctx = { 0 }; + GstInterPipeSink *sink; + GHashTable *listeners; + GHashTableIter iter; + gboolean ret = TRUE; + gpointer key, value; + + sink = GST_INTER_PIPE_SINK (base); + + g_mutex_lock (&sink->listeners_mutex); + listeners = GST_INTER_PIPE_SINK_LISTENERS (sink); + + ctx.sink = sink; + ctx.query = query; + ctx.first_query = TRUE; + gst_allocation_params_init (&ctx.params); + + g_hash_table_iter_init (&iter, listeners); + + while (g_hash_table_iter_next (&iter, &key, &value)) { + ret |= gst_inter_pipe_sink_forward_query_allocation (key, value, &ctx); + } + + if (ret) { + guint count = gst_query_get_n_allocation_metas (query); + guint i; + + GST_DEBUG_OBJECT (sink, + "Final allocation parameters: align=%" G_GSIZE_FORMAT " prefix=%" + G_GSIZE_FORMAT " padding %" G_GSIZE_FORMAT, ctx.params.align, + ctx.params.prefix, ctx.params.padding); + + GST_DEBUG_OBJECT (sink, "Final allocation pools: size=%u min_buffers=%u", + ctx.size, ctx.min_buffers); + + GST_DEBUG_OBJECT (sink, "Final %u allocation meta:", count); + + for (i = 0; i < count; i++) { + GST_DEBUG_OBJECT (sink, " + aggregated allocation meta %s", + g_type_name (gst_query_parse_nth_allocation_meta (ctx.query, i, + NULL))); + } + + /* Allocate one more buffers when multiplexing so we don't starve the + * downstream threads. */ + if (ctx.num_listeners > 1) + ctx.min_buffers++; + + /* Check that we actually have parameters besides the defaults. */ + if (ctx.params.align || ctx.params.prefix || ctx.params.padding) { + gst_query_add_allocation_param (ctx.query, NULL, &ctx.params); + } + + /* When size == 0, buffers created from this pool would have no memory + * allocated. */ + if (ctx.size) { + gst_query_add_allocation_pool (ctx.query, NULL, ctx.size, + ctx.min_buffers, 0); + } + + } else { + guint count = gst_query_get_n_allocation_metas (query); + guint i; + + for (i = 1; i <= count; i++) { + gst_query_remove_nth_allocation_meta (query, count - i); + } + } + + g_mutex_unlock (&sink->listeners_mutex); + + return ret; +} + /* Appsink Callbacks */ static void gst_inter_pipe_sink_push_to_listener (gpointer key, gpointer data, diff --git a/gst/interpipe/gstinterpipesrc.c b/gst/interpipe/gstinterpipesrc.c index fd2538c..143977c 100644 --- a/gst/interpipe/gstinterpipesrc.c +++ b/gst/interpipe/gstinterpipesrc.c @@ -87,6 +87,8 @@ static gboolean gst_inter_pipe_src_push_buffer (GstInterPipeIListener * iface, static gboolean gst_inter_pipe_src_push_event (GstInterPipeIListener * iface, GstEvent * event, guint64 basetime); static gboolean gst_inter_pipe_src_send_eos (GstInterPipeIListener * iface); +static gboolean gst_inter_pipe_src_push_query (GstInterPipeIListener * iface, + GstQuery * query); static gboolean gst_inter_pipe_src_listen_node (GstInterPipeSrc * src, const gchar * node_name); static gboolean gst_inter_pipe_src_start (GstBaseSrc * base); @@ -521,6 +523,7 @@ gst_inter_pipe_ilistener_init (GstInterPipeIListenerInterface * iface) iface->set_caps = gst_inter_pipe_src_set_caps; iface->push_buffer = gst_inter_pipe_src_push_buffer; iface->push_event = gst_inter_pipe_src_push_event; + iface->query = gst_inter_pipe_src_push_query; iface->send_eos = gst_inter_pipe_src_send_eos; } @@ -756,6 +759,7 @@ gst_inter_pipe_src_push_event (GstInterPipeIListener * iface, GstEvent * event, } } + static gboolean gst_inter_pipe_src_send_eos (GstInterPipeIListener * iface) { @@ -775,6 +779,33 @@ gst_inter_pipe_src_send_eos (GstInterPipeIListener * iface) return TRUE; } + +static gboolean +gst_inter_pipe_src_push_query (GstInterPipeIListener * iface, GstQuery * query) +{ + GstInterPipeSrc *src; + GstPad *srcpad; + GstPad *peerpad; + gboolean ret = TRUE; + + src = GST_INTER_PIPE_SRC (iface); + srcpad = GST_INTER_PIPE_SRC_PAD (GST_APP_SRC (src)); + + peerpad = gst_pad_get_peer (srcpad); + if (!peerpad) { + ret = FALSE; + goto out; + } + + ret = gst_pad_query (peerpad, query); + + gst_object_unref (peerpad); + +out: + return ret; +} + + static gboolean gst_inter_pipe_src_listen_node (GstInterPipeSrc * src, const gchar * node_name) { From 4f0330f64ef98e82d5560f823a02e21fa6243b46 Mon Sep 17 00:00:00 2001 From: ABeltramo Date: Fri, 17 Oct 2025 11:58:23 +0100 Subject: [PATCH 2/2] feat: forwarding queries from src->sink --- .gitignore | 2 + common | 2 +- gst/interpipe/gstinterpipeinode.c | 23 +++- gst/interpipe/gstinterpipeinode.h | 3 + gst/interpipe/gstinterpipesink.c | 75 +++++++------ gst/interpipe/gstinterpipesrc.c | 179 +++++++++++++++++++++++++----- meson_options.txt | 2 +- 7 files changed, 217 insertions(+), 69 deletions(-) diff --git a/.gitignore b/.gitignore index 82bf8b9..6bc0212 100644 --- a/.gitignore +++ b/.gitignore @@ -76,3 +76,5 @@ docs/plugins/sgml.stamp docs/plugins/xml/ docs/version.entities docs/plugins/inspect-registry.xml + +.idea/ diff --git a/common b/common index dd9d403..52adcdb 160000 --- a/common +++ b/common @@ -1 +1 @@ -Subproject commit dd9d4031075713cf37c656ce639b6d60d6f9dde3 +Subproject commit 52adcdb89a9eb527df38c569539d95c1c7aeda6e diff --git a/gst/interpipe/gstinterpipeinode.c b/gst/interpipe/gstinterpipeinode.c index 93eb4c1..92a7ec9 100644 --- a/gst/interpipe/gstinterpipeinode.c +++ b/gst/interpipe/gstinterpipeinode.c @@ -34,14 +34,14 @@ G_DEFINE_INTERFACE (GstInterPipeINode, gst_inter_pipe_inode, G_TYPE_OBJECT); static void -gst_inter_pipe_inode_default_init (GstInterPipeINodeInterface * iface) +gst_inter_pipe_inode_default_init (GstInterPipeINodeInterface *iface) { //NOP } gboolean -gst_inter_pipe_inode_add_listener (GstInterPipeINode * self, - GstInterPipeIListener * listener) +gst_inter_pipe_inode_add_listener (GstInterPipeINode *self, + GstInterPipeIListener *listener) { GstInterPipeINodeInterface *iface; @@ -54,8 +54,8 @@ gst_inter_pipe_inode_add_listener (GstInterPipeINode * self, } gboolean -gst_inter_pipe_inode_remove_listener (GstInterPipeINode * self, - GstInterPipeIListener * listener) +gst_inter_pipe_inode_remove_listener (GstInterPipeINode *self, + GstInterPipeIListener *listener) { GstInterPipeINodeInterface *iface; @@ -68,7 +68,7 @@ gst_inter_pipe_inode_remove_listener (GstInterPipeINode * self, } gboolean -gst_inter_pipe_inode_receive_event (GstInterPipeINode * self, GstEvent * event) +gst_inter_pipe_inode_receive_event (GstInterPipeINode *self, GstEvent *event) { GstInterPipeINodeInterface *iface; @@ -79,3 +79,14 @@ gst_inter_pipe_inode_receive_event (GstInterPipeINode * self, GstEvent * event) return iface->receive_event (self, event); } + +gboolean +gst_inter_pipe_inode_send_query (GstInterPipeINode *self, GstQuery *query) +{ + g_return_val_if_fail (GST_INTER_PIPE_IS_INODE (self), FALSE); + g_return_val_if_fail (GST_IS_QUERY (query), FALSE); + + GstInterPipeINodeInterface *iface = GST_INTER_PIPE_INODE_GET_IFACE (self); + g_return_val_if_fail (iface->query != NULL, FALSE); + return iface->query (self, query); +} diff --git a/gst/interpipe/gstinterpipeinode.h b/gst/interpipe/gstinterpipeinode.h index 5aa16ec..98a1bd4 100644 --- a/gst/interpipe/gstinterpipeinode.h +++ b/gst/interpipe/gstinterpipeinode.h @@ -65,6 +65,7 @@ struct _GstInterPipeINodeInterface gboolean (* add_listener) (GstInterPipeINode *iface, GstInterPipeIListener * listener); gboolean (* remove_listener) (GstInterPipeINode *iface, GstInterPipeIListener * listener); gboolean (* receive_event) (GstInterPipeINode *iface, GstEvent *event); + gboolean (* query) (GstInterPipeINode *iface, GstQuery *query); }; /** @@ -106,6 +107,8 @@ gboolean gst_inter_pipe_inode_receive_event (GstInterPipeINode *iface, GstEvent GType gst_inter_pipe_inode_get_type (void); +gboolean gst_inter_pipe_inode_send_query (GstInterPipeINode *iface, GstQuery *query); + G_END_DECLS #endif //__GST_INTER_PIPE_INODE_H__ diff --git a/gst/interpipe/gstinterpipesink.c b/gst/interpipe/gstinterpipesink.c index 9524c6f..9217441 100644 --- a/gst/interpipe/gstinterpipesink.c +++ b/gst/interpipe/gstinterpipesink.c @@ -92,6 +92,8 @@ static void gst_inter_pipe_sink_intersect_listener_caps (gpointer key, gpointer value, gpointer user_data); static void gst_inter_pipe_sink_forward_event (gpointer key, gpointer value, gpointer user_data); +static gboolean gst_inter_pipe_sink_query (GstInterPipeINode * iface, + GstQuery * query); static void gst_inter_pipe_inode_init (GstInterPipeINodeInterface * iface); @@ -135,7 +137,7 @@ G_DEFINE_TYPE_WITH_CODE (GstInterPipeSink, gst_inter_pipe_sink, gst_inter_pipe_inode_init)); static void -gst_inter_pipe_sink_class_init (GstInterPipeSinkClass * klass) +gst_inter_pipe_sink_class_init (GstInterPipeSinkClass *klass) { GObjectClass *gobject_class; GstElementClass *element_class; @@ -181,8 +183,7 @@ gst_inter_pipe_sink_class_init (GstInterPipeSinkClass * klass) } static void -gst_inter_pipe_sink_update_node_name (GstInterPipeSink * sink, - GParamSpec * pspec) +gst_inter_pipe_sink_update_node_name (GstInterPipeSink *sink, GParamSpec *pspec) { GstInterPipeINode *node; @@ -198,7 +199,7 @@ gst_inter_pipe_sink_update_node_name (GstInterPipeSink * sink, } static void -gst_inter_pipe_sink_init (GstInterPipeSink * sink) +gst_inter_pipe_sink_init (GstInterPipeSink *sink) { GstAppSinkCallbacks callbacks; @@ -238,8 +239,8 @@ gst_inter_pipe_sink_init (GstInterPipeSink * sink) static void -gst_inter_pipe_sink_set_property (GObject * object, guint prop_id, - const GValue * value, GParamSpec * pspec) +gst_inter_pipe_sink_set_property (GObject *object, guint prop_id, + const GValue *value, GParamSpec *pspec) { GstInterPipeSink *sink; @@ -261,8 +262,8 @@ gst_inter_pipe_sink_set_property (GObject * object, guint prop_id, } static void -gst_inter_pipe_sink_get_property (GObject * object, guint prop_id, - GValue * value, GParamSpec * pspec) +gst_inter_pipe_sink_get_property (GObject *object, guint prop_id, + GValue *value, GParamSpec *pspec) { GstInterPipeSink *sink; GHashTable *listeners; @@ -285,7 +286,7 @@ gst_inter_pipe_sink_get_property (GObject * object, guint prop_id, } static void -gst_inter_pipe_sink_finalize (GObject * object) +gst_inter_pipe_sink_finalize (GObject *object) { GstInterPipeSink *sink; GstInterPipeINode *node; @@ -340,8 +341,8 @@ gst_inter_pipe_sink_update_listener_caps (gpointer key, gpointer data, static gboolean -gst_inter_pipe_sink_are_caps_compatible (GstInterPipeSink * sink, - GstCaps * listener_caps, GstCaps * sinkcaps) +gst_inter_pipe_sink_are_caps_compatible (GstInterPipeSink *sink, + GstCaps *listener_caps, GstCaps *sinkcaps) { GstCaps *renegotiated_caps = NULL; @@ -359,7 +360,7 @@ gst_inter_pipe_sink_are_caps_compatible (GstInterPipeSink * sink, } static GstCaps * -gst_inter_pipe_sink_caps_intersect (GstCaps * caps1, GstCaps * caps2) +gst_inter_pipe_sink_caps_intersect (GstCaps *caps1, GstCaps *caps2) { if (!caps1 && !caps2) { return NULL; @@ -409,7 +410,7 @@ gst_inter_pipe_sink_intersect_listener_caps (gpointer key, gpointer value, } static GstCaps * -gst_inter_pipe_sink_get_caps (GstBaseSink * base, GstCaps * filter) +gst_inter_pipe_sink_get_caps (GstBaseSink *base, GstCaps *filter) { GstInterPipeSink *sink; GstInterPipeIListener *listener; @@ -447,11 +448,9 @@ gst_inter_pipe_sink_get_caps (GstBaseSink * base, GstCaps * filter) /* Take into account upsream caps suggestion */ pre_filter = sink->caps_negotiated; - intercept_caps = - gst_inter_pipe_sink_caps_intersect (pre_filter, filter); + intercept_caps = gst_inter_pipe_sink_caps_intersect (pre_filter, filter); - GST_INFO_OBJECT (sink, "Filtered caps: %" GST_PTR_FORMAT, - intercept_caps); + GST_INFO_OBJECT (sink, "Filtered caps: %" GST_PTR_FORMAT, intercept_caps); if (!intercept_caps || gst_caps_is_empty (intercept_caps)) { GST_ERROR_OBJECT (sink, @@ -488,7 +487,7 @@ gst_inter_pipe_sink_get_caps (GstBaseSink * base, GstCaps * filter) } static gboolean -gst_inter_pipe_sink_set_caps (GstBaseSink * base, GstCaps * caps) +gst_inter_pipe_sink_set_caps (GstBaseSink *base, GstCaps *caps) { GstInterPipeSink *sink; GHashTable *listeners; @@ -528,7 +527,7 @@ gst_inter_pipe_sink_set_caps (GstBaseSink * base, GstCaps * caps) g_mutex_unlock (&sink->listeners_mutex); - out: +out: if (ret) { gst_caps_replace (&sink->caps, caps); gst_app_sink_set_caps (GST_APP_SINK (sink), caps); @@ -579,7 +578,7 @@ gst_inter_pipe_sink_forward_event (gpointer key, gpointer data, } static gboolean -gst_inter_pipe_sink_event (GstBaseSink * base, GstEvent * event) +gst_inter_pipe_sink_event (GstBaseSink *base, GstEvent *event) { GstInterPipeSink *sink; GHashTable *listeners; @@ -747,7 +746,7 @@ gst_inter_pipe_sink_forward_query_allocation (gpointer key, gpointer data, } static gboolean -gst_inter_pipe_sink_propose_allocation (GstBaseSink * base, GstQuery * query) +gst_inter_pipe_sink_propose_allocation (GstBaseSink *base, GstQuery *query) { struct AllocQueryCtx ctx = { 0 }; GstInterPipeSink *sink; @@ -848,7 +847,7 @@ gst_inter_pipe_sink_push_to_listener (gpointer key, gpointer data, } static void -gst_inter_pipe_sink_process_sample (GstInterPipeSink * sink, GstSample * sample) +gst_inter_pipe_sink_process_sample (GstInterPipeSink *sink, GstSample *sample) { GHashTable *listeners; GstBuffer *buffer; @@ -875,7 +874,7 @@ gst_inter_pipe_sink_process_sample (GstInterPipeSink * sink, GstSample * sample) } static GstFlowReturn -gst_inter_pipe_sink_new_buffer (GstAppSink * asink, gpointer data) +gst_inter_pipe_sink_new_buffer (GstAppSink *asink, gpointer data) { GstInterPipeSink *sink; GstSample *sample; @@ -890,7 +889,7 @@ gst_inter_pipe_sink_new_buffer (GstAppSink * asink, gpointer data) static GstFlowReturn -gst_inter_pipe_sink_new_preroll (GstAppSink * asink, gpointer data) +gst_inter_pipe_sink_new_preroll (GstAppSink *asink, gpointer data) { GstInterPipeSink *sink; GstSample *sample; @@ -920,7 +919,7 @@ gst_inter_pipe_sink_send_eos (gpointer key, gpointer data, gpointer user_data) } static void -gst_inter_pipe_sink_eos (GstAppSink * asink, gpointer data) +gst_inter_pipe_sink_eos (GstAppSink *asink, gpointer data) { GstInterPipeSink *sink; GHashTable *listeners; @@ -943,16 +942,17 @@ gst_inter_pipe_sink_eos (GstAppSink * asink, gpointer data) /* GstInterPipeINode interface implementation */ static void -gst_inter_pipe_inode_init (GstInterPipeINodeInterface * iface) +gst_inter_pipe_inode_init (GstInterPipeINodeInterface *iface) { iface->add_listener = gst_inter_pipe_sink_add_listener; iface->remove_listener = gst_inter_pipe_sink_remove_listener; iface->receive_event = gst_inter_pipe_sink_receive_event; + iface->query = gst_inter_pipe_sink_query; } static gboolean -gst_inter_pipe_sink_add_listener (GstInterPipeINode * iface, - GstInterPipeIListener * listener) +gst_inter_pipe_sink_add_listener (GstInterPipeINode *iface, + GstInterPipeIListener *listener) { GstInterPipeSink *sink; GHashTable *listeners; @@ -1070,8 +1070,8 @@ gst_inter_pipe_sink_add_listener (GstInterPipeINode * iface, } static gboolean -gst_inter_pipe_sink_remove_listener (GstInterPipeINode * iface, - GstInterPipeIListener * listener) +gst_inter_pipe_sink_remove_listener (GstInterPipeINode *iface, + GstInterPipeIListener *listener) { GstInterPipeSink *sink; GHashTable *listeners; @@ -1106,7 +1106,7 @@ gst_inter_pipe_sink_remove_listener (GstInterPipeINode * iface, } static gboolean -gst_inter_pipe_sink_receive_event (GstInterPipeINode * iface, GstEvent * event) +gst_inter_pipe_sink_receive_event (GstInterPipeINode *iface, GstEvent *event) { GstInterPipeSink *self; GHashTable *listeners; @@ -1130,3 +1130,16 @@ gst_inter_pipe_sink_receive_event (GstInterPipeINode * iface, GstEvent * event) return FALSE; } } + +static gboolean +gst_inter_pipe_sink_query (GstInterPipeINode *iface, GstQuery *query) +{ + GstInterPipeSink *self = GST_INTER_PIPE_SINK (iface); + gboolean res; + + GST_DEBUG_OBJECT (self, "query: %" GST_PTR_FORMAT, query); + + res = gst_pad_peer_query (GST_INTER_PIPE_SINK_PAD (self), query); + return res; + +} diff --git a/gst/interpipe/gstinterpipesrc.c b/gst/interpipe/gstinterpipesrc.c index 143977c..29d0276 100644 --- a/gst/interpipe/gstinterpipesrc.c +++ b/gst/interpipe/gstinterpipesrc.c @@ -45,6 +45,9 @@ #include #include "gstinterpipe.h" #include "gstinterpipesrc.h" + +#include + #include "gstinterpipeilistener.h" GST_DEBUG_CATEGORY_STATIC (gst_inter_pipe_src_debug); @@ -94,6 +97,7 @@ static gboolean gst_inter_pipe_src_listen_node (GstInterPipeSrc * src, static gboolean gst_inter_pipe_src_start (GstBaseSrc * base); static gboolean gst_inter_pipe_src_stop (GstBaseSrc * base); static gboolean gst_inter_pipe_src_event (GstBaseSrc * base, GstEvent * event); +static gboolean gst_inter_pipe_src_query (GstBaseSrc * base, GstQuery * query); static void gst_inter_pipe_ilistener_init (GstInterPipeIListenerInterface * iface); @@ -156,6 +160,10 @@ struct _GstInterPipeSrc /* Accept end of stream event */ gboolean accept_eos_event; + + /* Pending context queries that couldn't be answered */ + GQueue *pending_context_queries; + GMutex context_lock; }; struct _GstInterPipeSrcClass @@ -168,7 +176,7 @@ G_DEFINE_TYPE_WITH_CODE (GstInterPipeSrc, gst_inter_pipe_src, GST_TYPE_APP_SRC, gst_inter_pipe_ilistener_init)); static void -gst_inter_pipe_src_class_init (GstInterPipeSrcClass * klass) +gst_inter_pipe_src_class_init (GstInterPipeSrcClass *klass) { GObjectClass *gobject_class; GstElementClass *element_class; @@ -228,10 +236,11 @@ gst_inter_pipe_src_class_init (GstInterPipeSrcClass * klass) basesrc_class->stop = GST_DEBUG_FUNCPTR (gst_inter_pipe_src_stop); basesrc_class->event = GST_DEBUG_FUNCPTR (gst_inter_pipe_src_event); basesrc_class->create = GST_DEBUG_FUNCPTR (gst_inter_pipe_src_create); + basesrc_class->query = GST_DEBUG_FUNCPTR (gst_inter_pipe_src_query); } static void -gst_inter_pipe_src_init (GstInterPipeSrc * src) +gst_inter_pipe_src_init (GstInterPipeSrc *src) { gst_app_src_set_emit_signals (GST_APP_SRC (src), FALSE); @@ -244,11 +253,66 @@ gst_inter_pipe_src_init (GstInterPipeSrc * src) src->stream_sync = GST_INTER_PIPE_SRC_PASSTHROUGH_TIMESTAMP; src->accept_events = TRUE; src->accept_eos_event = TRUE; + + g_mutex_init (&src->context_lock); + src->pending_context_queries = g_queue_new (); } static void -gst_inter_pipe_src_set_property (GObject * object, guint prop_id, - const GValue * value, GParamSpec * pspec) +gst_inter_pipe_src_retry_pending_queries (GstInterPipeSrc *src) +{ + GstInterPipeINode *sink_node = gst_inter_pipe_get_node (src->listen_to); + GstQuery *query; + GstContext *context; + + if (sink_node) { + g_mutex_lock (&src->context_lock); + + while (!g_queue_is_empty (src->pending_context_queries)) { + query = g_queue_pop_head (src->pending_context_queries); + + GST_DEBUG_OBJECT (src, "Retrying pending context query"); + + if (!gst_inter_pipe_inode_send_query (sink_node, query)) { + GST_DEBUG_OBJECT (src, "Pending query still cannot be answered"); + } else { + GST_DEBUG_OBJECT (src, + "Pending query answered, we've got a new context from upstream!"); + // TODO: this doesn't work, it's too late to set the context now + // see for example: https://github.com/GStreamer/gstreamer/blob/2dd1b4bf270995fd4dd6aa240cb51cfcdf3c71b8/subprojects/gst-plugins-bad/sys/nvcodec/gstcudabasetransform.c#L179-L197 + // by the time we set this context filter->stream has already be assigned, so changing the context doesn't do anything.. + gst_query_parse_context (query, &context); + if (context) { + GstElement *pipeline = GST_ELEMENT (gst_element_get_parent (src)); + GstIterator *iter = gst_bin_iterate_elements (GST_BIN (pipeline)); + GValue item = G_VALUE_INIT; + GstMessage *msg; + + // Set the context on interpipesrc itself + gst_element_set_context (GST_ELEMENT (src), context); + + GST_DEBUG_OBJECT (src, "Sending have context message"); + msg = gst_message_new_have_context (GST_OBJECT (src), context); + gst_element_post_message (GST_ELEMENT (src), msg); + + while (gst_iterator_next (iter, &item) == GST_ITERATOR_OK) { + GstElement *element = GST_ELEMENT (g_value_get_object (&item)); + gst_element_set_context (element, context); + } + + } + } + + gst_query_unref (query); + } + + g_mutex_unlock (&src->context_lock); + } +} + +static void +gst_inter_pipe_src_set_property (GObject *object, guint prop_id, + const GValue *value, GParamSpec *pspec) { GstInterPipeSrc *src; GstInterPipeIListener *listener; @@ -277,6 +341,9 @@ gst_inter_pipe_src_set_property (GObject * object, guint prop_id, g_free (src->listen_to); } src->listen_to = node_name; + + /* Retry any pending context queries */ + gst_inter_pipe_src_retry_pending_queries (src); } src->listening = TRUE; GST_INFO_OBJECT (src, "Listening to node %s", src->listen_to); @@ -323,8 +390,8 @@ gst_inter_pipe_src_set_property (GObject * object, guint prop_id, } static void -gst_inter_pipe_src_get_property (GObject * object, guint prop_id, - GValue * value, GParamSpec * pspec) +gst_inter_pipe_src_get_property (GObject *object, guint prop_id, + GValue *value, GParamSpec *pspec) { GstInterPipeSrc *src; @@ -358,7 +425,7 @@ gst_inter_pipe_src_get_property (GObject * object, guint prop_id, } static void -gst_inter_pipe_src_finalize (GObject * object) +gst_inter_pipe_src_finalize (GObject *object) { GstInterPipeSrc *src; @@ -368,6 +435,14 @@ gst_inter_pipe_src_finalize (GObject * object) g_queue_free_full (src->pending_serial_events, (GDestroyNotify) gst_event_unref); + /* Free pending context queries */ + g_mutex_lock (&src->context_lock); + g_queue_free_full (src->pending_context_queries, + (GDestroyNotify) gst_query_unref); + g_mutex_unlock (&src->context_lock); + + g_mutex_clear (&src->context_lock); + if (src->listen_to) { g_free (src->listen_to); src->listen_to = NULL; @@ -379,7 +454,7 @@ gst_inter_pipe_src_finalize (GObject * object) /* GstBaseSrc Implementation*/ static gboolean -gst_inter_pipe_src_start (GstBaseSrc * base) +gst_inter_pipe_src_start (GstBaseSrc *base) { GstBaseSrcClass *basesrc_class; GstInterPipeSrc *src; @@ -397,6 +472,10 @@ gst_inter_pipe_src_start (GstBaseSrc * base) } else { GST_INFO_OBJECT (src, "Listening to node %s", src->listen_to); src->listening = TRUE; + + /* Retry pending queries */ + gst_inter_pipe_src_retry_pending_queries (src); + goto start_done; } } else { @@ -414,7 +493,7 @@ gst_inter_pipe_src_start (GstBaseSrc * base) } static gboolean -gst_inter_pipe_src_stop (GstBaseSrc * base) +gst_inter_pipe_src_stop (GstBaseSrc *base) { GstBaseSrcClass *basesrc_class; GstInterPipeSrc *src; @@ -434,7 +513,7 @@ gst_inter_pipe_src_stop (GstBaseSrc * base) } static gboolean -gst_inter_pipe_src_event (GstBaseSrc * base, GstEvent * event) +gst_inter_pipe_src_event (GstBaseSrc *base, GstEvent *event) { GstBaseSrcClass *basesrc_class; GstInterPipeSrc *src; @@ -458,9 +537,48 @@ gst_inter_pipe_src_event (GstBaseSrc * base, GstEvent * event) return basesrc_class->event (base, event); } +static gboolean +gst_inter_pipe_src_query (GstBaseSrc *base, GstQuery *query) +{ + GstInterPipeINode *sink_node; + GstBaseSrcClass *basesrc_class; + GstInterPipeSrc *src; + gboolean ret; + + basesrc_class = GST_BASE_SRC_CLASS (gst_inter_pipe_src_parent_class); + src = GST_INTER_PIPE_SRC (base); + + GST_DEBUG_OBJECT (src, "Incoming upstream query %s", + GST_QUERY_TYPE_NAME (query)); + + if (GST_QUERY_TYPE (query) == GST_QUERY_CONTEXT) { + if (src->listening && src->listen_to) { + GST_DEBUG_OBJECT (src, "Context query will be forwarded to node %s", + src->listen_to); + sink_node = gst_inter_pipe_get_node (src->listen_to); + ret = gst_inter_pipe_inode_send_query (sink_node, query); + + if (!ret) { + GST_DEBUG_OBJECT (src, "Context query could not be answered by node"); + } + + return ret; + } else { + GST_DEBUG_OBJECT (src, "Not connected yet, cannot answer context query"); + + /* Cache this query type for retry after connection */ + g_mutex_lock (&src->context_lock); + g_queue_push_tail (src->pending_context_queries, gst_query_ref (query)); + g_mutex_unlock (&src->context_lock); + } + } + + return basesrc_class->query (base, query); +} + static GstFlowReturn -gst_inter_pipe_src_create (GstBaseSrc * base, guint64 offset, guint size, - GstBuffer ** buf) +gst_inter_pipe_src_create (GstBaseSrc *base, guint64 offset, guint size, + GstBuffer **buf) { GstInterPipeSrc *src; GstEvent *serial_event; @@ -514,7 +632,7 @@ gst_inter_pipe_src_create (GstBaseSrc * base, guint64 offset, guint size, /* GstInterPipeIListener Implementation */ static void -gst_inter_pipe_ilistener_init (GstInterPipeIListenerInterface * iface) +gst_inter_pipe_ilistener_init (GstInterPipeIListenerInterface *iface) { iface->get_name = gst_inter_pipe_src_get_name; iface->node_added = gst_inter_pipe_src_node_added; @@ -528,14 +646,14 @@ gst_inter_pipe_ilistener_init (GstInterPipeIListenerInterface * iface) } static const gchar * -gst_inter_pipe_src_get_name (GstInterPipeIListener * iface) +gst_inter_pipe_src_get_name (GstInterPipeIListener *iface) { return GST_OBJECT_NAME (iface); } static gboolean -gst_inter_pipe_src_node_added (GstInterPipeIListener * iface, - const gchar * node_name) +gst_inter_pipe_src_node_added (GstInterPipeIListener *iface, + const gchar *node_name) { GstInterPipeSrc *src; @@ -545,14 +663,17 @@ gst_inter_pipe_src_node_added (GstInterPipeIListener * iface, if (g_strcmp0 (src->listen_to, node_name) == 0) { gst_inter_pipe_src_listen_node (src, node_name); + + /* Retry pending queries */ + gst_inter_pipe_src_retry_pending_queries (src); } return TRUE; } static gboolean -gst_inter_pipe_src_node_removed (GstInterPipeIListener * iface, - const gchar * node_name) +gst_inter_pipe_src_node_removed (GstInterPipeIListener *iface, + const gchar *node_name) { GstInterPipeSrc *src; @@ -567,8 +688,7 @@ gst_inter_pipe_src_node_removed (GstInterPipeIListener * iface, } static GstCaps * -gst_inter_pipe_src_get_caps (GstInterPipeIListener * iface, - gboolean * negotiated) +gst_inter_pipe_src_get_caps (GstInterPipeIListener *iface, gboolean *negotiated) { GstInterPipeSrc *src; GstAppSrc *appsrc; @@ -593,8 +713,7 @@ gst_inter_pipe_src_get_caps (GstInterPipeIListener * iface, } static gboolean -gst_inter_pipe_src_set_caps (GstInterPipeIListener * iface, - const GstCaps * caps) +gst_inter_pipe_src_set_caps (GstInterPipeIListener *iface, const GstCaps *caps) { GstInterPipeSrc *src; GstAppSrc *appsrc; @@ -624,8 +743,8 @@ gst_inter_pipe_src_set_caps (GstInterPipeIListener * iface, } static gboolean -gst_inter_pipe_src_push_buffer (GstInterPipeIListener * iface, - GstBuffer * buffer, guint64 basetime) +gst_inter_pipe_src_push_buffer (GstInterPipeIListener *iface, + GstBuffer *buffer, guint64 basetime) { GstInterPipeSrc *src; GstAppSrc *appsrc; @@ -661,7 +780,7 @@ gst_inter_pipe_src_push_buffer (GstInterPipeIListener * iface, difftime = srcbasetime - basetime; if (GST_BUFFER_PTS (buffer) >= difftime) { GST_BUFFER_PTS (buffer) = GST_BUFFER_PTS (buffer) - difftime; - if (GST_BUFFER_DTS (buffer) != GST_CLOCK_TIME_NONE ) { + if (GST_BUFFER_DTS (buffer) != GST_CLOCK_TIME_NONE) { GST_BUFFER_DTS (buffer) = GST_BUFFER_DTS (buffer) - difftime; } } else { @@ -671,7 +790,7 @@ gst_inter_pipe_src_push_buffer (GstInterPipeIListener * iface, } else { difftime = basetime - srcbasetime; GST_BUFFER_PTS (buffer) = GST_BUFFER_PTS (buffer) + difftime; - if (GST_BUFFER_DTS (buffer) != GST_CLOCK_TIME_NONE ) { + if (GST_BUFFER_DTS (buffer) != GST_CLOCK_TIME_NONE) { GST_BUFFER_DTS (buffer) = GST_BUFFER_DTS (buffer) + difftime; } } @@ -707,7 +826,7 @@ gst_inter_pipe_src_push_buffer (GstInterPipeIListener * iface, } static gboolean -gst_inter_pipe_src_push_event (GstInterPipeIListener * iface, GstEvent * event, +gst_inter_pipe_src_push_event (GstInterPipeIListener *iface, GstEvent *event, guint64 basetime) { GstInterPipeSrc *src; @@ -761,7 +880,7 @@ gst_inter_pipe_src_push_event (GstInterPipeIListener * iface, GstEvent * event, static gboolean -gst_inter_pipe_src_send_eos (GstInterPipeIListener * iface) +gst_inter_pipe_src_send_eos (GstInterPipeIListener *iface) { GstInterPipeSrc *src; GstAppSrc *appsrc; @@ -781,7 +900,7 @@ gst_inter_pipe_src_send_eos (GstInterPipeIListener * iface) static gboolean -gst_inter_pipe_src_push_query (GstInterPipeIListener * iface, GstQuery * query) +gst_inter_pipe_src_push_query (GstInterPipeIListener *iface, GstQuery *query) { GstInterPipeSrc *src; GstPad *srcpad; @@ -807,7 +926,7 @@ gst_inter_pipe_src_push_query (GstInterPipeIListener * iface, GstQuery * query) static gboolean -gst_inter_pipe_src_listen_node (GstInterPipeSrc * src, const gchar * node_name) +gst_inter_pipe_src_listen_node (GstInterPipeSrc *src, const gchar *node_name) { GstInterPipeIListener *listener; diff --git a/meson_options.txt b/meson_options.txt index 2c097c1..1a235bc 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -1,6 +1,6 @@ # Feature options option('tests', type : 'feature', value : 'auto', yield : true, description : 'Enable tests') -option('enable-gtk-doc', type : 'boolean', value : true, description : 'Use gtk-doc to build documentation') +option('enable-gtk-doc', type : 'boolean', value : false, description : 'Use gtk-doc to build documentation') # Common options option('package-name', type : 'string', yield : true,