interpipesrc: clean appsrc caps at stop #69

Open
mmontero wants to merge 1 commit into develop from feature/clean-interpipesrc-caps-at-stop

Conversation

@mmontero

Contributor

This clears the appsrc caps when the element stops, resetting
the negotiated state so that a restarted pipeline can begin
a clean negotiation process.

cfsbhawkins added a commit to cfsbhawkins/gst-interpipe that referenced this pull request Jun 14, 2026
…nd perf hardening

* interpipesink: set listener caps before its first buffer

A listener that attached before this node had caps (a consumer pipeline
started before a slow or network producer began producing) was pushed a
buffer with no caps configured on its appsrc, which can fail downstream
negotiation. Carry the sample caps into push_to_listener and, when a
listener has no caps yet, set them before forwarding the buffer.

Listeners that already have caps are left untouched, so allow-renegotiation
behaviour is preserved. The lookup is skipped once a listener is negotiated;
for the allow-renegotiation=false consumers used in the live split it is a
cheap appsrc caps check on the data path.
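
The priming rule described above can be sketched in plain C. This is only an illustration under stated assumptions: the `Listener` struct and its fields are hypothetical stand-ins for the listener's appsrc state, and caps are modelled as strings rather than `GstCaps`.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Hypothetical stand-in for a listener's appsrc state (not the real element). */
typedef struct {
    char caps[64];   /* caps string currently configured, if any */
    bool has_caps;   /* false until caps are set for the first time */
    int buffers;     /* number of buffers forwarded so far */
} Listener;

/* Forward one buffer; when the listener has no caps yet, set the sample
 * caps first. Listeners that already have caps are left untouched. */
static void push_to_listener(Listener *l, const char *sample_caps) {
    assert(l != NULL);
    if (!l->has_caps && sample_caps != NULL) {
        strncpy(l->caps, sample_caps, sizeof l->caps - 1);
        l->caps[sizeof l->caps - 1] = '\0';
        l->has_caps = true;
    }
    l->buffers++;    /* the real element would push the buffer here */
}
```

In this model a second push with different sample caps does not overwrite the caps the listener already holds, which is the part that keeps the allow-renegotiation behaviour unchanged.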

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* interpipesrc: renegotiate when caps arrive after a cold attach

When a listener attaches before its node has caps, the base source loop
can begin and negotiate an empty state before any caps exist. Caps then
arrive via set_caps after the fact, and the base source does not re-run
negotiation on its own, so it pushes against a stale state and can fail
downstream with not-negotiated.

Mark the source pad for reconfigure after setting the appsrc caps so the
base source renegotiates with the new caps before the next buffer.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* interpipesrc: clear appsrc caps on stop for clean reconnect

On stop the negotiated appsrc caps were left in place, so a restart,
reconnect, or switch to a node with different caps could be refused by an
allow-renegotiation=false listener still holding the previous caps. Clear
the caps on stop so the next attach renegotiates from scratch.

Based on RidgeRun gst-interpipe PR RidgeRun#69.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* tests: cold-attach coverage and gstd reproduction harness

Add test_cold_attach, a gstcheck smoke test for the compensate-ts +
is-live cold-ordering path the suite did not cover (consumer played
before the producer node exists). On current GStreamer this path passes
with a flexible converter downstream, so it guards against regressions
there.

The end-to-end race only surfaces with a strict downstream encoder that
does not renegotiate on late caps, which needs the multi-pipeline gstd
runtime rather than a single-process unit test. Add tests/gstd/ (a gstd
build image plus a harness driving the three-leg split over gstd's HTTP
API) so the deployment encoder can be substituted via the ENC override
to reproduce and confirm the fix end to end.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* interpipesink: fix caps leak, NULL derefs, and set_caps correctness

Review follow-ups in the caps path:

- are_caps_compatible leaked the intersected caps on every call; unref it
  before returning.
- intersect_listener_caps unconditionally unreffed the listener caps, but
  the listener get_caps may legitimately return NULL; guard the unref.
- process_sample dereferenced the sample buffer without checking it; a
  sample can carry no buffer. Skip forwarding in that case.
- set_caps ignored the parent set_caps result; fail if the parent rejects
  the caps instead of forwarding a state the parent did not accept.
- set_caps read the listeners table size before taking the lock; move the
  check under the lock so it cannot race a concurrent listener add.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* interpipe: add caps diagnostics around the negotiation path

Add INFO/DEBUG logging at the caps decision points so the cold-attach and
renegotiation flow can be traced in the field: the listener get_caps
result and negotiated state, the listener caps being set plus the
reconfigure mark on a late update, and the producer-side application of
node caps to a listener that attached without caps.

These are quiet at the default debug level and surface with
interpipesrc:5 / interpipesink:5.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* interpipesrc: guard the pending serial events queue with a mutex

The pending serial events GQueue was pushed from the node's streaming
thread (push_event) and drained from this element's streaming thread
(create) with no synchronization; GQueue is not thread-safe, so concurrent
access could corrupt it. Add a dedicated mutex and hold it around every
queue operation. In create, decide and dequeue under the lock but push the
event downstream after releasing it, so the lock is never held across
gst_pad_push_event.
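
A minimal sketch of that locking shape, assuming a fixed-size ring of integer event ids in place of the GQueue of events and a pthread mutex in place of the element's mutex. `EventQueue` and its functions are hypothetical names for illustration only.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

#define MAX_EVENTS 16

/* Hypothetical stand-in for the pending serial events queue. */
typedef struct {
    pthread_mutex_t lock;
    int events[MAX_EVENTS];  /* event ids; the real queue holds event pointers */
    size_t head;
    size_t count;
} EventQueue;

static void event_queue_init(EventQueue *q) {
    assert(q != NULL);
    pthread_mutex_init(&q->lock, NULL);
    q->head = 0;
    q->count = 0;
}

/* Producer side: enqueue under the lock. Returns 0 when the ring is full. */
static int event_queue_push(EventQueue *q, int event) {
    int ok = 0;
    pthread_mutex_lock(&q->lock);
    if (q->count < MAX_EVENTS) {
        q->events[(q->head + q->count) % MAX_EVENTS] = event;
        q->count++;
        ok = 1;
    }
    pthread_mutex_unlock(&q->lock);
    return ok;
}

/* Consumer side: decide and dequeue under the lock, then return. The caller
 * pushes the event downstream afterwards, so the lock is never held across
 * the downstream call. Returns 0 when nothing was pending. */
static int event_queue_pop(EventQueue *q, int *out) {
    int have = 0;
    pthread_mutex_lock(&q->lock);
    if (q->count > 0) {
        *out = q->events[q->head];
        q->head = (q->head + 1) % MAX_EVENTS;
        q->count--;
        have = 1;
    }
    pthread_mutex_unlock(&q->lock);
    return have;
}
```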

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* interpipesink: hold the listeners lock across caps negotiation in get_caps

get_caps released the listeners lock right after intersecting listener
caps, then kept reading and writing sink->caps_negotiated and, on the
no-intersection path, walked the listeners table unlocked. Both are
mutated concurrently by add/remove_listener and set_caps, so this raced
and could double-unref caps_negotiated or walk a table mid-rehash.

Hold the lock across the whole negotiation. For the detach path, snapshot
the listeners holding a ref on each, release the lock, then call
leave_node outside it, since leave_node re-enters this element through
remove_listener and would otherwise deadlock on the same mutex.
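
The detach path can be illustrated with a small model. It assumes integer listener ids instead of ref-counted listener objects, so the "hold a ref on each" step is not shown, and `Sink`, `sink_detach_all`, and the other names are hypothetical.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>
#include <string.h>

#define MAX_LISTENERS 8

/* Hypothetical stand-in for the sink and its listeners table. */
typedef struct {
    pthread_mutex_t lock;
    int listeners[MAX_LISTENERS];
    size_t count;
} Sink;

static void sink_init(Sink *s) {
    pthread_mutex_init(&s->lock, NULL);
    s->count = 0;
}

static int sink_add_listener(Sink *s, int id) {
    int ok = 0;
    pthread_mutex_lock(&s->lock);
    if (s->count < MAX_LISTENERS) {
        s->listeners[s->count++] = id;
        ok = 1;
    }
    pthread_mutex_unlock(&s->lock);
    return ok;
}

/* Takes the lock itself, like remove_listener reached through leave_node. */
static void sink_remove_listener(Sink *s, int id) {
    pthread_mutex_lock(&s->lock);
    for (size_t i = 0; i < s->count; i++) {
        if (s->listeners[i] == id) {
            s->count--;
            s->listeners[i] = s->listeners[s->count];
            break;
        }
    }
    pthread_mutex_unlock(&s->lock);
}

/* Snapshot under the lock, release it, then detach each listener. Calling
 * sink_remove_listener while still holding the non-recursive lock would
 * block forever on the same mutex. */
static size_t sink_detach_all(Sink *s) {
    int snapshot[MAX_LISTENERS];
    size_t n;
    assert(s != NULL);
    pthread_mutex_lock(&s->lock);
    n = s->count;
    memcpy(snapshot, s->listeners, n * sizeof snapshot[0]);
    pthread_mutex_unlock(&s->lock);
    for (size_t i = 0; i < n; i++) {
        sink_remove_listener(s, snapshot[i]);
    }
    return n;
}
```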

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* interpipe: fix allocation aggregation, stale listening flag, and priv leak

Three review follow-ups:

- propose_allocation accumulated the per-listener result with |= starting
  from TRUE, so it was always TRUE and the failure-handling branch that
  drops the aggregated metas was dead. Use &= so a listener that cannot
  satisfy the allocation query causes the metas to be dropped instead of
  claiming unsupported allocation. (Behavioural change in the allocation
  path; review if specific allocation negotiation is relied upon.)

- interpipesrc set_property(listen-to) set listening=TRUE and logged
  success even when the listen attempt failed; only set it on success.

- The registry leaked a freshly allocated listener priv when add_listener
  failed; free it on that path while leaving an existing, table-owned priv
  in place.
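
The allocation aggregation item above comes down to the difference between the two accumulators below. This is a standalone sketch with hypothetical function names, not the element's propose_allocation code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Fixed form: the aggregate is true only if every listener succeeded. */
static bool all_listeners_ok(const bool *results, size_t n) {
    bool ret = true;
    assert(results != NULL || n == 0);
    for (size_t i = 0; i < n; i++) {
        ret &= results[i];
    }
    return ret;
}

/* Previous form, for comparison: seeded with true, |= can never turn the
 * aggregate false, so a failure branch guarded by it is unreachable. */
static bool or_seeded_true(const bool *results, size_t n) {
    bool ret = true;
    for (size_t i = 0; i < n; i++) {
        ret |= results[i];
    }
    return ret;
}
```

With one failing listener among several, `all_listeners_ok` reports failure while `or_seeded_true` still reports success.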

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* interpipesrc: harden compensate-ts against invalid and underflowing stamps

The compensate-ts path adjusted PTS/DTS without checking validity or
guarding the subtraction. A buffer with no PTS (GST_CLOCK_TIME_NONE) was
treated as a huge value and turned into a bogus finite timestamp, and DTS
was decremented without a lower bound, so a reordered stream whose DTS is
below the base-time offset would underflow into a near-infinite DTS.

Only adjust valid timestamps, keep the existing drop-when-unsynced
behaviour for a valid PTS below the offset, and clamp DTS to zero instead
of underflowing (consistent with the event-timestamp path). The normal
valid-timestamp case is unchanged.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* interpipesink: lock shared reads in add_listener and log rejected buffers

add_listener read the listeners table size and caps_negotiated without the
lock while deciding whether to send a reconfigure or renegotiate. Snapshot
both under the lock so the decision uses a consistent view; the lock is not
held across the pad and caps calls, which could re-enter the element.

Also log when a listener does not accept a forwarded buffer instead of
silently discarding the push_buffer result.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* interpipe: return a node reference from get_node to fix a use-after-free

gst_inter_pipe_get_node returned a borrowed pointer looked up under the
nodes lock, so a caller (notably interpipesrc forwarding an upstream event,
and the registry listen/leave paths) could use a node that another thread
removed and finalized in the meantime.

Take a reference under the lock and return it transfer-full; update the
three callers to unref on every path. Documented in the header.
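
The ownership change can be sketched with a single-slot registry and an atomic reference count. This is a simplified model: the real registry is a table keyed by node name, and `Node`, `Registry`, and the functions below are hypothetical.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdlib.h>
#include <string.h>

typedef struct {
    atomic_int refcount;
    char name[32];
} Node;

/* Single-slot registry; the slot owns one reference to its node. */
typedef struct {
    pthread_mutex_t lock;
    Node *slot;
} Registry;

static Node *node_new(const char *name) {
    Node *n = malloc(sizeof *n);
    if (n == NULL) return NULL;
    atomic_init(&n->refcount, 1);
    strncpy(n->name, name, sizeof n->name - 1);
    n->name[sizeof n->name - 1] = '\0';
    return n;
}

static void node_unref(Node *n) {
    if (atomic_fetch_sub(&n->refcount, 1) == 1) free(n);
}

/* Transfer-full: the reference is taken while the lock is held, so the
 * node cannot be finalized between lookup and use. Caller must unref. */
static Node *registry_get_node(Registry *r) {
    Node *n;
    assert(r != NULL);
    pthread_mutex_lock(&r->lock);
    n = r->slot;
    if (n != NULL) atomic_fetch_add(&n->refcount, 1);
    pthread_mutex_unlock(&r->lock);
    return n;
}

/* Drops the registry's own reference; outstanding callers keep theirs. */
static void registry_remove_node(Registry *r) {
    Node *n;
    pthread_mutex_lock(&r->lock);
    n = r->slot;
    r->slot = NULL;
    pthread_mutex_unlock(&r->lock);
    if (n != NULL) node_unref(n);
}
```

A caller that obtained the node before another thread removed it can still read it safely, and the memory is released only when that caller unrefs.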

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* interpipesink: key the listeners table by the listener object

The listeners table was keyed by the listener's name string pointer with
identity hashing, which is fragile: the name pointer can change if the
element is renamed, leaving a stale key that can never be matched or
removed. Key by the listener object pointer instead, which is stable for
the object's lifetime, and derive the name from the listener in the
per-listener callbacks.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* interpipe: let the registry nodes table own its key strings

The nodes table keyed on the node-name pointer borrowed from the element,
which is only safe as long as every caller removes the node before freeing
the name. Give the table a duplicated, table-owned key (g_free on removal)
so the registry no longer depends on that discipline.
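
To illustrate the table-owned key, here is a small array-backed map in standard C. It stands in for the hash table the registry uses, and `NodeTable`, `table_insert`, and the other names are hypothetical.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define MAX_NODES 8

typedef struct {
    char *key;     /* owned by the table, freed on removal */
    void *value;
} Entry;

typedef struct {
    Entry entries[MAX_NODES];
    size_t count;
} NodeTable;

static char *dup_string(const char *s) {
    size_t len = strlen(s) + 1;
    char *copy = malloc(len);
    if (copy != NULL) memcpy(copy, s, len);
    return copy;
}

/* Stores a private copy of the key, so the caller may free or change
 * its own string afterwards. Returns 0 on a full table or failed copy. */
static int table_insert(NodeTable *t, const char *key, void *value) {
    char *owned;
    assert(t != NULL && key != NULL);
    if (t->count == MAX_NODES) return 0;
    owned = dup_string(key);
    if (owned == NULL) return 0;
    t->entries[t->count].key = owned;
    t->entries[t->count].value = value;
    t->count++;
    return 1;
}

static void *table_lookup(const NodeTable *t, const char *key) {
    for (size_t i = 0; i < t->count; i++) {
        if (strcmp(t->entries[i].key, key) == 0) return t->entries[i].value;
    }
    return NULL;
}

static int table_remove(NodeTable *t, const char *key) {
    for (size_t i = 0; i < t->count; i++) {
        if (strcmp(t->entries[i].key, key) == 0) {
            free(t->entries[i].key);
            t->entries[i] = t->entries[t->count - 1];
            t->count--;
            return 1;
        }
    }
    return 0;
}
```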

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* interpipe: harden listen-to threading, cut per-buffer caps query, fix recompute

Production-readiness pass over the cold-attach branch covering a thread-safety
hole, a hot-path regression, a caps-negotiation correctness bug, a leak, and
log flooding.

- listen-to use-after-free: the registry borrowed the listener's listen-to
  string pointer, so a property change freeing it on the app thread could leave
  the registry (and interpipesrc's streaming thread) reading freed memory. The
  registry now owns a g_strdup'd copy, freed on replace/remove. interpipesrc
  guards every listen_to read/write with GST_OBJECT_LOCK (set/get_property,
  start, stop, the upstream-event path, the node-added/removed callbacks, and
  the listen rollback). The lock is held only around the read/swap and never
  across a listen/leave call, keeping the order listeners_mutex -> object lock
  one-directional (no inversion).

- per-buffer caps query: push_to_listener called ilistener_get_caps for every
  buffer and every listener; with allow-renegotiation=true that ran a full
  downstream peer caps query while holding the fanout lock. Add a cheap
  is_negotiated predicate to the listener interface, backed by interpipesrc's
  new caps_primed flag. The flag is cleared on every (re)attach (in
  gst_inter_pipe_src_listen_node, the single choke point start/set_property/
  node_added funnel through) and on stop, and set when caps are applied. A node
  switch therefore re-primes caps from the new node's first buffer instead of
  being skipped because stale appsrc caps were still present.

- get_caps recompute: reset caps_negotiated before intersecting the listeners.
  Folding each listener into a stale value could only ever narrow the result, so
  a listener that renegotiated to broader caps could never widen it back.

- add_listener caps leak: the src_negotiated && !sinkcaps path jumped to
  add_to_list, skipping the srccaps unref. Release it before the jump.

- logging: the per-buffer "can not be synchronized yet" message fired at the
  frame rate during cold start/reconnect; lower it from WARNING to DEBUG.
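
The get_caps recompute item above can be illustrated with a toy model in which caps are bitmasks and intersection is bitwise AND. That modelling is an assumption made for brevity; real caps are GstCaps objects, and the function names here are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Toy model: each bit is one supported format, all bits set means ANY. */
#define CAPS_ANY UINT32_MAX

/* Fixed form: start from ANY on every recompute, then intersect. */
static uint32_t recompute_negotiated(const uint32_t *listener_caps, size_t n) {
    uint32_t result = CAPS_ANY;
    assert(listener_caps != NULL || n == 0);
    for (size_t i = 0; i < n; i++) {
        result &= listener_caps[i];
    }
    return result;
}

/* Previous form, for comparison: folding into the stale value can only
 * clear bits, so the result never widens again. */
static uint32_t fold_into_stale(uint32_t stale, const uint32_t *listener_caps,
                                size_t n) {
    for (size_t i = 0; i < n; i++) {
        stale &= listener_caps[i];
    }
    return stale;
}
```

If a stale result of 0x1 is kept while the listeners now offer 0x3 and 0x7, folding stays at 0x1, whereas recomputing from ANY yields 0x3.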

Validated in Docker (GStreamer 1.28.3): clean build, gstcheck 15/15, gstd
cold-attach repro ordered+rapid PASS (0 not-negotiated, no deadlock from the new
locking), valgrind 0 definitely/indirectly lost with no interpipe frames across
cold_attach/hot_plug/block_switch/caps_renegotiation/set_caps/stream_sync.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* docs: document the Docker build/test workflow in the README

Add a "Building and testing with Docker" section covering the three images:
Dockerfile (build + gstcheck suite), Dockerfile.valgrind (leak checks with the
combined GLib/GStreamer suppressions), and Dockerfile.gstd (cross-process
cold-attach reproduction over gstd's HTTP API). Includes the bind-mount
invocations and the tests/check/gst_test_<name> binary-path detail so working-
tree changes can be exercised without rebuilding the image.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
datagutt pushed a commit to irlenc/gst-interpipe that referenced this pull request Jun 20, 2026
…nd perf hardening

5 participants