Add suppress_health_degradation stream config to prevent optional streams from degrading agent health#49511
Oddly wants to merge 2 commits into `elastic:main`.
Conversation
This pull request does not have a backport label. To fix this pull request, add the backport labels for the needed branches.
Note: Reviews paused. This branch appears to be under active development, so CodeRabbit has automatically paused this review to avoid overwhelming you with comments as new commits arrive.
No actionable comments were generated in the most recent review.
Suggested labels: Team:Elastic-Agent-Data-Plane
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@x-pack/libbeat/management/unit.go`:
- Around line 369-375: The update() path currently only mutates
u.streamStates[*].suppressHealthDegradation and never recomputes the unit
aggregate health, leaving stale Failed/Degraded unit state; after you change
existing.suppressHealthDegradation in update(), force a recompute and publish of
the unit health so suppression flips take effect immediately—either invoke
updateStateForStream(...) for that stream in a way that bypasses the "ignore
same-state" check (add a force parameter or call a new helper), or implement a
small helper that scans u.streamStates to recalculate the aggregate health and
calls the existing publish/update routine to emit the new unit state.
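The helper the comment describes could be sketched roughly as follows. This is a minimal illustration with invented names (`streamState`, `recalcAggregate`, the numeric status levels), not the actual types or fields in `unit.go`:

```go
package main

import "fmt"

// Hypothetical mirror of the unit's per-stream state; the real struct in
// x-pack/libbeat/management/unit.go differs.
type streamState struct {
	state                     int
	suppressHealthDegradation bool
}

// Illustrative status levels, ordered by severity.
const (
	healthy = iota
	degraded
	failed
)

// recalcAggregate scans all stream states and returns the worst
// non-suppressed status, which is what the unit would publish.
func recalcAggregate(streams map[string]streamState) int {
	agg := healthy
	for _, s := range streams {
		if s.suppressHealthDegradation {
			// Suppressed streams never degrade the unit's aggregate health.
			continue
		}
		if s.state > agg {
			agg = s.state
		}
	}
	return agg
}

func main() {
	streams := map[string]streamState{
		"cel-stream":   {state: failed, suppressHealthDegradation: true},
		"redis-stream": {state: healthy},
	}
	// The failed stream is suppressed, so the aggregate stays healthy (0).
	fmt.Println(recalcAggregate(streams))
}
```

Calling such a helper after flipping `suppressHealthDegradation` in `update()` would let the aggregate recompute bypass the "ignore same-state" check the comment mentions.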
📒 Files selected for processing (2): x-pack/libbeat/management/unit.go, x-pack/libbeat/management/unit_test.go
Force-pushed from dac7eef to f2ff4f3.
♻️ Duplicate comments (1)
x-pack/libbeat/management/unit.go (1)
Lines 369-375: ⚠️ Potential issue | 🔴 Critical: Recompute and publish unit health when suppression flips.
`update()` mutates `existing.suppressHealthDegradation` but never recalculates/emits aggregate unit status. If a stream is already `Degraded`/`Failed`, unit health can stay stale indefinitely because same-state stream updates are ignored later (line 323).

Proposed fix (minimal):
```diff
 func (u *agentUnit) update(cu *client.Unit) {
 	u.mtx.Lock()
 	defer u.mtx.Unlock()
 	u.softDeleted = false
 	u.clientUnit = cu
+	suppressionChanged := false
 	inputStatus := getStatus(cu.Expected().State)
 	if u.inputLevelState.state != inputStatus {
 		u.inputLevelState = unitState{
 			state: inputStatus,
 		}
 	}
 	newStreamStates, newStreamIDs := getStreamStates(cu.Expected())
 	for key, state := range newStreamStates {
 		if existing, exists := u.streamStates[key]; exists {
-			// Preserve current health state but update the suppressHealthDegradation flag
-			// in case the stream config changed.
-			existing.suppressHealthDegradation = state.suppressHealthDegradation
+			if existing.suppressHealthDegradation != state.suppressHealthDegradation {
+				suppressionChanged = true
+			}
+			existing.suppressHealthDegradation = state.suppressHealthDegradation
 			u.streamStates[key] = existing
 			continue
 		}
 		u.streamStates[key] = state
 	}
@@
 	switch {
@@
 	}
+
+	if suppressionChanged {
+		state, msg := u.calcState()
+		streamsPayload := make(map[string]interface{}, len(u.streamStates))
+		for id, streamState := range u.streamStates {
+			streamsPayload[id] = map[string]interface{}{
+				"status": getUnitState(streamState.state).String(),
+				"error":  streamState.msg,
+			}
+		}
+		if err := u.clientUnit.UpdateState(getUnitState(state), msg, map[string]interface{}{"streams": streamsPayload}); err != nil {
+			u.logger.Warnf("failed to update state for input %s: %v", u.ID(), err)
+		}
+	}
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@x-pack/libbeat/management/unit.go` around lines 369 - 375, In update(), when you mutate existing.suppressHealthDegradation for entries in u.streamStates, detect that the suppression flag flipped and then trigger the unit-level health recomputation/publish path so the aggregate unit health is recalculated and emitted; specifically, inside the loop that updates existing.suppressHealthDegradation (in update()), after assigning existing.suppressHealthDegradation = state.suppressHealthDegradation, call the same routine you use for stream-state transitions to recompute and publish aggregate unit health (i.e., the unit health recompute/publish codepath) so unit health doesn't remain stale when suppression toggles.
📒 Files selected for processing (2): x-pack/libbeat/management/unit.go, x-pack/libbeat/management/unit_test.go
🚧 Files skipped from review as they are similar to previous changes (1): x-pack/libbeat/management/unit_test.go
Add suppress_health_degradation stream config to prevent optional streams from degrading agent health

Adds a `suppress_health_degradation` config flag at the stream level in the management framework. When set, fetch failures are still logged and tracked but do not mark the agent as Degraded. This lets operators build broad Fleet policies with integrations that may not be running on every enrolled host, without those absent services dragging the entire agent into a degraded state.

Unlike a per-input approach, this change lives in the health aggregation layer (`calcState()` in `unit.go`) so every input type gets the flag for free: CEL, httpjson, Metricbeat modules, Filebeat inputs, and any future inputs that report per-stream health. The flag is read from the stream's existing `Source` protobuf field; no proto changes or per-input modifications are needed.

Behaviour when `suppress_health_degradation: true`:
- The input still retries every `period`
- Errors are logged at ERROR level
- Per-stream status still reports Degraded/Failed in the streams payload
- The stream is excluded from the unit's aggregate health calculation
- On recovery, stream status resets to Running as usual

Tested on a live Elastic Agent 9.3.1 cluster across three input types (CEL, redis/metrics, nginx/metrics) with 9 permutations; all passed.

Supersedes elastic#49492
Fixes: elastic/elastic-agent#12885
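Reading the flag from the stream's decoded `Source` field might look roughly like the sketch below. The helper name and the assumption that the protobuf `Struct` has already been converted to a plain `map[string]interface{}` (e.g. via `AsMap()`) are illustrative, not the PR's actual code:

```go
package main

import "fmt"

// suppressFromSource extracts the suppress_health_degradation flag from a
// stream's decoded Source map. A missing key or a non-bool value both mean
// "do not suppress", so existing policies are unaffected.
func suppressFromSource(src map[string]interface{}) bool {
	v, ok := src["suppress_health_degradation"]
	if !ok {
		return false
	}
	b, ok := v.(bool)
	return ok && b
}

func main() {
	// Illustrative stream source as Fleet might deliver it.
	stream := map[string]interface{}{
		"id":                          "redis-info",
		"period":                      "10s",
		"suppress_health_degradation": true,
	}
	fmt.Println(suppressFromSource(stream)) // prints true
}
```

Defaulting to `false` on any malformed value keeps the flag strictly opt-in, which matches the backward-compatible intent described in the commit message.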
Force-pushed from f2ff4f3 to 74e2856.
Live cluster validation: tested on a patched Elastic Agent 9.3.1 built from this branch.
- Fresh install (policy created from scratch each time)
- In-place update (same policy updated via PUT without agent restart, exercises the update path)
- Cross-input types (three integrations on one agent policy)
Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane)
/test
You need to update/format the files.
/test |
Proposed commit message
Adds a `suppress_health_degradation` config flag at the stream level in the management framework. When set, fetch failures from all inputs are still logged and tracked but do not mark the agent as Degraded. This lets operators build broad Fleet policies with integrations that may not be running on every enrolled host, without those absent services dragging the entire agent into a degraded state.

Unlike a per-input approach (which we tried first), this change lives in the health aggregation layer (`calcState()` in `unit.go`) so every input type gets the flag for free: CEL, httpjson, Metricbeat modules, Filebeat inputs, and any future inputs that report per-stream health.

The flag is read from the stream's existing `Source` protobuf field.

Behaviour when `suppress_health_degradation: true`:
- The input still retries every `period`
- Errors are logged at ERROR level
- Per-stream status still reports Degraded/Failed in the streams payload
- The stream is excluded from the unit's aggregate health calculation
- On recovery, stream status resets to Running as usual

Usage in Fleet integrations: users can also inject `suppress_health_degradation: true` via Fleet's Advanced Settings YAML on any existing integration without package changes.

Tested on a live Elastic Agent 9.3.1 cluster across three input types (CEL, redis/metrics, nginx/metrics) with 9 permutations.
Replaces #49492
Fixes: elastic/elastic-agent#12885
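As a sketch of that Advanced Settings usage, a stream-level override might look like the fragment below. Everything except the `suppress_health_degradation` key (integration type, dataset, period) is invented for illustration:

```yaml
# Hypothetical Fleet policy fragment; values other than the
# suppress_health_degradation key are examples only.
inputs:
  - type: redis/metrics
    streams:
      - metricsets: ["info"]
        data_stream:
          dataset: redis.info
        period: 10s
        # Fetch failures on this stream are still logged, but no longer
        # degrade the agent's aggregate health.
        suppress_health_degradation: true
```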