diff --git a/.gitignore b/.gitignore index 82bf8b9..6bc0212 100644 --- a/.gitignore +++ b/.gitignore @@ -76,3 +76,5 @@ docs/plugins/sgml.stamp docs/plugins/xml/ docs/version.entities docs/plugins/inspect-registry.xml + +.idea/ diff --git a/common b/common index dd9d403..52adcdb 160000 --- a/common +++ b/common @@ -1 +1 @@ -Subproject commit dd9d4031075713cf37c656ce639b6d60d6f9dde3 +Subproject commit 52adcdb89a9eb527df38c569539d95c1c7aeda6e diff --git a/gst/interpipe/gstinterpipeilistener.c b/gst/interpipe/gstinterpipeilistener.c index 6908c8b..6266cb9 100644 --- a/gst/interpipe/gstinterpipeilistener.c +++ b/gst/interpipe/gstinterpipeilistener.c @@ -142,6 +142,21 @@ gst_inter_pipe_ilistener_push_event (GstInterPipeIListener * self, return iface->push_event (self, event, basetime); } +gboolean +gst_inter_pipe_ilistener_query (GstInterPipeIListener * self, GstQuery * query) +{ + GstInterPipeIListenerInterface *iface; + + g_return_val_if_fail (GST_INTER_PIPE_IS_ILISTENER (self), FALSE); + g_return_val_if_fail (query, FALSE); + + iface = GST_INTER_PIPE_ILISTENER_GET_IFACE (self); + g_return_val_if_fail (iface->query != NULL, FALSE); + + return iface->query (self, query); +} + + gboolean gst_inter_pipe_ilistener_send_eos (GstInterPipeIListener * self) { diff --git a/gst/interpipe/gstinterpipeilistener.h b/gst/interpipe/gstinterpipeilistener.h index 577cfb8..4c210a3 100644 --- a/gst/interpipe/gstinterpipeilistener.h +++ b/gst/interpipe/gstinterpipeilistener.h @@ -94,6 +94,7 @@ struct _GstInterPipeIListenerInterface gboolean (* node_removed) (GstInterPipeIListener *iface, const gchar *node_removed); gboolean (* push_buffer) (GstInterPipeIListener *iface, GstBuffer *buffer, guint64 basetime); gboolean (* push_event) (GstInterPipeIListener *iface, GstEvent *event, guint64 basetime); + gboolean (* query) (GstInterPipeIListener *iface, GstQuery *query); gboolean (* send_eos) (GstInterPipeIListener *iface); }; @@ -193,6 +194,19 @@ gboolean gst_inter_pipe_ilistener_push_buffer (GstInterPipeIListener *iface, gboolean gst_inter_pipe_ilistener_push_event (GstInterPipeIListener *iface, GstEvent *event, guint64 basetime); + +/** + * gst_inter_pipe_ilistener_query: + * @iface: (transfer none)(not nullable): The object that should query the #GstQuery downstream. + * @event: (transfer full)(not nullable): The #GstQuery to be pushed downstream. + * + * Ask @query to the downstream element. + * + * Return: True if the query was successful, False otherwise. + */ +gboolean gst_inter_pipe_ilistener_query (GstInterPipeIListener *iface, + GstQuery *query); + /** * gst_inter_pipe_ilistener_send_eos: * @iface: (transfer none)(not nullable): The object that should push the #GstEvent downstream. diff --git a/gst/interpipe/gstinterpipeinode.c b/gst/interpipe/gstinterpipeinode.c index 93eb4c1..92a7ec9 100644 --- a/gst/interpipe/gstinterpipeinode.c +++ b/gst/interpipe/gstinterpipeinode.c @@ -34,14 +34,14 @@ G_DEFINE_INTERFACE (GstInterPipeINode, gst_inter_pipe_inode, G_TYPE_OBJECT); static void -gst_inter_pipe_inode_default_init (GstInterPipeINodeInterface * iface) +gst_inter_pipe_inode_default_init (GstInterPipeINodeInterface *iface) { //NOP } gboolean -gst_inter_pipe_inode_add_listener (GstInterPipeINode * self, - GstInterPipeIListener * listener) +gst_inter_pipe_inode_add_listener (GstInterPipeINode *self, + GstInterPipeIListener *listener) { GstInterPipeINodeInterface *iface; @@ -54,8 +54,8 @@ gst_inter_pipe_inode_add_listener (GstInterPipeINode * self, } gboolean -gst_inter_pipe_inode_remove_listener (GstInterPipeINode * self, - GstInterPipeIListener * listener) +gst_inter_pipe_inode_remove_listener (GstInterPipeINode *self, + GstInterPipeIListener *listener) { GstInterPipeINodeInterface *iface; @@ -68,7 +68,7 @@ gst_inter_pipe_inode_remove_listener (GstInterPipeINode * self, } gboolean -gst_inter_pipe_inode_receive_event (GstInterPipeINode * self, GstEvent * event) +gst_inter_pipe_inode_receive_event (GstInterPipeINode *self, GstEvent *event) { GstInterPipeINodeInterface *iface; @@ -79,3 +79,14 @@ gst_inter_pipe_inode_receive_event (GstInterPipeINode * self, GstEvent * event) return iface->receive_event (self, event); } + +gboolean +gst_inter_pipe_inode_send_query (GstInterPipeINode *self, GstQuery *query) +{ + g_return_val_if_fail (GST_INTER_PIPE_IS_INODE (self), FALSE); + g_return_val_if_fail (GST_IS_QUERY (query), FALSE); + + GstInterPipeINodeInterface *iface = GST_INTER_PIPE_INODE_GET_IFACE (self); + g_return_val_if_fail (iface->query != NULL, FALSE); + return iface->query (self, query); +} diff --git a/gst/interpipe/gstinterpipeinode.h b/gst/interpipe/gstinterpipeinode.h index 5aa16ec..98a1bd4 100644 --- a/gst/interpipe/gstinterpipeinode.h +++ b/gst/interpipe/gstinterpipeinode.h @@ -65,6 +65,7 @@ struct _GstInterPipeINodeInterface gboolean (* add_listener) (GstInterPipeINode *iface, GstInterPipeIListener * listener); gboolean (* remove_listener) (GstInterPipeINode *iface, GstInterPipeIListener * listener); gboolean (* receive_event) (GstInterPipeINode *iface, GstEvent *event); + gboolean (* query) (GstInterPipeINode *iface, GstQuery *query); }; /** @@ -106,6 +107,8 @@ gboolean gst_inter_pipe_inode_receive_event (GstInterPipeINode *iface, GstEvent GType gst_inter_pipe_inode_get_type (void); +gboolean gst_inter_pipe_inode_send_query (GstInterPipeINode *iface, GstQuery *query); + G_END_DECLS #endif //__GST_INTER_PIPE_INODE_H__ diff --git a/gst/interpipe/gstinterpipesink.c b/gst/interpipe/gstinterpipesink.c index b757a4b..9217441 100644 --- a/gst/interpipe/gstinterpipesink.c +++ b/gst/interpipe/gstinterpipesink.c @@ -82,6 +82,8 @@ static gboolean gst_inter_pipe_sink_set_caps (GstBaseSink * base, GstCaps * filter); static gboolean gst_inter_pipe_sink_event (GstBaseSink * base, GstEvent * event); +static gboolean gst_inter_pipe_sink_propose_allocation (GstBaseSink * base, + GstQuery * query); static gboolean gst_inter_pipe_sink_are_caps_compatible (GstInterPipeSink * sink, GstCaps * listener_caps, GstCaps * sinkcaps); static GstCaps *gst_inter_pipe_sink_caps_intersect (GstCaps * caps1, @@ -90,6 +92,8 @@ static void gst_inter_pipe_sink_intersect_listener_caps (gpointer key, gpointer value, gpointer user_data); static void gst_inter_pipe_sink_forward_event (gpointer key, gpointer value, gpointer user_data); +static gboolean gst_inter_pipe_sink_query (GstInterPipeINode * iface, + GstQuery * query); static void gst_inter_pipe_inode_init (GstInterPipeINodeInterface * iface); @@ -133,7 +137,7 @@ G_DEFINE_TYPE_WITH_CODE (GstInterPipeSink, gst_inter_pipe_sink, gst_inter_pipe_inode_init)); static void -gst_inter_pipe_sink_class_init (GstInterPipeSinkClass * klass) +gst_inter_pipe_sink_class_init (GstInterPipeSinkClass *klass) { GObjectClass *gobject_class; GstElementClass *element_class; @@ -174,12 +178,12 @@ gst_inter_pipe_sink_class_init (GstInterPipeSinkClass * klass) basesink_class->get_caps = GST_DEBUG_FUNCPTR (gst_inter_pipe_sink_get_caps); basesink_class->set_caps = GST_DEBUG_FUNCPTR (gst_inter_pipe_sink_set_caps); basesink_class->event = GST_DEBUG_FUNCPTR (gst_inter_pipe_sink_event); - + basesink_class->propose_allocation = + GST_DEBUG_FUNCPTR (gst_inter_pipe_sink_propose_allocation); } static void -gst_inter_pipe_sink_update_node_name (GstInterPipeSink * sink, - GParamSpec * pspec) +gst_inter_pipe_sink_update_node_name (GstInterPipeSink *sink, GParamSpec *pspec) { GstInterPipeINode *node; @@ -195,7 +199,7 @@ gst_inter_pipe_sink_update_node_name (GstInterPipeSink * sink, } static void -gst_inter_pipe_sink_init (GstInterPipeSink * sink) +gst_inter_pipe_sink_init (GstInterPipeSink *sink) { GstAppSinkCallbacks callbacks; @@ -235,8 +239,8 @@ gst_inter_pipe_sink_init (GstInterPipeSink * sink) static void -gst_inter_pipe_sink_set_property (GObject * object, guint prop_id, - const GValue * value, GParamSpec * pspec) +gst_inter_pipe_sink_set_property (GObject *object, guint prop_id, + const GValue *value, GParamSpec *pspec) { GstInterPipeSink *sink; @@ -258,8 +262,8 @@ gst_inter_pipe_sink_set_property (GObject * object, guint prop_id, } static void -gst_inter_pipe_sink_get_property (GObject * object, guint prop_id, - GValue * value, GParamSpec * pspec) +gst_inter_pipe_sink_get_property (GObject *object, guint prop_id, + GValue *value, GParamSpec *pspec) { GstInterPipeSink *sink; GHashTable *listeners; @@ -282,7 +286,7 @@ gst_inter_pipe_sink_get_property (GObject * object, guint prop_id, } static void -gst_inter_pipe_sink_finalize (GObject * object) +gst_inter_pipe_sink_finalize (GObject *object) { GstInterPipeSink *sink; GstInterPipeINode *node; @@ -337,8 +341,8 @@ gst_inter_pipe_sink_update_listener_caps (gpointer key, gpointer data, static gboolean -gst_inter_pipe_sink_are_caps_compatible (GstInterPipeSink * sink, - GstCaps * listener_caps, GstCaps * sinkcaps) +gst_inter_pipe_sink_are_caps_compatible (GstInterPipeSink *sink, + GstCaps *listener_caps, GstCaps *sinkcaps) { GstCaps *renegotiated_caps = NULL; @@ -356,7 +360,7 @@ gst_inter_pipe_sink_are_caps_compatible (GstInterPipeSink * sink, } static GstCaps * -gst_inter_pipe_sink_caps_intersect (GstCaps * caps1, GstCaps * caps2) +gst_inter_pipe_sink_caps_intersect (GstCaps *caps1, GstCaps *caps2) { if (!caps1 && !caps2) { return NULL; @@ -406,7 +410,7 @@ gst_inter_pipe_sink_intersect_listener_caps (gpointer key, gpointer value, } static GstCaps * -gst_inter_pipe_sink_get_caps (GstBaseSink * base, GstCaps * filter) +gst_inter_pipe_sink_get_caps (GstBaseSink *base, GstCaps *filter) { GstInterPipeSink *sink; GstInterPipeIListener *listener; @@ -444,11 +448,9 @@ gst_inter_pipe_sink_get_caps (GstBaseSink * base, GstCaps * filter) /* Take into account upsream caps suggestion */ pre_filter = sink->caps_negotiated; - intercept_caps = - gst_inter_pipe_sink_caps_intersect (pre_filter, filter); + intercept_caps = gst_inter_pipe_sink_caps_intersect (pre_filter, filter); - GST_INFO_OBJECT (sink, "Filtered caps: %" GST_PTR_FORMAT, - intercept_caps); + GST_INFO_OBJECT (sink, "Filtered caps: %" GST_PTR_FORMAT, intercept_caps); if (!intercept_caps || gst_caps_is_empty (intercept_caps)) { GST_ERROR_OBJECT (sink, @@ -485,7 +487,7 @@ gst_inter_pipe_sink_get_caps (GstBaseSink * base, GstCaps * filter) } static gboolean -gst_inter_pipe_sink_set_caps (GstBaseSink * base, GstCaps * caps) +gst_inter_pipe_sink_set_caps (GstBaseSink *base, GstCaps *caps) { GstInterPipeSink *sink; GHashTable *listeners; @@ -525,7 +527,7 @@ gst_inter_pipe_sink_set_caps (GstBaseSink * base, GstCaps * caps) g_mutex_unlock (&sink->listeners_mutex); - out: +out: if (ret) { gst_caps_replace (&sink->caps, caps); gst_app_sink_set_caps (GST_APP_SINK (sink), caps); @@ -576,7 +578,7 @@ gst_inter_pipe_sink_forward_event (gpointer key, gpointer data, } static gboolean -gst_inter_pipe_sink_event (GstBaseSink * base, GstEvent * event) +gst_inter_pipe_sink_event (GstBaseSink *base, GstEvent *event) { GstInterPipeSink *sink; GHashTable *listeners; @@ -598,6 +600,228 @@ gst_inter_pipe_sink_event (GstBaseSink * base, GstEvent * event) event); } + +struct AllocQueryCtx +{ + GstInterPipeSink *sink; + GstQuery *query; + GstAllocationParams params; + guint size; + guint min_buffers; + gboolean first_query; + guint num_listeners; +}; + + +static gboolean +gst_inter_pipe_sink_forward_query_allocation (gpointer key, gpointer data, + gpointer user_data) +{ + struct AllocQueryCtx *ctx; + GstInterPipeIListener *listener; + gchar *listener_name; + GstInterPipeSink *sink; + GstQuery *query; + GstCaps *caps; + gboolean ret = TRUE; + guint count, i, size, min; + + listener = GST_INTER_PIPE_ILISTENER (data); + listener_name = (gchar *) key; + ctx = user_data; + sink = ctx->sink; + + GST_DEBUG_OBJECT (sink, "Aggregating allocation from listener %s", + listener_name); + + gst_query_parse_allocation (ctx->query, &caps, NULL); + + query = gst_query_new_allocation (caps, FALSE); + if (!gst_inter_pipe_ilistener_query (listener, query)) { + GST_DEBUG_OBJECT (sink, + "Allocation query failed on listener %s, ignoring allocation", + listener_name); + ret = FALSE; + goto out; + } + + /* Allocation Filter, extract of code from tee element */ + + /* Allocation Params: + * store the maximum alignment, prefix and padding, but ignore the + * allocators and the flags which are tied to downstream allocation*/ + count = gst_query_get_n_allocation_params (query); + for (i = 0; i < count; i++) { + GstAllocationParams params = { 0, }; + + gst_query_parse_nth_allocation_param (query, i, NULL, ¶ms); + + GST_DEBUG_OBJECT (sink, "Aggregating AllocationParams align=%" + G_GSIZE_FORMAT " prefix=%" G_GSIZE_FORMAT " padding=%" + G_GSIZE_FORMAT, params.align, params.prefix, params.padding); + + if (ctx->params.align < params.align) + ctx->params.align = params.align; + + if (ctx->params.prefix < params.prefix) + ctx->params.prefix = params.prefix; + + if (ctx->params.padding < params.padding) + ctx->params.padding = params.padding; + } + + /* Allocation Pool: + * We want to keep the biggest size and biggest minimum number of buffers to + * make sure downstream requirement can be satisfied. We don't really care + * about the maximum, as this is a parameter of the downstream provided + * pool. We only read the first allocation pool as the minimum number of + * buffers is normally constant regardless of the pool being used. */ + if (gst_query_get_n_allocation_pools (query) > 0) { + gst_query_parse_nth_allocation_pool (query, 0, NULL, &size, &min, NULL); + + GST_DEBUG_OBJECT (sink, + "Aggregating allocation pool size=%u min_buffers=%u", size, min); + + if (ctx->size < size) + ctx->size = size; + + if (ctx->min_buffers < min) + ctx->min_buffers = min; + } + + /* Allocation Meta: + * For allocation meta, we'll need to aggregate the argument using the new + * GstMetaInfo::agggregate_func */ + count = gst_query_get_n_allocation_metas (query); + for (i = 0; i < count; i++) { + guint ctx_index; + GType api; + const GstStructure *param; + + api = gst_query_parse_nth_allocation_meta (query, i, ¶m); + + /* For the first query, copy all metas */ + if (ctx->first_query) { + gst_query_add_allocation_meta (ctx->query, api, param); + continue; + } + + /* Afterward, aggregate the common params */ + if (gst_query_find_allocation_meta (ctx->query, api, &ctx_index)) { + const GstStructure *ctx_param; + + gst_query_parse_nth_allocation_meta (ctx->query, ctx_index, &ctx_param); + + /* Keep meta which has no params */ + if (ctx_param == NULL && param == NULL) + continue; + + GST_DEBUG_OBJECT (sink, "Dropping allocation meta %s", g_type_name (api)); + gst_query_remove_nth_allocation_meta (ctx->query, ctx_index); + } + } + + /* Finally, cleanup metas from the stored query that aren't support on this + * listener. */ + count = gst_query_get_n_allocation_metas (ctx->query); + for (i = 0; i < count;) { + GType api = gst_query_parse_nth_allocation_meta (ctx->query, i, NULL); + + if (!gst_query_find_allocation_meta (query, api, NULL)) { + GST_DEBUG_OBJECT (sink, "Dropping allocation meta %s", g_type_name (api)); + gst_query_remove_nth_allocation_meta (ctx->query, i); + count--; + continue; + } + + i++; + } + + ctx->first_query = FALSE; + ctx->num_listeners++; + +out: + gst_query_unref (query); + return ret; +} + +static gboolean +gst_inter_pipe_sink_propose_allocation (GstBaseSink *base, GstQuery *query) +{ + struct AllocQueryCtx ctx = { 0 }; + GstInterPipeSink *sink; + GHashTable *listeners; + GHashTableIter iter; + gboolean ret = TRUE; + gpointer key, value; + + sink = GST_INTER_PIPE_SINK (base); + + g_mutex_lock (&sink->listeners_mutex); + listeners = GST_INTER_PIPE_SINK_LISTENERS (sink); + + ctx.sink = sink; + ctx.query = query; + ctx.first_query = TRUE; + gst_allocation_params_init (&ctx.params); + + g_hash_table_iter_init (&iter, listeners); + + while (g_hash_table_iter_next (&iter, &key, &value)) { + ret |= gst_inter_pipe_sink_forward_query_allocation (key, value, &ctx); + } + + if (ret) { + guint count = gst_query_get_n_allocation_metas (query); + guint i; + + GST_DEBUG_OBJECT (sink, + "Final allocation parameters: align=%" G_GSIZE_FORMAT " prefix=%" + G_GSIZE_FORMAT " padding %" G_GSIZE_FORMAT, ctx.params.align, + ctx.params.prefix, ctx.params.padding); + + GST_DEBUG_OBJECT (sink, "Final allocation pools: size=%u min_buffers=%u", + ctx.size, ctx.min_buffers); + + GST_DEBUG_OBJECT (sink, "Final %u allocation meta:", count); + + for (i = 0; i < count; i++) { + GST_DEBUG_OBJECT (sink, " + aggregated allocation meta %s", + g_type_name (gst_query_parse_nth_allocation_meta (ctx.query, i, + NULL))); + } + + /* Allocate one more buffers when multiplexing so we don't starve the + * downstream threads. */ + if (ctx.num_listeners > 1) + ctx.min_buffers++; + + /* Check that we actually have parameters besides the defaults. */ + if (ctx.params.align || ctx.params.prefix || ctx.params.padding) { + gst_query_add_allocation_param (ctx.query, NULL, &ctx.params); + } + + /* When size == 0, buffers created from this pool would have no memory + * allocated. */ + if (ctx.size) { + gst_query_add_allocation_pool (ctx.query, NULL, ctx.size, + ctx.min_buffers, 0); + } + + } else { + guint count = gst_query_get_n_allocation_metas (query); + guint i; + + for (i = 1; i <= count; i++) { + gst_query_remove_nth_allocation_meta (query, count - i); + } + } + + g_mutex_unlock (&sink->listeners_mutex); + + return ret; +} + /* Appsink Callbacks */ static void gst_inter_pipe_sink_push_to_listener (gpointer key, gpointer data, @@ -623,7 +847,7 @@ gst_inter_pipe_sink_push_to_listener (gpointer key, gpointer data, } static void -gst_inter_pipe_sink_process_sample (GstInterPipeSink * sink, GstSample * sample) +gst_inter_pipe_sink_process_sample (GstInterPipeSink *sink, GstSample *sample) { GHashTable *listeners; GstBuffer *buffer; @@ -650,7 +874,7 @@ gst_inter_pipe_sink_process_sample (GstInterPipeSink * sink, GstSample * sample) } static GstFlowReturn -gst_inter_pipe_sink_new_buffer (GstAppSink * asink, gpointer data) +gst_inter_pipe_sink_new_buffer (GstAppSink *asink, gpointer data) { GstInterPipeSink *sink; GstSample *sample; @@ -665,7 +889,7 @@ gst_inter_pipe_sink_new_buffer (GstAppSink * asink, gpointer data) static GstFlowReturn -gst_inter_pipe_sink_new_preroll (GstAppSink * asink, gpointer data) +gst_inter_pipe_sink_new_preroll (GstAppSink *asink, gpointer data) { GstInterPipeSink *sink; GstSample *sample; @@ -695,7 +919,7 @@ gst_inter_pipe_sink_send_eos (gpointer key, gpointer data, gpointer user_data) } static void -gst_inter_pipe_sink_eos (GstAppSink * asink, gpointer data) +gst_inter_pipe_sink_eos (GstAppSink *asink, gpointer data) { GstInterPipeSink *sink; GHashTable *listeners; @@ -718,16 +942,17 @@ gst_inter_pipe_sink_eos (GstAppSink * asink, gpointer data) /* GstInterPipeINode interface implementation */ static void -gst_inter_pipe_inode_init (GstInterPipeINodeInterface * iface) +gst_inter_pipe_inode_init (GstInterPipeINodeInterface *iface) { iface->add_listener = gst_inter_pipe_sink_add_listener; iface->remove_listener = gst_inter_pipe_sink_remove_listener; iface->receive_event = gst_inter_pipe_sink_receive_event; + iface->query = gst_inter_pipe_sink_query; } static gboolean -gst_inter_pipe_sink_add_listener (GstInterPipeINode * iface, - GstInterPipeIListener * listener) +gst_inter_pipe_sink_add_listener (GstInterPipeINode *iface, + GstInterPipeIListener *listener) { GstInterPipeSink *sink; GHashTable *listeners; @@ -845,8 +1070,8 @@ gst_inter_pipe_sink_add_listener (GstInterPipeINode * iface, } static gboolean -gst_inter_pipe_sink_remove_listener (GstInterPipeINode * iface, - GstInterPipeIListener * listener) +gst_inter_pipe_sink_remove_listener (GstInterPipeINode *iface, + GstInterPipeIListener *listener) { GstInterPipeSink *sink; GHashTable *listeners; @@ -881,7 +1106,7 @@ gst_inter_pipe_sink_remove_listener (GstInterPipeINode * iface, } static gboolean -gst_inter_pipe_sink_receive_event (GstInterPipeINode * iface, GstEvent * event) +gst_inter_pipe_sink_receive_event (GstInterPipeINode *iface, GstEvent *event) { GstInterPipeSink *self; GHashTable *listeners; @@ -905,3 +1130,16 @@ gst_inter_pipe_sink_receive_event (GstInterPipeINode * iface, GstEvent * event) return FALSE; } } + +static gboolean +gst_inter_pipe_sink_query (GstInterPipeINode *iface, GstQuery *query) +{ + GstInterPipeSink *self = GST_INTER_PIPE_SINK (iface); + gboolean res; + + GST_DEBUG_OBJECT (self, "query: %" GST_PTR_FORMAT, query); + + res = gst_pad_peer_query (GST_INTER_PIPE_SINK_PAD (self), query); + return res; + +} diff --git a/gst/interpipe/gstinterpipesrc.c b/gst/interpipe/gstinterpipesrc.c index fd2538c..29d0276 100644 --- a/gst/interpipe/gstinterpipesrc.c +++ b/gst/interpipe/gstinterpipesrc.c @@ -45,6 +45,9 @@ #include #include "gstinterpipe.h" #include "gstinterpipesrc.h" + +#include + #include "gstinterpipeilistener.h" GST_DEBUG_CATEGORY_STATIC (gst_inter_pipe_src_debug); @@ -87,11 +90,14 @@ static gboolean gst_inter_pipe_src_push_buffer (GstInterPipeIListener * iface, static gboolean gst_inter_pipe_src_push_event (GstInterPipeIListener * iface, GstEvent * event, guint64 basetime); static gboolean gst_inter_pipe_src_send_eos (GstInterPipeIListener * iface); +static gboolean gst_inter_pipe_src_push_query (GstInterPipeIListener * iface, + GstQuery * query); static gboolean gst_inter_pipe_src_listen_node (GstInterPipeSrc * src, const gchar * node_name); static gboolean gst_inter_pipe_src_start (GstBaseSrc * base); static gboolean gst_inter_pipe_src_stop (GstBaseSrc * base); static gboolean gst_inter_pipe_src_event (GstBaseSrc * base, GstEvent * event); +static gboolean gst_inter_pipe_src_query (GstBaseSrc * base, GstQuery * query); static void gst_inter_pipe_ilistener_init (GstInterPipeIListenerInterface * iface); @@ -154,6 +160,10 @@ struct _GstInterPipeSrc /* Accept end of stream event */ gboolean accept_eos_event; + + /* Pending context queries that couldn't be answered */ + GQueue *pending_context_queries; + GMutex context_lock; }; struct _GstInterPipeSrcClass @@ -166,7 +176,7 @@ G_DEFINE_TYPE_WITH_CODE (GstInterPipeSrc, gst_inter_pipe_src, GST_TYPE_APP_SRC, gst_inter_pipe_ilistener_init)); static void -gst_inter_pipe_src_class_init (GstInterPipeSrcClass * klass) +gst_inter_pipe_src_class_init (GstInterPipeSrcClass *klass) { GObjectClass *gobject_class; GstElementClass *element_class; @@ -226,10 +236,11 @@ gst_inter_pipe_src_class_init (GstInterPipeSrcClass * klass) basesrc_class->stop = GST_DEBUG_FUNCPTR (gst_inter_pipe_src_stop); basesrc_class->event = GST_DEBUG_FUNCPTR (gst_inter_pipe_src_event); basesrc_class->create = GST_DEBUG_FUNCPTR (gst_inter_pipe_src_create); + basesrc_class->query = GST_DEBUG_FUNCPTR (gst_inter_pipe_src_query); } static void -gst_inter_pipe_src_init (GstInterPipeSrc * src) +gst_inter_pipe_src_init (GstInterPipeSrc *src) { gst_app_src_set_emit_signals (GST_APP_SRC (src), FALSE); @@ -242,11 +253,66 @@ gst_inter_pipe_src_init (GstInterPipeSrc * src) src->stream_sync = GST_INTER_PIPE_SRC_PASSTHROUGH_TIMESTAMP; src->accept_events = TRUE; src->accept_eos_event = TRUE; + + g_mutex_init (&src->context_lock); + src->pending_context_queries = g_queue_new (); } static void -gst_inter_pipe_src_set_property (GObject * object, guint prop_id, - const GValue * value, GParamSpec * pspec) +gst_inter_pipe_src_retry_pending_queries (GstInterPipeSrc *src) +{ + GstInterPipeINode *sink_node = gst_inter_pipe_get_node (src->listen_to); + GstQuery *query; + GstContext *context; + + if (sink_node) { + g_mutex_lock (&src->context_lock); + + while (!g_queue_is_empty (src->pending_context_queries)) { + query = g_queue_pop_head (src->pending_context_queries); + + GST_DEBUG_OBJECT (src, "Retrying pending context query"); + + if (!gst_inter_pipe_inode_send_query (sink_node, query)) { + GST_DEBUG_OBJECT (src, "Pending query still cannot be answered"); + } else { + GST_DEBUG_OBJECT (src, + "Pending query answered, we've got a new context from upstream!"); + // TODO: this doesn't work, it's too late to set the context now + // see for example: https://github.com/GStreamer/gstreamer/blob/2dd1b4bf270995fd4dd6aa240cb51cfcdf3c71b8/subprojects/gst-plugins-bad/sys/nvcodec/gstcudabasetransform.c#L179-L197 + // by the time we set this context filter->stream has already be assigned, so changing the context doesn't do anything.. + gst_query_parse_context (query, &context); + if (context) { + GstElement *pipeline = GST_ELEMENT (gst_element_get_parent (src)); + GstIterator *iter = gst_bin_iterate_elements (GST_BIN (pipeline)); + GValue item = G_VALUE_INIT; + GstMessage *msg; + + // Set the context on interpipesrc itself + gst_element_set_context (GST_ELEMENT (src), context); + + GST_DEBUG_OBJECT (src, "Sending have context message"); + msg = gst_message_new_have_context (GST_OBJECT (src), context); + gst_element_post_message (GST_ELEMENT (src), msg); + + while (gst_iterator_next (iter, &item) == GST_ITERATOR_OK) { + GstElement *element = GST_ELEMENT (g_value_get_object (&item)); + gst_element_set_context (element, context); + } + + } + } + + gst_query_unref (query); + } + + g_mutex_unlock (&src->context_lock); + } +} + +static void +gst_inter_pipe_src_set_property (GObject *object, guint prop_id, + const GValue *value, GParamSpec *pspec) { GstInterPipeSrc *src; GstInterPipeIListener *listener; @@ -275,6 +341,9 @@ gst_inter_pipe_src_set_property (GObject * object, guint prop_id, g_free (src->listen_to); } src->listen_to = node_name; + + /* Retry any pending context queries */ + gst_inter_pipe_src_retry_pending_queries (src); } src->listening = TRUE; GST_INFO_OBJECT (src, "Listening to node %s", src->listen_to); @@ -321,8 +390,8 @@ gst_inter_pipe_src_set_property (GObject * object, guint prop_id, } static void -gst_inter_pipe_src_get_property (GObject * object, guint prop_id, - GValue * value, GParamSpec * pspec) +gst_inter_pipe_src_get_property (GObject *object, guint prop_id, + GValue *value, GParamSpec *pspec) { GstInterPipeSrc *src; @@ -356,7 +425,7 @@ gst_inter_pipe_src_get_property (GObject * object, guint prop_id, } static void -gst_inter_pipe_src_finalize (GObject * object) +gst_inter_pipe_src_finalize (GObject *object) { GstInterPipeSrc *src; @@ -366,6 +435,14 @@ gst_inter_pipe_src_finalize (GObject * object) g_queue_free_full (src->pending_serial_events, (GDestroyNotify) gst_event_unref); + /* Free pending context queries */ + g_mutex_lock (&src->context_lock); + g_queue_free_full (src->pending_context_queries, + (GDestroyNotify) gst_query_unref); + g_mutex_unlock (&src->context_lock); + + g_mutex_clear (&src->context_lock); + if (src->listen_to) { g_free (src->listen_to); src->listen_to = NULL; @@ -377,7 +454,7 @@ gst_inter_pipe_src_finalize (GObject * object) /* GstBaseSrc Implementation*/ static gboolean -gst_inter_pipe_src_start (GstBaseSrc * base) +gst_inter_pipe_src_start (GstBaseSrc *base) { GstBaseSrcClass *basesrc_class; GstInterPipeSrc *src; @@ -395,6 +472,10 @@ gst_inter_pipe_src_start (GstBaseSrc * base) } else { GST_INFO_OBJECT (src, "Listening to node %s", src->listen_to); src->listening = TRUE; + + /* Retry pending queries */ + gst_inter_pipe_src_retry_pending_queries (src); + goto start_done; } } else { @@ -412,7 +493,7 @@ gst_inter_pipe_src_start (GstBaseSrc * base) } static gboolean -gst_inter_pipe_src_stop (GstBaseSrc * base) +gst_inter_pipe_src_stop (GstBaseSrc *base) { GstBaseSrcClass *basesrc_class; GstInterPipeSrc *src; @@ -432,7 +513,7 @@ gst_inter_pipe_src_stop (GstBaseSrc * base) } static gboolean -gst_inter_pipe_src_event (GstBaseSrc * base, GstEvent * event) +gst_inter_pipe_src_event (GstBaseSrc *base, GstEvent *event) { GstBaseSrcClass *basesrc_class; GstInterPipeSrc *src; @@ -456,9 +537,48 @@ gst_inter_pipe_src_event (GstBaseSrc * base, GstEvent * event) return basesrc_class->event (base, event); } +static gboolean +gst_inter_pipe_src_query (GstBaseSrc *base, GstQuery *query) +{ + GstInterPipeINode *sink_node; + GstBaseSrcClass *basesrc_class; + GstInterPipeSrc *src; + gboolean ret; + + basesrc_class = GST_BASE_SRC_CLASS (gst_inter_pipe_src_parent_class); + src = GST_INTER_PIPE_SRC (base); + + GST_DEBUG_OBJECT (src, "Incoming upstream query %s", + GST_QUERY_TYPE_NAME (query)); + + if (GST_QUERY_TYPE (query) == GST_QUERY_CONTEXT) { + if (src->listening && src->listen_to) { + GST_DEBUG_OBJECT (src, "Context query will be forwarded to node %s", + src->listen_to); + sink_node = gst_inter_pipe_get_node (src->listen_to); + ret = gst_inter_pipe_inode_send_query (sink_node, query); + + if (!ret) { + GST_DEBUG_OBJECT (src, "Context query could not be answered by node"); + } + + return ret; + } else { + GST_DEBUG_OBJECT (src, "Not connected yet, cannot answer context query"); + + /* Cache this query type for retry after connection */ + g_mutex_lock (&src->context_lock); + g_queue_push_tail (src->pending_context_queries, gst_query_ref (query)); + g_mutex_unlock (&src->context_lock); + } + } + + return basesrc_class->query (base, query); +} + static GstFlowReturn -gst_inter_pipe_src_create (GstBaseSrc * base, guint64 offset, guint size, - GstBuffer ** buf) +gst_inter_pipe_src_create (GstBaseSrc *base, guint64 offset, guint size, + GstBuffer **buf) { GstInterPipeSrc *src; GstEvent *serial_event; @@ -512,7 +632,7 @@ gst_inter_pipe_src_create (GstBaseSrc * base, guint64 offset, guint size, /* GstInterPipeIListener Implementation */ static void -gst_inter_pipe_ilistener_init (GstInterPipeIListenerInterface * iface) +gst_inter_pipe_ilistener_init (GstInterPipeIListenerInterface *iface) { iface->get_name = gst_inter_pipe_src_get_name; iface->node_added = gst_inter_pipe_src_node_added; @@ -521,18 +641,19 @@ gst_inter_pipe_ilistener_init (GstInterPipeIListenerInterface * iface) iface->set_caps = gst_inter_pipe_src_set_caps; iface->push_buffer = gst_inter_pipe_src_push_buffer; iface->push_event = gst_inter_pipe_src_push_event; + iface->query = gst_inter_pipe_src_push_query; iface->send_eos = gst_inter_pipe_src_send_eos; } static const gchar * -gst_inter_pipe_src_get_name (GstInterPipeIListener * iface) +gst_inter_pipe_src_get_name (GstInterPipeIListener *iface) { return GST_OBJECT_NAME (iface); } static gboolean -gst_inter_pipe_src_node_added (GstInterPipeIListener * iface, - const gchar * node_name) +gst_inter_pipe_src_node_added (GstInterPipeIListener *iface, + const gchar *node_name) { GstInterPipeSrc *src; @@ -542,14 +663,17 @@ gst_inter_pipe_src_node_added (GstInterPipeIListener * iface, if (g_strcmp0 (src->listen_to, node_name) == 0) { gst_inter_pipe_src_listen_node (src, node_name); + + /* Retry pending queries */ + gst_inter_pipe_src_retry_pending_queries (src); } return TRUE; } static gboolean -gst_inter_pipe_src_node_removed (GstInterPipeIListener * iface, - const gchar * node_name) +gst_inter_pipe_src_node_removed (GstInterPipeIListener *iface, + const gchar *node_name) { GstInterPipeSrc *src; @@ -564,8 +688,7 @@ gst_inter_pipe_src_node_removed (GstInterPipeIListener * iface, } static GstCaps * -gst_inter_pipe_src_get_caps (GstInterPipeIListener * iface, - gboolean * negotiated) +gst_inter_pipe_src_get_caps (GstInterPipeIListener *iface, gboolean *negotiated) { GstInterPipeSrc *src; GstAppSrc *appsrc; @@ -590,8 +713,7 @@ gst_inter_pipe_src_get_caps (GstInterPipeIListener * iface, } static gboolean -gst_inter_pipe_src_set_caps (GstInterPipeIListener * iface, - const GstCaps * caps) +gst_inter_pipe_src_set_caps (GstInterPipeIListener *iface, const GstCaps *caps) { GstInterPipeSrc *src; GstAppSrc *appsrc; @@ -621,8 +743,8 @@ gst_inter_pipe_src_set_caps (GstInterPipeIListener * iface, } static gboolean -gst_inter_pipe_src_push_buffer (GstInterPipeIListener * iface, - GstBuffer * buffer, guint64 basetime) +gst_inter_pipe_src_push_buffer (GstInterPipeIListener *iface, + GstBuffer *buffer, guint64 basetime) { GstInterPipeSrc *src; GstAppSrc *appsrc; @@ -658,7 +780,7 @@ gst_inter_pipe_src_push_buffer (GstInterPipeIListener * iface, difftime = srcbasetime - basetime; if (GST_BUFFER_PTS (buffer) >= difftime) { GST_BUFFER_PTS (buffer) = GST_BUFFER_PTS (buffer) - difftime; - if (GST_BUFFER_DTS (buffer) != GST_CLOCK_TIME_NONE ) { + if (GST_BUFFER_DTS (buffer) != GST_CLOCK_TIME_NONE) { GST_BUFFER_DTS (buffer) = GST_BUFFER_DTS (buffer) - difftime; } } else { @@ -668,7 +790,7 @@ gst_inter_pipe_src_push_buffer (GstInterPipeIListener * iface, } else { difftime = basetime - srcbasetime; GST_BUFFER_PTS (buffer) = GST_BUFFER_PTS (buffer) + difftime; - if (GST_BUFFER_DTS (buffer) != GST_CLOCK_TIME_NONE ) { + if (GST_BUFFER_DTS (buffer) != GST_CLOCK_TIME_NONE) { GST_BUFFER_DTS (buffer) = GST_BUFFER_DTS (buffer) + difftime; } } @@ -704,7 +826,7 @@ gst_inter_pipe_src_push_buffer (GstInterPipeIListener * iface, } static gboolean -gst_inter_pipe_src_push_event (GstInterPipeIListener * iface, GstEvent * event, +gst_inter_pipe_src_push_event (GstInterPipeIListener *iface, GstEvent *event, guint64 basetime) { GstInterPipeSrc *src; @@ -756,8 +878,9 @@ gst_inter_pipe_src_push_event (GstInterPipeIListener * iface, GstEvent * event, } } + static gboolean -gst_inter_pipe_src_send_eos (GstInterPipeIListener * iface) +gst_inter_pipe_src_send_eos (GstInterPipeIListener *iface) { GstInterPipeSrc *src; GstAppSrc *appsrc; @@ -775,8 +898,35 @@ gst_inter_pipe_src_send_eos (GstInterPipeIListener * iface) return TRUE; } + +static gboolean +gst_inter_pipe_src_push_query (GstInterPipeIListener *iface, GstQuery *query) +{ + GstInterPipeSrc *src; + GstPad *srcpad; + GstPad *peerpad; + gboolean ret = TRUE; + + src = GST_INTER_PIPE_SRC (iface); + srcpad = GST_INTER_PIPE_SRC_PAD (GST_APP_SRC (src)); + + peerpad = gst_pad_get_peer (srcpad); + if (!peerpad) { + ret = FALSE; + goto out; + } + + ret = gst_pad_query (peerpad, query); + + gst_object_unref (peerpad); + +out: + return ret; +} + + static gboolean -gst_inter_pipe_src_listen_node (GstInterPipeSrc * src, const gchar * node_name) +gst_inter_pipe_src_listen_node (GstInterPipeSrc *src, const gchar *node_name) { GstInterPipeIListener *listener; diff --git a/meson_options.txt b/meson_options.txt index 2c097c1..1a235bc 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -1,6 +1,6 @@ # Feature options option('tests', type : 'feature', value : 'auto', yield : true, description : 'Enable tests') -option('enable-gtk-doc', type : 'boolean', value : true, description : 'Use gtk-doc to build documentation') +option('enable-gtk-doc', type : 'boolean', value : false, description : 'Use gtk-doc to build documentation') # Common options option('package-name', type : 'string', yield : true,