diff --git a/gst/interpipe/gstinterpipeilistener.c b/gst/interpipe/gstinterpipeilistener.c index 063f5ab..f63d0c3 100644 --- a/gst/interpipe/gstinterpipeilistener.c +++ b/gst/interpipe/gstinterpipeilistener.c @@ -142,6 +142,21 @@ gst_inter_pipe_ilistener_push_event (GstInterPipeIListener * self, return iface->push_event (self, event, basetime); } +gboolean +gst_inter_pipe_ilistener_query (GstInterPipeIListener * self, GstQuery * query) +{ + GstInterPipeIListenerInterface *iface; + + g_return_val_if_fail (GST_INTER_PIPE_IS_ILISTENER (self), FALSE); + g_return_val_if_fail (query, FALSE); + + iface = GST_INTER_PIPE_ILISTENER_GET_IFACE (self); + g_return_val_if_fail (iface->query != NULL, FALSE); + + return iface->query (self, query); +} + + gboolean gst_inter_pipe_ilistener_send_eos (GstInterPipeIListener * self) { diff --git a/gst/interpipe/gstinterpipeilistener.h b/gst/interpipe/gstinterpipeilistener.h index 577cfb8..4c210a3 100644 --- a/gst/interpipe/gstinterpipeilistener.h +++ b/gst/interpipe/gstinterpipeilistener.h @@ -94,6 +94,7 @@ struct _GstInterPipeIListenerInterface gboolean (* node_removed) (GstInterPipeIListener *iface, const gchar *node_removed); gboolean (* push_buffer) (GstInterPipeIListener *iface, GstBuffer *buffer, guint64 basetime); gboolean (* push_event) (GstInterPipeIListener *iface, GstEvent *event, guint64 basetime); + gboolean (* query) (GstInterPipeIListener *iface, GstQuery *query); gboolean (* send_eos) (GstInterPipeIListener *iface); }; @@ -193,6 +194,19 @@ gboolean gst_inter_pipe_ilistener_push_buffer (GstInterPipeIListener *iface, gboolean gst_inter_pipe_ilistener_push_event (GstInterPipeIListener *iface, GstEvent *event, guint64 basetime); + +/** + * gst_inter_pipe_ilistener_query: + * @iface: (transfer none)(not nullable): The object that should query the #GstQuery downstream. + * @event: (transfer full)(not nullable): The #GstQuery to be pushed downstream. + * + * Ask @query to the downstream element. + * + * Return: True if the query was successful, False otherwise. + */ +gboolean gst_inter_pipe_ilistener_query (GstInterPipeIListener *iface, + GstQuery *query); + /** * gst_inter_pipe_ilistener_send_eos: * @iface: (transfer none)(not nullable): The object that should push the #GstEvent downstream. diff --git a/gst/interpipe/gstinterpipesink.c b/gst/interpipe/gstinterpipesink.c index 4e6eeb3..4e5fb97 100644 --- a/gst/interpipe/gstinterpipesink.c +++ b/gst/interpipe/gstinterpipesink.c @@ -82,6 +82,8 @@ static gboolean gst_inter_pipe_sink_set_caps (GstBaseSink * base, GstCaps * filter); static gboolean gst_inter_pipe_sink_event (GstBaseSink * base, GstEvent * event); +static gboolean gst_inter_pipe_sink_propose_allocation (GstBaseSink * base, + GstQuery * query); static gboolean gst_inter_pipe_sink_are_caps_compatible (GstInterPipeSink * sink, GstCaps * listener_caps, GstCaps * sinkcaps); static GstCaps *gst_inter_pipe_sink_caps_intersect (GstCaps * caps1, @@ -174,7 +176,8 @@ gst_inter_pipe_sink_class_init (GstInterPipeSinkClass * klass) basesink_class->get_caps = GST_DEBUG_FUNCPTR (gst_inter_pipe_sink_get_caps); basesink_class->set_caps = GST_DEBUG_FUNCPTR (gst_inter_pipe_sink_set_caps); basesink_class->event = GST_DEBUG_FUNCPTR (gst_inter_pipe_sink_event); - + basesink_class->propose_allocation = + GST_DEBUG_FUNCPTR (gst_inter_pipe_sink_propose_allocation); } static void @@ -587,6 +590,228 @@ gst_inter_pipe_sink_event (GstBaseSink * base, GstEvent * event) event); } + +struct AllocQueryCtx +{ + GstInterPipeSink *sink; + GstQuery *query; + GstAllocationParams params; + guint size; + guint min_buffers; + gboolean first_query; + guint num_listeners; +}; + + +static gboolean +gst_inter_pipe_sink_forward_query_allocation (gpointer key, gpointer data, + gpointer user_data) +{ + struct AllocQueryCtx *ctx; + GstInterPipeIListener *listener; + gchar *listener_name; + GstInterPipeSink *sink; + GstQuery *query; + GstCaps *caps; + gboolean ret = TRUE; + guint count, i, size, min; + + listener = GST_INTER_PIPE_ILISTENER (data); + listener_name = (gchar *) key; + ctx = user_data; + sink = ctx->sink; + + GST_DEBUG_OBJECT (sink, "Aggregating allocation from listener %s", + listener_name); + + gst_query_parse_allocation (ctx->query, &caps, NULL); + + query = gst_query_new_allocation (caps, FALSE); + if (!gst_inter_pipe_ilistener_query (listener, query)) { + GST_DEBUG_OBJECT (sink, + "Allocation query failed on listener %s, ignoring allocation", + listener_name); + ret = FALSE; + goto out; + } + + /* Allocation Filter, extract of code from tee element */ + + /* Allocation Params: + * store the maximum alignment, prefix and padding, but ignore the + * allocators and the flags which are tied to downstream allocation*/ + count = gst_query_get_n_allocation_params (query); + for (i = 0; i < count; i++) { + GstAllocationParams params = { 0, }; + + gst_query_parse_nth_allocation_param (query, i, NULL, ¶ms); + + GST_DEBUG_OBJECT (sink, "Aggregating AllocationParams align=%" + G_GSIZE_FORMAT " prefix=%" G_GSIZE_FORMAT " padding=%" + G_GSIZE_FORMAT, params.align, params.prefix, params.padding); + + if (ctx->params.align < params.align) + ctx->params.align = params.align; + + if (ctx->params.prefix < params.prefix) + ctx->params.prefix = params.prefix; + + if (ctx->params.padding < params.padding) + ctx->params.padding = params.padding; + } + + /* Allocation Pool: + * We want to keep the biggest size and biggest minimum number of buffers to + * make sure downstream requirement can be satisfied. We don't really care + * about the maximum, as this is a parameter of the downstream provided + * pool. We only read the first allocation pool as the minimum number of + * buffers is normally constant regardless of the pool being used. */ + if (gst_query_get_n_allocation_pools (query) > 0) { + gst_query_parse_nth_allocation_pool (query, 0, NULL, &size, &min, NULL); + + GST_DEBUG_OBJECT (sink, + "Aggregating allocation pool size=%u min_buffers=%u", size, min); + + if (ctx->size < size) + ctx->size = size; + + if (ctx->min_buffers < min) + ctx->min_buffers = min; + } + + /* Allocation Meta: + * For allocation meta, we'll need to aggregate the argument using the new + * GstMetaInfo::agggregate_func */ + count = gst_query_get_n_allocation_metas (query); + for (i = 0; i < count; i++) { + guint ctx_index; + GType api; + const GstStructure *param; + + api = gst_query_parse_nth_allocation_meta (query, i, ¶m); + + /* For the first query, copy all metas */ + if (ctx->first_query) { + gst_query_add_allocation_meta (ctx->query, api, param); + continue; + } + + /* Afterward, aggregate the common params */ + if (gst_query_find_allocation_meta (ctx->query, api, &ctx_index)) { + const GstStructure *ctx_param; + + gst_query_parse_nth_allocation_meta (ctx->query, ctx_index, &ctx_param); + + /* Keep meta which has no params */ + if (ctx_param == NULL && param == NULL) + continue; + + GST_DEBUG_OBJECT (sink, "Dropping allocation meta %s", g_type_name (api)); + gst_query_remove_nth_allocation_meta (ctx->query, ctx_index); + } + } + + /* Finally, cleanup metas from the stored query that aren't support on this + * listener. */ + count = gst_query_get_n_allocation_metas (ctx->query); + for (i = 0; i < count;) { + GType api = gst_query_parse_nth_allocation_meta (ctx->query, i, NULL); + + if (!gst_query_find_allocation_meta (query, api, NULL)) { + GST_DEBUG_OBJECT (sink, "Dropping allocation meta %s", g_type_name (api)); + gst_query_remove_nth_allocation_meta (ctx->query, i); + count--; + continue; + } + + i++; + } + + ctx->first_query = FALSE; + ctx->num_listeners++; + +out: + gst_query_unref (query); + return ret; +} + +static gboolean +gst_inter_pipe_sink_propose_allocation (GstBaseSink * base, GstQuery * query) +{ + struct AllocQueryCtx ctx = { 0 }; + GstInterPipeSink *sink; + GHashTable *listeners; + GHashTableIter iter; + gboolean ret = TRUE; + gpointer key, value; + + sink = GST_INTER_PIPE_SINK (base); + + g_mutex_lock (&sink->listeners_mutex); + listeners = GST_INTER_PIPE_SINK_LISTENERS (sink); + + ctx.sink = sink; + ctx.query = query; + ctx.first_query = TRUE; + gst_allocation_params_init (&ctx.params); + + g_hash_table_iter_init (&iter, listeners); + + while (g_hash_table_iter_next (&iter, &key, &value)) { + ret |= gst_inter_pipe_sink_forward_query_allocation (key, value, &ctx); + } + + if (ret) { + guint count = gst_query_get_n_allocation_metas (query); + guint i; + + GST_DEBUG_OBJECT (sink, + "Final allocation parameters: align=%" G_GSIZE_FORMAT " prefix=%" + G_GSIZE_FORMAT " padding %" G_GSIZE_FORMAT, ctx.params.align, + ctx.params.prefix, ctx.params.padding); + + GST_DEBUG_OBJECT (sink, "Final allocation pools: size=%u min_buffers=%u", + ctx.size, ctx.min_buffers); + + GST_DEBUG_OBJECT (sink, "Final %u allocation meta:", count); + + for (i = 0; i < count; i++) { + GST_DEBUG_OBJECT (sink, " + aggregated allocation meta %s", + g_type_name (gst_query_parse_nth_allocation_meta (ctx.query, i, + NULL))); + } + + /* Allocate one more buffers when multiplexing so we don't starve the + * downstream threads. */ + if (ctx.num_listeners > 1) + ctx.min_buffers++; + + /* Check that we actually have parameters besides the defaults. */ + if (ctx.params.align || ctx.params.prefix || ctx.params.padding) { + gst_query_add_allocation_param (ctx.query, NULL, &ctx.params); + } + + /* When size == 0, buffers created from this pool would have no memory + * allocated. */ + if (ctx.size) { + gst_query_add_allocation_pool (ctx.query, NULL, ctx.size, + ctx.min_buffers, 0); + } + + } else { + guint count = gst_query_get_n_allocation_metas (query); + guint i; + + for (i = 1; i <= count; i++) { + gst_query_remove_nth_allocation_meta (query, count - i); + } + } + + g_mutex_unlock (&sink->listeners_mutex); + + return ret; +} + /* Appsink Callbacks */ static void gst_inter_pipe_sink_push_to_listener (gpointer key, gpointer data, @@ -747,9 +972,9 @@ gst_inter_pipe_sink_add_listener (GstInterPipeINode * iface, has_listeners = 0 != g_hash_table_size (listeners); if (!sink->caps_negotiated && !has_listeners - && !gst_caps_is_equal (srccaps, sinkcaps)) { + && !gst_caps_is_equal (srccaps, sinkcaps)) { - if (!gst_pad_push_event (GST_INTER_PIPE_SINK_PAD (sink), + if (!gst_pad_push_event (GST_INTER_PIPE_SINK_PAD (sink), gst_event_new_reconfigure ())) goto reconfigure_event_error; diff --git a/gst/interpipe/gstinterpipesrc.c b/gst/interpipe/gstinterpipesrc.c index fd4c587..7b8f6ab 100644 --- a/gst/interpipe/gstinterpipesrc.c +++ b/gst/interpipe/gstinterpipesrc.c @@ -87,6 +87,8 @@ static gboolean gst_inter_pipe_src_push_buffer (GstInterPipeIListener * iface, static gboolean gst_inter_pipe_src_push_event (GstInterPipeIListener * iface, GstEvent * event, guint64 basetime); static gboolean gst_inter_pipe_src_send_eos (GstInterPipeIListener * iface); +static gboolean gst_inter_pipe_src_push_query (GstInterPipeIListener * iface, + GstQuery * query); static gboolean gst_inter_pipe_src_listen_node (GstInterPipeSrc * src, const gchar * node_name); static gboolean gst_inter_pipe_src_start (GstBaseSrc * base); @@ -508,6 +510,7 @@ gst_inter_pipe_ilistener_init (GstInterPipeIListenerInterface * iface) iface->set_caps = gst_inter_pipe_src_set_caps; iface->push_buffer = gst_inter_pipe_src_push_buffer; iface->push_event = gst_inter_pipe_src_push_event; + iface->query = gst_inter_pipe_src_push_query; iface->send_eos = gst_inter_pipe_src_send_eos; } @@ -739,6 +742,7 @@ gst_inter_pipe_src_push_event (GstInterPipeIListener * iface, GstEvent * event, } } + static gboolean gst_inter_pipe_src_send_eos (GstInterPipeIListener * iface) { @@ -758,6 +762,33 @@ gst_inter_pipe_src_send_eos (GstInterPipeIListener * iface) return TRUE; } + +static gboolean +gst_inter_pipe_src_push_query (GstInterPipeIListener * iface, GstQuery * query) +{ + GstInterPipeSrc *src; + GstPad *srcpad; + GstPad *peerpad; + gboolean ret = TRUE; + + src = GST_INTER_PIPE_SRC (iface); + srcpad = GST_INTER_PIPE_SRC_PAD (GST_APP_SRC (src)); + + peerpad = gst_pad_get_peer (srcpad); + if (!peerpad) { + ret = FALSE; + goto out; + } + + ret = gst_pad_query (peerpad, query); + + gst_object_unref (peerpad); + +out: + return ret; +} + + static gboolean gst_inter_pipe_src_listen_node (GstInterPipeSrc * src, const gchar * node_name) {