From 1e2c41d0781c5acfad5e0cfed2dc8ecbca14f1c1 Mon Sep 17 00:00:00 2001 From: Melissa Montero Date: Tue, 27 Oct 2020 12:05:52 -0600 Subject: [PATCH] Forward query allocation and aggregate the results This will forward the query allocation when received between the connected listeners and aggregates the results of each listeners. This helps to indicate the upstream elements which metas are supported. For example it avoids v4l2src buffer copy when the downstream elements doesn't report that video meta is supported and the pitch is different than width. --- gst/interpipe/gstinterpipeilistener.c | 15 ++ gst/interpipe/gstinterpipeilistener.h | 14 ++ gst/interpipe/gstinterpipesink.c | 231 +++++++++++++++++++++++++- gst/interpipe/gstinterpipesrc.c | 31 ++++ 4 files changed, 288 insertions(+), 3 deletions(-) diff --git a/gst/interpipe/gstinterpipeilistener.c b/gst/interpipe/gstinterpipeilistener.c index 063f5ab..f63d0c3 100644 --- a/gst/interpipe/gstinterpipeilistener.c +++ b/gst/interpipe/gstinterpipeilistener.c @@ -142,6 +142,21 @@ gst_inter_pipe_ilistener_push_event (GstInterPipeIListener * self, return iface->push_event (self, event, basetime); } +gboolean +gst_inter_pipe_ilistener_query (GstInterPipeIListener * self, GstQuery * query) +{ + GstInterPipeIListenerInterface *iface; + + g_return_val_if_fail (GST_INTER_PIPE_IS_ILISTENER (self), FALSE); + g_return_val_if_fail (query, FALSE); + + iface = GST_INTER_PIPE_ILISTENER_GET_IFACE (self); + g_return_val_if_fail (iface->query != NULL, FALSE); + + return iface->query (self, query); +} + + gboolean gst_inter_pipe_ilistener_send_eos (GstInterPipeIListener * self) { diff --git a/gst/interpipe/gstinterpipeilistener.h b/gst/interpipe/gstinterpipeilistener.h index 577cfb8..4c210a3 100644 --- a/gst/interpipe/gstinterpipeilistener.h +++ b/gst/interpipe/gstinterpipeilistener.h @@ -94,6 +94,7 @@ struct _GstInterPipeIListenerInterface gboolean (* node_removed) (GstInterPipeIListener *iface, const gchar *node_removed); gboolean (* push_buffer) (GstInterPipeIListener *iface, GstBuffer *buffer, guint64 basetime); gboolean (* push_event) (GstInterPipeIListener *iface, GstEvent *event, guint64 basetime); + gboolean (* query) (GstInterPipeIListener *iface, GstQuery *query); gboolean (* send_eos) (GstInterPipeIListener *iface); }; @@ -193,6 +194,19 @@ gboolean gst_inter_pipe_ilistener_push_buffer (GstInterPipeIListener *iface, gboolean gst_inter_pipe_ilistener_push_event (GstInterPipeIListener *iface, GstEvent *event, guint64 basetime); + +/** + * gst_inter_pipe_ilistener_query: + * @iface: (transfer none)(not nullable): The object that should query the #GstQuery downstream. + * @event: (transfer full)(not nullable): The #GstQuery to be pushed downstream. + * + * Ask @query to the downstream element. + * + * Return: True if the query was successful, False otherwise. + */ +gboolean gst_inter_pipe_ilistener_query (GstInterPipeIListener *iface, + GstQuery *query); + /** * gst_inter_pipe_ilistener_send_eos: * @iface: (transfer none)(not nullable): The object that should push the #GstEvent downstream. diff --git a/gst/interpipe/gstinterpipesink.c b/gst/interpipe/gstinterpipesink.c index 4e6eeb3..4e5fb97 100644 --- a/gst/interpipe/gstinterpipesink.c +++ b/gst/interpipe/gstinterpipesink.c @@ -82,6 +82,8 @@ static gboolean gst_inter_pipe_sink_set_caps (GstBaseSink * base, GstCaps * filter); static gboolean gst_inter_pipe_sink_event (GstBaseSink * base, GstEvent * event); +static gboolean gst_inter_pipe_sink_propose_allocation (GstBaseSink * base, + GstQuery * query); static gboolean gst_inter_pipe_sink_are_caps_compatible (GstInterPipeSink * sink, GstCaps * listener_caps, GstCaps * sinkcaps); static GstCaps *gst_inter_pipe_sink_caps_intersect (GstCaps * caps1, @@ -174,7 +176,8 @@ gst_inter_pipe_sink_class_init (GstInterPipeSinkClass * klass) basesink_class->get_caps = GST_DEBUG_FUNCPTR (gst_inter_pipe_sink_get_caps); basesink_class->set_caps = GST_DEBUG_FUNCPTR (gst_inter_pipe_sink_set_caps); basesink_class->event = GST_DEBUG_FUNCPTR (gst_inter_pipe_sink_event); - + basesink_class->propose_allocation = + GST_DEBUG_FUNCPTR (gst_inter_pipe_sink_propose_allocation); } static void @@ -587,6 +590,228 @@ gst_inter_pipe_sink_event (GstBaseSink * base, GstEvent * event) event); } + +struct AllocQueryCtx +{ + GstInterPipeSink *sink; + GstQuery *query; + GstAllocationParams params; + guint size; + guint min_buffers; + gboolean first_query; + guint num_listeners; +}; + + +static gboolean +gst_inter_pipe_sink_forward_query_allocation (gpointer key, gpointer data, + gpointer user_data) +{ + struct AllocQueryCtx *ctx; + GstInterPipeIListener *listener; + gchar *listener_name; + GstInterPipeSink *sink; + GstQuery *query; + GstCaps *caps; + gboolean ret = TRUE; + guint count, i, size, min; + + listener = GST_INTER_PIPE_ILISTENER (data); + listener_name = (gchar *) key; + ctx = user_data; + sink = ctx->sink; + + GST_DEBUG_OBJECT (sink, "Aggregating allocation from listener %s", + listener_name); + + gst_query_parse_allocation (ctx->query, &caps, NULL); + + query = gst_query_new_allocation (caps, FALSE); + if (!gst_inter_pipe_ilistener_query (listener, query)) { + GST_DEBUG_OBJECT (sink, + "Allocation query failed on listener %s, ignoring allocation", + listener_name); + ret = FALSE; + goto out; + } + + /* Allocation Filter, extract of code from tee element */ + + /* Allocation Params: + * store the maximum alignment, prefix and padding, but ignore the + * allocators and the flags which are tied to downstream allocation*/ + count = gst_query_get_n_allocation_params (query); + for (i = 0; i < count; i++) { + GstAllocationParams params = { 0, }; + + gst_query_parse_nth_allocation_param (query, i, NULL, ¶ms); + + GST_DEBUG_OBJECT (sink, "Aggregating AllocationParams align=%" + G_GSIZE_FORMAT " prefix=%" G_GSIZE_FORMAT " padding=%" + G_GSIZE_FORMAT, params.align, params.prefix, params.padding); + + if (ctx->params.align < params.align) + ctx->params.align = params.align; + + if (ctx->params.prefix < params.prefix) + ctx->params.prefix = params.prefix; + + if (ctx->params.padding < params.padding) + ctx->params.padding = params.padding; + } + + /* Allocation Pool: + * We want to keep the biggest size and biggest minimum number of buffers to + * make sure downstream requirement can be satisfied. We don't really care + * about the maximum, as this is a parameter of the downstream provided + * pool. We only read the first allocation pool as the minimum number of + * buffers is normally constant regardless of the pool being used. */ + if (gst_query_get_n_allocation_pools (query) > 0) { + gst_query_parse_nth_allocation_pool (query, 0, NULL, &size, &min, NULL); + + GST_DEBUG_OBJECT (sink, + "Aggregating allocation pool size=%u min_buffers=%u", size, min); + + if (ctx->size < size) + ctx->size = size; + + if (ctx->min_buffers < min) + ctx->min_buffers = min; + } + + /* Allocation Meta: + * For allocation meta, we'll need to aggregate the argument using the new + * GstMetaInfo::agggregate_func */ + count = gst_query_get_n_allocation_metas (query); + for (i = 0; i < count; i++) { + guint ctx_index; + GType api; + const GstStructure *param; + + api = gst_query_parse_nth_allocation_meta (query, i, ¶m); + + /* For the first query, copy all metas */ + if (ctx->first_query) { + gst_query_add_allocation_meta (ctx->query, api, param); + continue; + } + + /* Afterward, aggregate the common params */ + if (gst_query_find_allocation_meta (ctx->query, api, &ctx_index)) { + const GstStructure *ctx_param; + + gst_query_parse_nth_allocation_meta (ctx->query, ctx_index, &ctx_param); + + /* Keep meta which has no params */ + if (ctx_param == NULL && param == NULL) + continue; + + GST_DEBUG_OBJECT (sink, "Dropping allocation meta %s", g_type_name (api)); + gst_query_remove_nth_allocation_meta (ctx->query, ctx_index); + } + } + + /* Finally, cleanup metas from the stored query that aren't support on this + * listener. */ + count = gst_query_get_n_allocation_metas (ctx->query); + for (i = 0; i < count;) { + GType api = gst_query_parse_nth_allocation_meta (ctx->query, i, NULL); + + if (!gst_query_find_allocation_meta (query, api, NULL)) { + GST_DEBUG_OBJECT (sink, "Dropping allocation meta %s", g_type_name (api)); + gst_query_remove_nth_allocation_meta (ctx->query, i); + count--; + continue; + } + + i++; + } + + ctx->first_query = FALSE; + ctx->num_listeners++; + +out: + gst_query_unref (query); + return ret; +} + +static gboolean +gst_inter_pipe_sink_propose_allocation (GstBaseSink * base, GstQuery * query) +{ + struct AllocQueryCtx ctx = { 0 }; + GstInterPipeSink *sink; + GHashTable *listeners; + GHashTableIter iter; + gboolean ret = TRUE; + gpointer key, value; + + sink = GST_INTER_PIPE_SINK (base); + + g_mutex_lock (&sink->listeners_mutex); + listeners = GST_INTER_PIPE_SINK_LISTENERS (sink); + + ctx.sink = sink; + ctx.query = query; + ctx.first_query = TRUE; + gst_allocation_params_init (&ctx.params); + + g_hash_table_iter_init (&iter, listeners); + + while (g_hash_table_iter_next (&iter, &key, &value)) { + ret |= gst_inter_pipe_sink_forward_query_allocation (key, value, &ctx); + } + + if (ret) { + guint count = gst_query_get_n_allocation_metas (query); + guint i; + + GST_DEBUG_OBJECT (sink, + "Final allocation parameters: align=%" G_GSIZE_FORMAT " prefix=%" + G_GSIZE_FORMAT " padding %" G_GSIZE_FORMAT, ctx.params.align, + ctx.params.prefix, ctx.params.padding); + + GST_DEBUG_OBJECT (sink, "Final allocation pools: size=%u min_buffers=%u", + ctx.size, ctx.min_buffers); + + GST_DEBUG_OBJECT (sink, "Final %u allocation meta:", count); + + for (i = 0; i < count; i++) { + GST_DEBUG_OBJECT (sink, " + aggregated allocation meta %s", + g_type_name (gst_query_parse_nth_allocation_meta (ctx.query, i, + NULL))); + } + + /* Allocate one more buffers when multiplexing so we don't starve the + * downstream threads. */ + if (ctx.num_listeners > 1) + ctx.min_buffers++; + + /* Check that we actually have parameters besides the defaults. */ + if (ctx.params.align || ctx.params.prefix || ctx.params.padding) { + gst_query_add_allocation_param (ctx.query, NULL, &ctx.params); + } + + /* When size == 0, buffers created from this pool would have no memory + * allocated. */ + if (ctx.size) { + gst_query_add_allocation_pool (ctx.query, NULL, ctx.size, + ctx.min_buffers, 0); + } + + } else { + guint count = gst_query_get_n_allocation_metas (query); + guint i; + + for (i = 1; i <= count; i++) { + gst_query_remove_nth_allocation_meta (query, count - i); + } + } + + g_mutex_unlock (&sink->listeners_mutex); + + return ret; +} + /* Appsink Callbacks */ static void gst_inter_pipe_sink_push_to_listener (gpointer key, gpointer data, @@ -747,9 +972,9 @@ gst_inter_pipe_sink_add_listener (GstInterPipeINode * iface, has_listeners = 0 != g_hash_table_size (listeners); if (!sink->caps_negotiated && !has_listeners - && !gst_caps_is_equal (srccaps, sinkcaps)) { + && !gst_caps_is_equal (srccaps, sinkcaps)) { - if (!gst_pad_push_event (GST_INTER_PIPE_SINK_PAD (sink), + if (!gst_pad_push_event (GST_INTER_PIPE_SINK_PAD (sink), gst_event_new_reconfigure ())) goto reconfigure_event_error; diff --git a/gst/interpipe/gstinterpipesrc.c b/gst/interpipe/gstinterpipesrc.c index fd4c587..7b8f6ab 100644 --- a/gst/interpipe/gstinterpipesrc.c +++ b/gst/interpipe/gstinterpipesrc.c @@ -87,6 +87,8 @@ static gboolean gst_inter_pipe_src_push_buffer (GstInterPipeIListener * iface, static gboolean gst_inter_pipe_src_push_event (GstInterPipeIListener * iface, GstEvent * event, guint64 basetime); static gboolean gst_inter_pipe_src_send_eos (GstInterPipeIListener * iface); +static gboolean gst_inter_pipe_src_push_query (GstInterPipeIListener * iface, + GstQuery * query); static gboolean gst_inter_pipe_src_listen_node (GstInterPipeSrc * src, const gchar * node_name); static gboolean gst_inter_pipe_src_start (GstBaseSrc * base); @@ -508,6 +510,7 @@ gst_inter_pipe_ilistener_init (GstInterPipeIListenerInterface * iface) iface->set_caps = gst_inter_pipe_src_set_caps; iface->push_buffer = gst_inter_pipe_src_push_buffer; iface->push_event = gst_inter_pipe_src_push_event; + iface->query = gst_inter_pipe_src_push_query; iface->send_eos = gst_inter_pipe_src_send_eos; } @@ -739,6 +742,7 @@ gst_inter_pipe_src_push_event (GstInterPipeIListener * iface, GstEvent * event, } } + static gboolean gst_inter_pipe_src_send_eos (GstInterPipeIListener * iface) { @@ -758,6 +762,33 @@ gst_inter_pipe_src_send_eos (GstInterPipeIListener * iface) return TRUE; } + +static gboolean +gst_inter_pipe_src_push_query (GstInterPipeIListener * iface, GstQuery * query) +{ + GstInterPipeSrc *src; + GstPad *srcpad; + GstPad *peerpad; + gboolean ret = TRUE; + + src = GST_INTER_PIPE_SRC (iface); + srcpad = GST_INTER_PIPE_SRC_PAD (GST_APP_SRC (src)); + + peerpad = gst_pad_get_peer (srcpad); + if (!peerpad) { + ret = FALSE; + goto out; + } + + ret = gst_pad_query (peerpad, query); + + gst_object_unref (peerpad); + +out: + return ret; +} + + static gboolean gst_inter_pipe_src_listen_node (GstInterPipeSrc * src, const gchar * node_name) {