From 50e1f593af88da1593541deac016aa489e321cf8 Mon Sep 17 00:00:00 2001 From: amcheste-ai-agent <278991699+amcheste-ai-agent@users.noreply.github.com> Date: Tue, 2 Jun 2026 10:41:51 -0400 Subject: [PATCH] feat(observability): pipeline-aware metrics, dashboard, and printer column MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Knowledge-work teams operate at the granularity of pipeline stages and produced artifacts, not just tokens and cost. v0.8.0 needed observability to match — operators should be able to see, in a glance, which stage a team is in, how long stages take, what came out the other end, and whether delivery to external systems worked. Metrics (new, `kagents_` prefix to match the rebrand) - kagents_team_pipeline_stage_active{team,namespace,stage} — gauge, 1 while Running, 0 elsewhere. Stack-by-stage in Grafana for the classic flight-status view across many teams. - kagents_team_stage_duration_seconds{team,namespace,stage} — histogram, observed once at the Running → Completed edge. Exponential buckets starting at 30s, matching the existing team-duration histogram. - kagents_team_artifacts_produced_total{team,namespace,teammate} — counter, increments once per ArtifactStatus appended; idempotent across reconciles because the dedup guard on Status.Artifacts is the gate. - kagents_team_delivery_success_total / _failure_total {team,namespace,type} — counters, one increment per delivery attempt from executeDelivery. The existing `claude_*` metric family stays untouched so existing dashboards keep working; this PR adds five `kagents_*` series rather than renaming. A future major release can sync the prefixes. Reconciler wiring - updatePipelineStatus emits the stage gauge on every reconcile (Running → 1, otherwise 0) and observes duration exactly once at the Completed transition (gated on prevPhase != Completed). - recordTeammateArtifacts increments the artifact counter inside the dedup-checked append path. - executeDelivery records success vs failure per target alongside the existing event emission. Status / kubectl - New `Stage` printer column (priority=1, so `kubectl get agentteams` shows it when -o wide / on wide-enough terminals). Sources status.pipeline.currentStage. Dashboard - Pipeline section on the team detail view: progress bar over completed/total, list of stages with their phase chip and teammates-ready ratio. - Artifacts section listing name, producing teammate, and source path; renders only when status.artifacts is non-empty. - New pipelinePercent template helper, fully covered (nil, zero, half, full, overflow clamp). Tests - internal/metrics rose to 100% coverage. Five new tests on the new helpers + the names guard extended to the kagents_ prefix. - internal/dashboard rose to 73.9% with the new helper tests. - internal/controller existing pipeline-status tests still green. Docs - docs/explanation/operations.md metrics table extended with the five new series and a paragraph explaining the dual-prefix situation. Co-Authored-By: Claude Opus 4.7 (1M context) Co-Authored-By: amcheste <13696614+amcheste@users.noreply.github.com> --- api/v1alpha1/agentteam_types.go | 1 + .../kagents/crds/kagents.dev_agentteams.yaml | 4 + config/crd/bases/kagents.dev_agentteams.yaml | 4 + docs/explanation/operations.md | 12 ++- internal/controller/agentteam_controller.go | 24 ++++++ internal/dashboard/templates.go | 15 ++++ .../dashboard/templates/_detail_body.html | 48 ++++++++++++ internal/dashboard/templates_test.go | 56 ++++++++++++++ internal/metrics/prometheus.go | 73 +++++++++++++++++++ internal/metrics/prometheus_test.go | 46 ++++++++++++ 10 files changed, 282 insertions(+), 1 deletion(-) create mode 100644 internal/dashboard/templates_test.go diff --git a/api/v1alpha1/agentteam_types.go b/api/v1alpha1/agentteam_types.go index 8238035..ed7477b 100644 --- a/api/v1alpha1/agentteam_types.go +++ b/api/v1alpha1/agentteam_types.go @@ -848,6 +848,7 @@ type PullRequestStatus struct { // +kubebuilder:subresource:status // +kubebuilder:printcolumn:name="Phase",type=string,JSONPath=`.status.phase` // +kubebuilder:printcolumn:name="Ready",type=string,JSONPath=`.status.ready` +// +kubebuilder:printcolumn:name="Stage",type=string,JSONPath=`.status.pipeline.currentStage`,priority=1 // +kubebuilder:printcolumn:name="Tasks Done",type=integer,JSONPath=`.status.tasks.completed` // +kubebuilder:printcolumn:name="Cost",type=string,JSONPath=`.status.estimatedCost` // +kubebuilder:printcolumn:name="Age",type=date,JSONPath=`.metadata.creationTimestamp` diff --git a/charts/kagents/crds/kagents.dev_agentteams.yaml b/charts/kagents/crds/kagents.dev_agentteams.yaml index fd17ac4..0b47083 100644 --- a/charts/kagents/crds/kagents.dev_agentteams.yaml +++ b/charts/kagents/crds/kagents.dev_agentteams.yaml @@ -21,6 +21,10 @@ spec: - jsonPath: .status.ready name: Ready type: string + - jsonPath: .status.pipeline.currentStage + name: Stage + priority: 1 + type: string - jsonPath: .status.tasks.completed name: Tasks Done type: integer diff --git a/config/crd/bases/kagents.dev_agentteams.yaml b/config/crd/bases/kagents.dev_agentteams.yaml index fd17ac4..0b47083 100644 --- a/config/crd/bases/kagents.dev_agentteams.yaml +++ b/config/crd/bases/kagents.dev_agentteams.yaml @@ -21,6 +21,10 @@ spec: - jsonPath: .status.ready name: Ready type: string + - jsonPath: .status.pipeline.currentStage + name: Stage + priority: 1 + type: string - jsonPath: .status.tasks.completed name: Tasks Done type: integer diff --git a/docs/explanation/operations.md b/docs/explanation/operations.md index 31cb3b2..f6d6731 100644 --- a/docs/explanation/operations.md +++ b/docs/explanation/operations.md @@ -88,7 +88,12 @@ The operator exposes Prometheus metrics, ships a Grafana dashboard, and fires we ### Prometheus metrics -The operator binary exposes `/metrics` on port 8080 by default. Eight series, all labeled by team name and (where applicable) teammate name + model: +The operator binary exposes `/metrics` on port 8080 by default. Two +prefix families are emitted in parallel: the original `claude_*` series +that have been there since v0.3.0 (kept stable to avoid breaking +existing dashboards), and a `kagents_*` family added in v0.8.0 that +covers knowledge-work observability (pipeline stages, artifacts, +delivery). Both stream from the same `/metrics` endpoint. | Metric | Type | Description | |--------|------|-------------| @@ -100,6 +105,11 @@ The operator binary exposes `/metrics` on port 8080 by default. Eight series, al | `claude_teammate_restarts_total` | counter | Pod restarts per teammate | | `claude_team_budget_remaining_usd` | gauge | `budgetLimit - estimatedCostUsd` | | `claude_teammate_idle_seconds` | histogram | Time between task completions per teammate | +| `kagents_team_pipeline_stage_active` | gauge | 1 while a pipeline stage is in `Running`, 0 otherwise. Labels: `team`, `namespace`, `stage` | +| `kagents_team_stage_duration_seconds` | histogram | Stage wall-clock duration observed once at the `Running → Completed` transition | +| `kagents_team_artifacts_produced_total` | counter | Artifacts appended to `status.artifacts` per teammate | +| `kagents_team_delivery_success_total` | counter | Successful `onComplete: deliver` dispatches, by target type | +| `kagents_team_delivery_failure_total` | counter | Failed deliveries by target type | Wire them to Prometheus by enabling the chart's ServiceMonitor: diff --git a/internal/controller/agentteam_controller.go b/internal/controller/agentteam_controller.go index e6bcb7b..0c4f10a 100644 --- a/internal/controller/agentteam_controller.go +++ b/internal/controller/agentteam_controller.go @@ -1739,9 +1739,11 @@ func (r *AgentTeamReconciler) executeDelivery(ctx context.Context, team *claudev if err := dispatcher.Send(ctx, r.Client, target, team); err != nil { status.Success = false status.Error = err.Error() + metrics.RecordDeliveryFailure(team.Name, team.Namespace, target.Type) r.recordEvent(team, corev1.EventTypeWarning, "DeliveryFailed", "Delivery %s to %s failed: %v", target.Type, status.Target, err) } else { + metrics.RecordDeliverySuccess(team.Name, team.Namespace, target.Type) r.recordEvent(team, corev1.EventTypeNormal, "DeliveryComplete", "Delivery %s to %s succeeded", target.Type, status.Target) } @@ -2190,6 +2192,7 @@ func (r *AgentTeamReconciler) updatePipelineStatus(team *claudev1alpha1.AgentTea } } + prevPhase := ss.Phase switch { case anyFailed: ss.Phase = "Failed" @@ -2214,6 +2217,22 @@ func (r *AgentTeamReconciler) updatePipelineStatus(team *claudev1alpha1.AgentTea ss.Phase = "Waiting" } + // Emit observability signals on phase transitions. Running and + // Completed are the two interesting edges: + // + // * Running: flip the stage_active gauge to 1 so dashboards + // show where each team currently is. + // * Completed: flip the gauge back to 0 and observe the stage's + // wall-clock duration (StartedAt → now). The + // histogram observation is gated on a fresh + // CompletedAt (== now) so re-reconciles of an + // already-completed stage don't double-count. + metrics.SetPipelineStageActive(team.Name, team.Namespace, ss.Name, ss.Phase == "Running") + if prevPhase != "Completed" && ss.Phase == "Completed" && ss.StartedAt != nil && ss.CompletedAt != nil { + metrics.ObservePipelineStageDuration(team.Name, team.Namespace, ss.Name, + ss.CompletedAt.Sub(ss.StartedAt.Time).Seconds()) + } + if ss.Phase == "Completed" { completed++ } else if currentStage == "" { @@ -2254,6 +2273,10 @@ func findProducerOutputPath(team *claudev1alpha1.AgentTeam, from, artifact strin // already present (same Name + ProducedBy) is not re-added, so calling this // every reconcile after the producer reaches Succeeded is safe. Should be // invoked once per teammate transition to Completed. +// +// Each newly-appended artifact also bumps the +// kagents_team_artifacts_produced_total counter — the existence check +// above is what makes the metric idempotent across reconciles. func recordTeammateArtifacts(team *claudev1alpha1.AgentTeam, tm claudev1alpha1.TeammateSpec, at time.Time) { if len(tm.Outputs) == 0 { return @@ -2274,6 +2297,7 @@ func recordTeammateArtifacts(team *claudev1alpha1.AgentTeam, tm claudev1alpha1.T ProducedBy: tm.Name, ProducedAt: metav1.NewTime(at), }) + metrics.RecordArtifactProduced(team.Name, team.Namespace, tm.Name) } } diff --git a/internal/dashboard/templates.go b/internal/dashboard/templates.go index d0c059c..7eb385e 100644 --- a/internal/dashboard/templates.go +++ b/internal/dashboard/templates.go @@ -59,6 +59,21 @@ var templateFuncs = template.FuncMap{ } return claudev1alpha1.TeammateStatus{Name: name} }, + + // pipelinePercent computes a 0..100 progress value for the pipeline + // progress bar. Returns 0 when the pipeline reports no stages — + // callers that gate the section on .Status.Pipeline being non-nil + // won't reach this in that case, but the guard is cheap. + "pipelinePercent": func(p *claudev1alpha1.PipelineStatus) int { + if p == nil || p.StagesTotal == 0 { + return 0 + } + pct := (p.StagesCompleted * 100) / p.StagesTotal + if pct > 100 { + return 100 + } + return pct + }, } // pageData wraps the typed payload in a render-time envelope so the layout diff --git a/internal/dashboard/templates/_detail_body.html b/internal/dashboard/templates/_detail_body.html index 6cdde39..1a3c31d 100644 --- a/internal/dashboard/templates/_detail_body.html +++ b/internal/dashboard/templates/_detail_body.html @@ -37,6 +37,54 @@

{{.Name}}

+ {{if .Status.Pipeline}} +
+

+ Pipeline + {{.Status.Pipeline.StagesCompleted}} of {{.Status.Pipeline.StagesTotal}} stages complete +

+
+ {{$pct := pipelinePercent .Status.Pipeline}} +
+
+
+
    + {{range .Status.Pipeline.Stages}} +
  1. + {{or .Phase "Waiting"}} + {{.Name}} + {{.TeammatesReady}} +
  2. + {{end}} +
+
+
+ {{end}} + + {{if .Status.Artifacts}} +
+

Artifacts

+ + + + + + + + + + {{range .Status.Artifacts}} + + + + + + {{end}} + +
NameProduced byPath
{{.Name}}{{.ProducedBy}}{{.Path}}
+
+ {{end}} +

Teammates

diff --git a/internal/dashboard/templates_test.go b/internal/dashboard/templates_test.go new file mode 100644 index 0000000..2772c30 --- /dev/null +++ b/internal/dashboard/templates_test.go @@ -0,0 +1,56 @@ +package dashboard + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + claudev1alpha1 "github.com/amcheste/kagents/api/v1alpha1" +) + +// pipelinePercent isn't reachable from outside the package via a named +// symbol — it's an entry in templateFuncs. Pulling it out by key +// preserves its blessed signature (function value) and lets us hit +// the boundary conditions without going through full template render. +func pipelinePercentFn() func(*claudev1alpha1.PipelineStatus) int { + return templateFuncs["pipelinePercent"].(func(*claudev1alpha1.PipelineStatus) int) +} + +func TestPipelinePercent_NilStatus(t *testing.T) { + t.Parallel() + assert.Equal(t, 0, pipelinePercentFn()(nil)) +} + +func TestPipelinePercent_ZeroTotal(t *testing.T) { + t.Parallel() + // Division-by-zero guard: a freshly initialized PipelineStatus has + // StagesTotal=0 before the reconciler runs the spec. Must not panic. + assert.Equal(t, 0, pipelinePercentFn()(&claudev1alpha1.PipelineStatus{})) +} + +func TestPipelinePercent_HalfComplete(t *testing.T) { + t.Parallel() + got := pipelinePercentFn()(&claudev1alpha1.PipelineStatus{ + StagesCompleted: 2, StagesTotal: 4, + }) + assert.Equal(t, 50, got) +} + +func TestPipelinePercent_AllComplete(t *testing.T) { + t.Parallel() + got := pipelinePercentFn()(&claudev1alpha1.PipelineStatus{ + StagesCompleted: 3, StagesTotal: 3, + }) + assert.Equal(t, 100, got) +} + +func TestPipelinePercent_OverflowClamps(t *testing.T) { + t.Parallel() + // Pathological case — reconciler bug counts more completed than + // total. The helper should clamp rather than render a >100% bar + // that breaks the layout. + got := pipelinePercentFn()(&claudev1alpha1.PipelineStatus{ + StagesCompleted: 99, StagesTotal: 3, + }) + assert.Equal(t, 100, got) +} diff --git a/internal/metrics/prometheus.go b/internal/metrics/prometheus.go index b0c8e7e..d0b8b31 100644 --- a/internal/metrics/prometheus.go +++ b/internal/metrics/prometheus.go @@ -50,6 +50,39 @@ var ( Buckets: prometheus.ExponentialBuckets(1, 2, 10), }, []string{"team", "teammate"}) + // --- Knowledge-work observability (v0.8.0+) --- + // + // These metrics use the `kagents_` prefix to match the rebranded + // project name. Existing `claude_*` metrics above stay put for + // backwards compatibility — they'll get a synchronized rename in a + // future major; for now operators dashboard against either set. + + pipelineStageActive = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "kagents_team_pipeline_stage_active", + Help: "1 when this pipeline stage is in the Running phase, 0 otherwise. Useful for stacking by stage to visualize where a team is in flight.", + }, []string{"team", "namespace", "stage"}) + + pipelineStageDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: "kagents_team_stage_duration_seconds", + Help: "Wall-clock duration from a pipeline stage's first teammate spawning to its last completing, in seconds. Observed once per (team, stage) at the Completed transition.", + Buckets: prometheus.ExponentialBuckets(30, 2, 10), + }, []string{"team", "namespace", "stage"}) + + teamArtifactsProduced = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "kagents_team_artifacts_produced_total", + Help: "Artifacts recorded on a team's status by a teammate completing its declared outputs.", + }, []string{"team", "namespace", "teammate"}) + + teamDeliverySuccess = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "kagents_team_delivery_success_total", + Help: "Successful deliveries dispatched when OnComplete=deliver, by target type.", + }, []string{"team", "namespace", "type"}) + + teamDeliveryFailure = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "kagents_team_delivery_failure_total", + Help: "Failed deliveries dispatched when OnComplete=deliver, by target type.", + }, []string{"team", "namespace", "type"}) + collectors = []prometheus.Collector{ teamActive, teamDuration, @@ -59,6 +92,11 @@ var ( teammateRestarts, teamBudgetRemaining, teammateIdle, + pipelineStageActive, + pipelineStageDuration, + teamArtifactsProduced, + teamDeliverySuccess, + teamDeliveryFailure, } registerOnce sync.Once @@ -140,3 +178,38 @@ func SetBudgetRemaining(team, namespace string, remaining float64) { func SetActiveTeams(count int) { teamActive.Set(float64(count)) } + +// SetPipelineStageActive marks a stage as Running (1) or not (0). The +// gauge is set on every reconcile so a stage that transitions +// Running → Completed flips to 0 without needing a separate "stage +// done" event. +func SetPipelineStageActive(team, namespace, stage string, active bool) { + v := 0.0 + if active { + v = 1.0 + } + pipelineStageActive.WithLabelValues(team, namespace, stage).Set(v) +} + +// ObservePipelineStageDuration records the wall-clock seconds a stage +// spent in Running before transitioning to Completed. Call this once +// per (team, stage) — the reconciler guards against re-observing. +func ObservePipelineStageDuration(team, namespace, stage string, durationSec float64) { + pipelineStageDuration.WithLabelValues(team, namespace, stage).Observe(durationSec) +} + +// RecordArtifactProduced increments the per-teammate artifact counter. +// One increment per artifact appended to status.artifacts. +func RecordArtifactProduced(team, namespace, teammate string) { + teamArtifactsProduced.WithLabelValues(team, namespace, teammate).Inc() +} + +// RecordDeliverySuccess increments the per-type delivery success counter. +func RecordDeliverySuccess(team, namespace, deliveryType string) { + teamDeliverySuccess.WithLabelValues(team, namespace, deliveryType).Inc() +} + +// RecordDeliveryFailure increments the per-type delivery failure counter. +func RecordDeliveryFailure(team, namespace, deliveryType string) { + teamDeliveryFailure.WithLabelValues(team, namespace, deliveryType).Inc() +} diff --git a/internal/metrics/prometheus_test.go b/internal/metrics/prometheus_test.go index 3ad9e95..400c91b 100644 --- a/internal/metrics/prometheus_test.go +++ b/internal/metrics/prometheus_test.go @@ -25,6 +25,11 @@ func resetState(t *testing.T) { teammateRestarts.Reset() teamBudgetRemaining.Reset() teammateIdle.Reset() + pipelineStageActive.Reset() + pipelineStageDuration.Reset() + teamArtifactsProduced.Reset() + teamDeliverySuccess.Reset() + teamDeliveryFailure.Reset() } func TestRegisterMetricsIsIdempotent(t *testing.T) { @@ -171,6 +176,12 @@ func TestMetricsExposeExpectedNames(t *testing.T) { "claude_teammate_restarts_total", "claude_team_budget_remaining_usd", "claude_teammate_idle_seconds", + // kagents_* — knowledge-work observability (v0.8.0+). + "kagents_team_pipeline_stage_active", + "kagents_team_stage_duration_seconds", + "kagents_team_artifacts_produced_total", + "kagents_team_delivery_success_total", + "kagents_team_delivery_failure_total", } // Collect descriptors rather than observations — Vec collectors with no @@ -216,3 +227,38 @@ func TestMetricsHelpTextHasClaudePrefix(t *testing.T) { assert.NotEmpty(t, help, "metric %s missing help text", mf.GetName()) } } + +// --- Knowledge-work observability (v0.8.0+) --- + +func TestSetPipelineStageActive_FlipsGauge(t *testing.T) { + resetState(t) + SetPipelineStageActive("team-a", "ns1", "research", true) + assert.Equal(t, 1.0, testutil.ToFloat64(pipelineStageActive.WithLabelValues("team-a", "ns1", "research"))) + SetPipelineStageActive("team-a", "ns1", "research", false) + assert.Equal(t, 0.0, testutil.ToFloat64(pipelineStageActive.WithLabelValues("team-a", "ns1", "research"))) +} + +func TestObservePipelineStageDuration_RecordsObservation(t *testing.T) { + resetState(t) + ObservePipelineStageDuration("team-a", "ns1", "analysis", 42) + // CollectAndCount on a histogram returns 1 once any observation has landed. + assert.Equal(t, 1, testutil.CollectAndCount(pipelineStageDuration)) +} + +func TestRecordArtifactProduced_Increments(t *testing.T) { + resetState(t) + RecordArtifactProduced("team-a", "ns1", "writer") + RecordArtifactProduced("team-a", "ns1", "writer") + assert.Equal(t, 2.0, testutil.ToFloat64(teamArtifactsProduced.WithLabelValues("team-a", "ns1", "writer"))) +} + +func TestRecordDelivery_SuccessAndFailureAreSeparate(t *testing.T) { + resetState(t) + RecordDeliverySuccess("team-a", "ns1", "webhook") + RecordDeliveryFailure("team-a", "ns1", "slack") + assert.Equal(t, 1.0, testutil.ToFloat64(teamDeliverySuccess.WithLabelValues("team-a", "ns1", "webhook"))) + assert.Equal(t, 1.0, testutil.ToFloat64(teamDeliveryFailure.WithLabelValues("team-a", "ns1", "slack"))) + // Counters are independent — labelling success vs failure on the + // SAME (team, type) tuple must not collide. + assert.Equal(t, 0.0, testutil.ToFloat64(teamDeliveryFailure.WithLabelValues("team-a", "ns1", "webhook"))) +}