interpipesrc: clean appsrc caps at stop#69
Open
mmontero wants to merge 1 commit into
Open
Conversation
This cleans the appsrc caps at stop state, cleaning the negotiated state so when the pipeline is restarted it can initiate a clean negotiation process.
jcaballeros
approved these changes
Oct 27, 2020
michaelgruner
approved these changes
Oct 29, 2020
cfsbhawkins
added a commit
to cfsbhawkins/gst-interpipe
that referenced
this pull request
Jun 14, 2026
…nd perf hardening * interpipesink: set listener caps before its first buffer A listener that attached before this node had caps (a consumer pipeline started before a slow or network producer began producing) was pushed a buffer with no caps configured on its appsrc, which can fail downstream negotiation. Carry the sample caps into push_to_listener and, when a listener has no caps yet, set them before forwarding the buffer. Listeners that already have caps are left untouched, so allow-renegotiation behaviour is preserved. The lookup is skipped once a listener is negotiated; for the allow-renegotiation=false consumers used in the live split it is a cheap appsrc caps check on the data path. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * interpipesrc: renegotiate when caps arrive after a cold attach When a listener attaches before its node has caps, the base source loop can begin and negotiate an empty state before any caps exist. Caps then arrive via set_caps after the fact, and the base source does not re-run negotiation on its own, so it pushes against a stale state and can fail downstream with not-negotiated. Mark the source pad for reconfigure after setting the appsrc caps so the base source renegotiates with the new caps before the next buffer. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * interpipesrc: clear appsrc caps on stop for clean reconnect On stop the negotiated appsrc caps were left in place, so a restart, reconnect, or switch to a node with different caps could be refused by an allow-renegotiation=false listener still holding the previous caps. Clear the caps on stop so the next attach renegotiates from scratch. Based on RidgeRun gst-interpipe PR RidgeRun#69. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * tests: cold-attach coverage and gstd reproduction harness Add test_cold_attach, a gstcheck smoke test for the compensate-ts + is-live cold-ordering path the suite did not cover (consumer played before the producer node exists). On current GStreamer this path passes with a flexible converter downstream, so it guards against regressions there. The end-to-end race only surfaces with a strict downstream encoder that does not renegotiate on late caps, which needs the multi-pipeline gstd runtime rather than a single-process unit test. Add tests/gstd/ (a gstd build image plus a harness driving the three-leg split over gstd's HTTP API) so the deployment encoder can be substituted via the ENC override to reproduce and confirm the fix end to end. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * interpipesink: fix caps leak, NULL derefs, and set_caps correctness Review follow-ups in the caps path: - are_caps_compatible leaked the intersected caps on every call; unref it before returning. - intersect_listener_caps unconditionally unreffed the listener caps, but the listener get_caps may legitimately return NULL; guard the unref. - process_sample dereferenced the sample buffer without checking it; a sample can carry no buffer. Skip forwarding in that case. - set_caps ignored the parent set_caps result; fail if the parent rejects the caps instead of forwarding a state the parent did not accept. - set_caps read the listeners table size before taking the lock; move the check under the lock so it cannot race a concurrent listener add. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * interpipe: add caps diagnostics around the negotiation path Add INFO/DEBUG logging at the caps decision points so the cold-attach and renegotiation flow can be traced in the field: the listener get_caps result and negotiated state, the listener caps being set plus the reconfigure mark on a late update, and the producer-side application of node caps to a listener that attached without caps. These are quiet at the default debug level and surface with interpipesrc:5 / interpipesink:5. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * interpipesrc: guard the pending serial events queue with a mutex The pending serial events GQueue was pushed from the node's streaming thread (push_event) and drained from this element's streaming thread (create) with no synchronization; GQueue is not thread-safe, so concurrent access could corrupt it. Add a dedicated mutex and hold it around every queue operation. In create, decide and dequeue under the lock but push the event downstream after releasing it, so the lock is never held across gst_pad_push_event. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * interpipesink: hold the listeners lock across caps negotiation in get_caps get_caps released the listeners lock right after intersecting listener caps, then kept reading and writing sink->caps_negotiated and, on the no-intersection path, walked the listeners table unlocked. Both are mutated concurrently by add/remove_listener and set_caps, so this raced and could double-unref caps_negotiated or walk a table mid-rehash. Hold the lock across the whole negotiation. For the detach path, snapshot the listeners holding a ref on each, release the lock, then call leave_node outside it, since leave_node re-enters this element through remove_listener and would otherwise deadlock on the same mutex. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * interpipe: fix allocation aggregation, stale listening flag, and priv leak Three review follow-ups: - propose_allocation accumulated the per-listener result with |= starting from TRUE, so it was always TRUE and the failure-handling branch that drops the aggregated metas was dead. Use &= so a listener that cannot satisfy the allocation query causes the metas to be dropped instead of claiming unsupported allocation. (Behavioural change in the allocation path; review if specific allocation negotiation is relied upon.) - interpipesrc set_property(listen-to) set listening=TRUE and logged success even when the listen attempt failed; only set it on success. - The registry leaked a freshly allocated listener priv when add_listener failed; free it on that path while leaving an existing, table-owned priv in place. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * interpipesrc: harden compensate-ts against invalid and underflowing stamps The compensate-ts path adjusted PTS/DTS without checking validity or guarding the subtraction. A buffer with no PTS (GST_CLOCK_TIME_NONE) was treated as a huge value and turned into a bogus finite timestamp, and DTS was decremented without a lower bound, so a reordered stream whose DTS is below the base-time offset would underflow into a near-infinite DTS. Only adjust valid timestamps, keep the existing drop-when-unsynced behaviour for a valid PTS below the offset, and clamp DTS to zero instead of underflowing (consistent with the event-timestamp path). The normal valid-timestamp case is unchanged. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * interpipesink: lock shared reads in add_listener and log rejected buffers add_listener read the listeners table size and caps_negotiated without the lock while deciding whether to send a reconfigure or renegotiate. Snapshot both under the lock so the decision uses a consistent view; the lock is not held across the pad and caps calls, which could re-enter the element. Also log when a listener does not accept a forwarded buffer instead of silently discarding the push_buffer result. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * interpipe: return a node reference from get_node to fix a use-after-free gst_inter_pipe_get_node returned a borrowed pointer looked up under the nodes lock, so a caller (notably interpipesrc forwarding an upstream event, and the registry listen/leave paths) could use a node that another thread removed and finalized in the meantime. Take a reference under the lock and return it transfer-full; update the three callers to unref on every path. Documented in the header. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * interpipesink: key the listeners table by the listener object The listeners table was keyed by the listener's name string pointer with identity hashing, which is fragile: the name pointer can change if the element is renamed, leaving a stale key that can never be matched or removed. Key by the listener object pointer instead, which is stable for the object's lifetime, and derive the name from the listener in the per-listener callbacks. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * interpipe: let the registry nodes table own its key strings The nodes table keyed on the node-name pointer borrowed from the element, which is only safe as long as every caller removes the node before freeing the name. Give the table a duplicated, table-owned key (g_free on removal) so the registry no longer depends on that discipline. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * interpipe: harden listen-to threading, cut per-buffer caps query, fix recompute Production-readiness pass over the cold-attach branch covering a thread-safety hole, a hot-path regression, a caps-negotiation correctness bug, a leak, and log flooding. - listen-to use-after-free: the registry borrowed the listener's listen-to string pointer, so a property change freeing it on the app thread could leave the registry (and interpipesrc's streaming thread) reading freed memory. The registry now owns a g_strdup'd copy, freed on replace/remove. interpipesrc guards every listen_to read/write with GST_OBJECT_LOCK (set/get_property, start, stop, the upstream-event path, the node-added/removed callbacks, and the listen rollback). The lock is held only around the read/swap and never across a listen/leave call, keeping the order listeners_mutex -> object lock one-directional (no inversion). - per-buffer caps query: push_to_listener called ilistener_get_caps for every buffer and every listener; with allow-renegotiation=true that ran a full downstream peer caps query while holding the fanout lock. Add a cheap is_negotiated predicate to the listener interface, backed by interpipesrc's new caps_primed flag. The flag is cleared on every (re)attach (in gst_inter_pipe_src_listen_node, the single choke point start/set_property/ node_added funnel through) and on stop, and set when caps are applied. A node switch therefore re-primes caps from the new node's first buffer instead of being skipped because stale appsrc caps were still present. - get_caps recompute: reset caps_negotiated before intersecting the listeners. Folding each listener into a stale value could only ever narrow the result, so a listener that renegotiated to broader caps could never widen it back. - add_listener caps leak: the src_negotiated && !sinkcaps path jumped to add_to_list, skipping the srccaps unref. Release it before the jump. - logging: the per-buffer "can not be synchronized yet" message fired at the frame rate during cold start/reconnect; lower it from WARNING to DEBUG. Validated in Docker (GStreamer 1.28.3): clean build, gstcheck 15/15, gstd cold-attach repro ordered+rapid PASS (0 not-negotiated, no deadlock from the new locking), valgrind 0 definitely/indirectly lost with no interpipe frames across cold_attach/hot_plug/block_switch/caps_renegotiation/set_caps/stream_sync. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * docs: document the Docker build/test workflow in the README Add a "Building and testing with Docker" section covering the three images: Dockerfile (build + gstcheck suite), Dockerfile.valgrind (leak checks with the combined GLib/GStreamer suppressions), and Dockerfile.gstd (cross-process cold-attach reproduction over gstd's HTTP API). Includes the bind-mount invocations and the tests/check/gst_test_<name> binary-path detail so working- tree changes can be exercised without rebuilding the image. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
datagutt
pushed a commit
to irlenc/gst-interpipe
that referenced
this pull request
Jun 20, 2026
…nd perf hardening * interpipesink: set listener caps before its first buffer A listener that attached before this node had caps (a consumer pipeline started before a slow or network producer began producing) was pushed a buffer with no caps configured on its appsrc, which can fail downstream negotiation. Carry the sample caps into push_to_listener and, when a listener has no caps yet, set them before forwarding the buffer. Listeners that already have caps are left untouched, so allow-renegotiation behaviour is preserved. The lookup is skipped once a listener is negotiated; for the allow-renegotiation=false consumers used in the live split it is a cheap appsrc caps check on the data path. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * interpipesrc: renegotiate when caps arrive after a cold attach When a listener attaches before its node has caps, the base source loop can begin and negotiate an empty state before any caps exist. Caps then arrive via set_caps after the fact, and the base source does not re-run negotiation on its own, so it pushes against a stale state and can fail downstream with not-negotiated. Mark the source pad for reconfigure after setting the appsrc caps so the base source renegotiates with the new caps before the next buffer. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * interpipesrc: clear appsrc caps on stop for clean reconnect On stop the negotiated appsrc caps were left in place, so a restart, reconnect, or switch to a node with different caps could be refused by an allow-renegotiation=false listener still holding the previous caps. Clear the caps on stop so the next attach renegotiates from scratch. Based on RidgeRun gst-interpipe PR RidgeRun#69. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * tests: cold-attach coverage and gstd reproduction harness Add test_cold_attach, a gstcheck smoke test for the compensate-ts + is-live cold-ordering path the suite did not cover (consumer played before the producer node exists). On current GStreamer this path passes with a flexible converter downstream, so it guards against regressions there. The end-to-end race only surfaces with a strict downstream encoder that does not renegotiate on late caps, which needs the multi-pipeline gstd runtime rather than a single-process unit test. Add tests/gstd/ (a gstd build image plus a harness driving the three-leg split over gstd's HTTP API) so the deployment encoder can be substituted via the ENC override to reproduce and confirm the fix end to end. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * interpipesink: fix caps leak, NULL derefs, and set_caps correctness Review follow-ups in the caps path: - are_caps_compatible leaked the intersected caps on every call; unref it before returning. - intersect_listener_caps unconditionally unreffed the listener caps, but the listener get_caps may legitimately return NULL; guard the unref. - process_sample dereferenced the sample buffer without checking it; a sample can carry no buffer. Skip forwarding in that case. - set_caps ignored the parent set_caps result; fail if the parent rejects the caps instead of forwarding a state the parent did not accept. - set_caps read the listeners table size before taking the lock; move the check under the lock so it cannot race a concurrent listener add. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * interpipe: add caps diagnostics around the negotiation path Add INFO/DEBUG logging at the caps decision points so the cold-attach and renegotiation flow can be traced in the field: the listener get_caps result and negotiated state, the listener caps being set plus the reconfigure mark on a late update, and the producer-side application of node caps to a listener that attached without caps. These are quiet at the default debug level and surface with interpipesrc:5 / interpipesink:5. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * interpipesrc: guard the pending serial events queue with a mutex The pending serial events GQueue was pushed from the node's streaming thread (push_event) and drained from this element's streaming thread (create) with no synchronization; GQueue is not thread-safe, so concurrent access could corrupt it. Add a dedicated mutex and hold it around every queue operation. In create, decide and dequeue under the lock but push the event downstream after releasing it, so the lock is never held across gst_pad_push_event. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * interpipesink: hold the listeners lock across caps negotiation in get_caps get_caps released the listeners lock right after intersecting listener caps, then kept reading and writing sink->caps_negotiated and, on the no-intersection path, walked the listeners table unlocked. Both are mutated concurrently by add/remove_listener and set_caps, so this raced and could double-unref caps_negotiated or walk a table mid-rehash. Hold the lock across the whole negotiation. For the detach path, snapshot the listeners holding a ref on each, release the lock, then call leave_node outside it, since leave_node re-enters this element through remove_listener and would otherwise deadlock on the same mutex. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * interpipe: fix allocation aggregation, stale listening flag, and priv leak Three review follow-ups: - propose_allocation accumulated the per-listener result with |= starting from TRUE, so it was always TRUE and the failure-handling branch that drops the aggregated metas was dead. Use &= so a listener that cannot satisfy the allocation query causes the metas to be dropped instead of claiming unsupported allocation. (Behavioural change in the allocation path; review if specific allocation negotiation is relied upon.) - interpipesrc set_property(listen-to) set listening=TRUE and logged success even when the listen attempt failed; only set it on success. - The registry leaked a freshly allocated listener priv when add_listener failed; free it on that path while leaving an existing, table-owned priv in place. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * interpipesrc: harden compensate-ts against invalid and underflowing stamps The compensate-ts path adjusted PTS/DTS without checking validity or guarding the subtraction. A buffer with no PTS (GST_CLOCK_TIME_NONE) was treated as a huge value and turned into a bogus finite timestamp, and DTS was decremented without a lower bound, so a reordered stream whose DTS is below the base-time offset would underflow into a near-infinite DTS. Only adjust valid timestamps, keep the existing drop-when-unsynced behaviour for a valid PTS below the offset, and clamp DTS to zero instead of underflowing (consistent with the event-timestamp path). The normal valid-timestamp case is unchanged. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * interpipesink: lock shared reads in add_listener and log rejected buffers add_listener read the listeners table size and caps_negotiated without the lock while deciding whether to send a reconfigure or renegotiate. Snapshot both under the lock so the decision uses a consistent view; the lock is not held across the pad and caps calls, which could re-enter the element. Also log when a listener does not accept a forwarded buffer instead of silently discarding the push_buffer result. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * interpipe: return a node reference from get_node to fix a use-after-free gst_inter_pipe_get_node returned a borrowed pointer looked up under the nodes lock, so a caller (notably interpipesrc forwarding an upstream event, and the registry listen/leave paths) could use a node that another thread removed and finalized in the meantime. Take a reference under the lock and return it transfer-full; update the three callers to unref on every path. Documented in the header. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * interpipesink: key the listeners table by the listener object The listeners table was keyed by the listener's name string pointer with identity hashing, which is fragile: the name pointer can change if the element is renamed, leaving a stale key that can never be matched or removed. Key by the listener object pointer instead, which is stable for the object's lifetime, and derive the name from the listener in the per-listener callbacks. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * interpipe: let the registry nodes table own its key strings The nodes table keyed on the node-name pointer borrowed from the element, which is only safe as long as every caller removes the node before freeing the name. Give the table a duplicated, table-owned key (g_free on removal) so the registry no longer depends on that discipline. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * interpipe: harden listen-to threading, cut per-buffer caps query, fix recompute Production-readiness pass over the cold-attach branch covering a thread-safety hole, a hot-path regression, a caps-negotiation correctness bug, a leak, and log flooding. - listen-to use-after-free: the registry borrowed the listener's listen-to string pointer, so a property change freeing it on the app thread could leave the registry (and interpipesrc's streaming thread) reading freed memory. The registry now owns a g_strdup'd copy, freed on replace/remove. interpipesrc guards every listen_to read/write with GST_OBJECT_LOCK (set/get_property, start, stop, the upstream-event path, the node-added/removed callbacks, and the listen rollback). The lock is held only around the read/swap and never across a listen/leave call, keeping the order listeners_mutex -> object lock one-directional (no inversion). - per-buffer caps query: push_to_listener called ilistener_get_caps for every buffer and every listener; with allow-renegotiation=true that ran a full downstream peer caps query while holding the fanout lock. Add a cheap is_negotiated predicate to the listener interface, backed by interpipesrc's new caps_primed flag. The flag is cleared on every (re)attach (in gst_inter_pipe_src_listen_node, the single choke point start/set_property/ node_added funnel through) and on stop, and set when caps are applied. A node switch therefore re-primes caps from the new node's first buffer instead of being skipped because stale appsrc caps were still present. - get_caps recompute: reset caps_negotiated before intersecting the listeners. Folding each listener into a stale value could only ever narrow the result, so a listener that renegotiated to broader caps could never widen it back. - add_listener caps leak: the src_negotiated && !sinkcaps path jumped to add_to_list, skipping the srccaps unref. Release it before the jump. - logging: the per-buffer "can not be synchronized yet" message fired at the frame rate during cold start/reconnect; lower it from WARNING to DEBUG. Validated in Docker (GStreamer 1.28.3): clean build, gstcheck 15/15, gstd cold-attach repro ordered+rapid PASS (0 not-negotiated, no deadlock from the new locking), valgrind 0 definitely/indirectly lost with no interpipe frames across cold_attach/hot_plug/block_switch/caps_renegotiation/set_caps/stream_sync. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * docs: document the Docker build/test workflow in the README Add a "Building and testing with Docker" section covering the three images: Dockerfile (build + gstcheck suite), Dockerfile.valgrind (leak checks with the combined GLib/GStreamer suppressions), and Dockerfile.gstd (cross-process cold-attach reproduction over gstd's HTTP API). Includes the bind-mount invocations and the tests/check/gst_test_<name> binary-path detail so working- tree changes can be exercised without rebuilding the image. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
This cleans the appsrc caps at stop state, cleaning
the negotiated state so when the pipeline is restarted
it can initiate a clean negotiation process.