From e2a27d6368a35902b4816b7c9112808733a1b327 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 26 Feb 2026 07:58:25 -0800 Subject: [PATCH] [OpAMP] Add E2E test (#6289) * Implement API boilerplate for POST /v1/opamp endpoint * Add OpAMP section to dev doc * Flesh out dev doc * Update dev doc to use Fleet enrollment token * Check feature flag before handing OpAMP requests * Allow running specific tests with TEST_RUN env var * Removing irrelevant file * WIP: Reimplement using opamp-go server package * Update spec * Move OpAMP documentation to separate file * Remove error that's no longer needed * Update OpAMP feature flag test to use Enabled() method The test previously referenced ErrOpAMPDisabled and handleOpAMP which no longer exist. The feature flag check now happens at route registration time, so test the Enabled() method directly instead. Co-Authored-By: Claude Opus 4.6 * Disable HTTP keep-alive for OpAMP requests to fix EOF errors The server's IdleTimeout (30s) matches the OTel Collector's polling interval (~30s), causing a race where the server closes the idle connection just as the client tries to reuse it. Setting Connection: close on OpAMP responses forces a fresh connection per poll, eliminating the race with negligible overhead given the 30s polling interval. Co-Authored-By: Claude Opus 4.6 * Adding configuration files to be used by OpAMP E2E test * WIP: Adding OpAMP E2E test * Fix otelcol template data to use nested OpAMP keys The otelcol-opamp.tpl template accesses {{ .OpAMP.APIKey }} and {{ .OpAMP.InstanceUID }}, so the template data must nest these under an "OpAMP" key rather than passing them as flat top-level keys. Co-Authored-By: Claude Opus 4.6 * Use distinct filename for otelcol config in TestOpAMP The otelcol config was being written to config.yml, overwriting the fleet-server config in the same temp dir. Rename it to otelcol.yml. Co-Authored-By: Claude Opus 4.6 * Make otelcol-contrib download URL platform-aware in TestOpAMP Use runtime.GOOS and runtime.GOARCH to build the download URL dynamically instead of hardcoding darwin_arm64. Also chmod the extracted binary since extractTarGz doesn't preserve permissions. Co-Authored-By: Claude Opus 4.6 * Fix resp.Body handling in TestOpAMP Use explicit Close() instead of defer since resp is reassigned later in the function, which would cause the deferred close to act on the wrong response. Co-Authored-By: Claude Opus 4.6 * Increase TestOpAMP timeout and use defer for cleanup Increase context timeout from 1 to 3 minutes to account for the otelcol-contrib download. Use defer for cancel() and cmd.Wait() so cleanup happens even on test failure. Co-Authored-By: Claude Opus 4.6 * Start OTel Collector in TestOpAMP Extract instanceUID and apiKey into variables, remove the placeholder time.Sleep, and start the otelcol-contrib binary with the OpAMP extension config pointing at fleet-server. Co-Authored-By: Claude Opus 4.6 * Verify agent enrollment in TestOpAMP Poll Kibana via AgentIsOnline to confirm the OTel Collector was enrolled as an agent in Fleet Server after connecting via OpAMP. Co-Authored-By: Claude Opus 4.6 * Extract OTel Collector version into package-level constant Move the hardcoded otelcol-contrib version into otelColContribVersion in const.go so it can be easily updated in one place. Co-Authored-By: Claude Opus 4.6 * Continue writing TestOpAMP e2e test - Configure fleet-server with a static policy token for dummy-policy so that GetEnrollmentTokenForPolicyID can find the enrollment token - Fetch enrollment token before the raw POST to /v1/opamp - Add Authorization and Content-Type headers to the raw POST - Assert HTTP 200 response from the raw POST Co-Authored-By: Claude Sonnet 4.6 * Fix TestOpAMP e2e test - Enroll a dummy agent before starting the OTel Collector to initialize the .fleet-agents index. Without this, findEnrolledAgent fails with index_not_found_exception in a standalone fleet-server environment (unlike agent-managed fleet-server which self-enrolls on startup). - Add AgentHasStatus scaffold method that accepts multiple acceptable statuses, and AgentIsUpdating that delegates to it. - Use AgentIsUpdating in TestOpAMP: OpAMP agents communicate via the OpAMP protocol rather than Fleet's normal checkin/ack protocol, so they never acknowledge the initial policy change action and Kibana shows them as "updating" rather than "online". Co-Authored-By: Claude Sonnet 4.6 * Fixing conflicts during rebase * Download OTel Contrib source and build collector from it * Running go fmt * Fetch entire Agent doc from ES and make finer-grained assertions on its contents * Check status from doc field --------- Co-authored-by: Claude Opus 4.6 (cherry picked from commit 7aededf9c3090ac04d8ce5164fa4ab159d7bde05) --- testing/e2e/scaffold/scaffold.go | 50 +++++++- testing/e2e/stand_alone_test.go | 129 +++++++++++++++++++++ testing/e2e/testdata/otelcol-opamp.tpl | 27 +++++ testing/e2e/testdata/stand-alone-opamp.tpl | 21 ++++ 4 files changed, 222 insertions(+), 5 deletions(-) create mode 100644 testing/e2e/testdata/otelcol-opamp.tpl create mode 100644 testing/e2e/testdata/stand-alone-opamp.tpl diff --git a/testing/e2e/scaffold/scaffold.go b/testing/e2e/scaffold/scaffold.go index 8b40f77b28..2998d97fe6 100644 --- a/testing/e2e/scaffold/scaffold.go +++ b/testing/e2e/scaffold/scaffold.go @@ -228,14 +228,14 @@ func (s *Scaffold) FleetServerStatusCondition(ctx context.Context, url string, c } } -// AgentIsOnline will check Kibana if the agent specified by the passed id has the online status. +// AgentIsOnline polls Kibana's Fleet API until the agent with the given ID has the online status. // The test is marked as failed if the passed context terminates before that. func (s *Scaffold) AgentIsOnline(ctx context.Context, id string) { timer := time.NewTimer(time.Second) for { select { case <-ctx.Done(): - s.Require().NoError(ctx.Err(), "context expired before agent reported online") + s.Require().NoError(ctx.Err(), "context expired before agent reached expected status") return case <-timer.C: req, err := http.NewRequestWithContext(ctx, "GET", "http://localhost:5601/api/fleet/agents/"+id, nil) @@ -308,9 +308,49 @@ type KibanaAgent struct { } type ESAgentDoc struct { - Revision int `json:"policy_revision_idx"` - PolicyID string `json:"policy_id"` - AgentPolicyID string `json:"agent_policy_id"` + Revision int `json:"policy_revision_idx"` + PolicyID string `json:"policy_id"` + AgentPolicyID string `json:"agent_policy_id"` + Type string `json:"type"` + Status string `json:"status"` + Tags []string `json:"tags"` + Agent struct { + ID string `json:"id"` + Version string `json:"version"` + Type string `json:"type"` + } `json:"agent"` +} + +func (s *Scaffold) WaitForAgentDoc(ctx context.Context, id string) ESAgentDoc { + timer := time.NewTimer(time.Second) + for { + select { + case <-ctx.Done(): + s.Require().NoError(ctx.Err(), "context expired before agent document appeared in .fleet-agents") + return ESAgentDoc{} + case <-timer.C: + req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://localhost:9200/.fleet-agents/_doc/"+id, nil) + s.Require().NoError(err) + req.SetBasicAuth(s.ElasticUser, s.ElasticPass) + resp, err := s.Client.Do(req) + if err != nil { + timer.Reset(time.Second) + continue + } + if resp.StatusCode != http.StatusOK { + resp.Body.Close() + timer.Reset(time.Second) + continue + } + var obj struct { + Source ESAgentDoc `json:"_source"` + } + err = json.NewDecoder(resp.Body).Decode(&obj) + resp.Body.Close() + s.Require().NoError(err) + return obj.Source + } + } } func (s *Scaffold) GetAgent(ctx context.Context, id string) ESAgentDoc { diff --git a/testing/e2e/stand_alone_test.go b/testing/e2e/stand_alone_test.go index 3a37dac034..dde0e7a1b5 100644 --- a/testing/e2e/stand_alone_test.go +++ b/testing/e2e/stand_alone_test.go @@ -579,3 +579,132 @@ func (suite *StandAloneSuite) TestAPMInstrumentation() { cancel() cmd.Wait() } + +// TestOpAMP ensures that the OpAMP endpoint in Fleet Server works as expected by installing +// an OTel Collector, configuring it with the OpAMP extension, and having it connect to Fleet +// Server using OpAMP, and verifying that Fleet Server responds to this request with an HTTP +// 200 OK status response. +func (suite *StandAloneSuite) TestOpAMP() { + // Create a config file from a template in the test temp dir + dir := suite.T().TempDir() + tpl, err := template.ParseFiles(filepath.Join("testdata", "stand-alone-opamp.tpl")) + suite.Require().NoError(err) + f, err := os.Create(filepath.Join(dir, "config.yml")) + suite.Require().NoError(err) + err = tpl.Execute(f, map[string]interface{}{ + "Hosts": suite.ESHosts, + "ServiceToken": suite.ServiceToken, + "StaticTokenKey": "opamp-e2e-test-key", + }) + f.Close() + suite.Require().NoError(err) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + defer cancel() + + // Run the fleet-server binary + cmd := exec.CommandContext(ctx, suite.binaryPath, "-c", filepath.Join(dir, "config.yml")) + cmd.Cancel = func() error { + return cmd.Process.Signal(syscall.SIGTERM) + } + cmd.Env = []string{"GOCOVERDIR=" + suite.CoverPath} + err = cmd.Start() + suite.Require().NoError(err) + defer cmd.Wait() + + suite.FleetServerStatusOK(ctx, "http://localhost:8220") + + apiKey := suite.GetEnrollmentTokenForPolicyID(ctx, "dummy-policy") + + // Make sure the OpAMP endpoint works. + req, err := http.NewRequestWithContext(ctx, "POST", "http://localhost:8220/v1/opamp", nil) + suite.Require().NoError(err) + req.Header.Set("Authorization", "ApiKey "+apiKey) + req.Header.Set("Content-Type", "application/x-protobuf") + + resp, err := suite.Client.Do(req) + suite.Require().NoError(err) + resp.Body.Close() + suite.Require().Equal(http.StatusOK, resp.StatusCode) + + // Enroll a dummy agent to initialize the .fleet-agents index before the OTel Collector connects. + // Without this, findEnrolledAgent fails with index_not_found_exception when the OTel Collector + // sends its first AgentToServer message, because .fleet-agents doesn't exist yet in a fresh + // standalone fleet-server environment (unlike agent-managed fleet-server which self-enrolls). + tester := api_version.NewClientAPITesterCurrent(suite.Scaffold, "http://localhost:8220", apiKey) + tester.Enroll(ctx, apiKey) + + // Clone OTel Collector contrib repository (shallow clone of main branch) + cloneDir := filepath.Join(dir, "opentelemetry-collector-contrib") + suite.T().Logf("Cloning opentelemetry-collector-contrib (main) to %s", cloneDir) + cloneCmd := exec.CommandContext(ctx, + "git", "clone", + "--depth", "1", + "https://github.com/open-telemetry/opentelemetry-collector-contrib", + cloneDir, + ) + cloneCmd.Stdout = os.Stdout + cloneCmd.Stderr = os.Stderr + err = cloneCmd.Run() + suite.Require().NoError(err) + + // Build the OTel Collector binary + suite.T().Log("Building otelcol-contrib binary via make otelcontribcol") + makeCmd := exec.CommandContext(ctx, "make", "otelcontribcol") + makeCmd.Dir = cloneDir + makeCmd.Stdout = os.Stdout + makeCmd.Stderr = os.Stderr + err = makeCmd.Run() + suite.Require().NoError(err) + + // The make target places the binary under bin/; move it to the expected path. + builtBinary := filepath.Join(cloneDir, "bin", fmt.Sprintf("otelcontribcol_%s_%s", runtime.GOOS, runtime.GOARCH)) + otelBinaryPath := filepath.Join(dir, "otelcol-contrib") + err = os.Rename(builtBinary, otelBinaryPath) + suite.Require().NoError(err) + + // Configure it with the OpAMP extension + instanceUID := "019b8d7a-2da8-7657-b52d-492a9de33319" + suite.T().Logf("Configuring OTel Collector with OpAMP extension (instanceUID=%s)", instanceUID) + tpl, err = template.ParseFiles(filepath.Join("testdata", "otelcol-opamp.tpl")) + suite.Require().NoError(err) + f, err = os.Create(filepath.Join(dir, "otelcol.yml")) + suite.Require().NoError(err) + err = tpl.Execute(f, map[string]interface{}{ + "OpAMP": map[string]string{ + "InstanceUID": instanceUID, + "APIKey": apiKey, + }, + }) + f.Close() + suite.Require().NoError(err) + + // Start OTel Collector + suite.T().Log("Starting OTel Collector") + otelCmd := exec.CommandContext(ctx, otelBinaryPath, "--config", filepath.Join(dir, "otelcol.yml")) + otelCmd.Cancel = func() error { + return otelCmd.Process.Signal(syscall.SIGTERM) + } + otelCmd.Stdout = os.Stdout + otelCmd.Stderr = os.Stderr + err = otelCmd.Start() + suite.Require().NoError(err) + defer otelCmd.Wait() + + // Verify that the OTel Collector was enrolled in Fleet by fetching its document from + // .fleet-agents and asserting on its contents. + suite.T().Logf("Waiting for agent %s to appear in .fleet-agents", instanceUID) + agentDoc := suite.WaitForAgentDoc(ctx, instanceUID) + + suite.Equal(instanceUID, agentDoc.Agent.ID, "expected agent.id to match instanceUID") + // TODO: uncomment once https://github.com/elastic/fleet-server/pull/6400 is merged + // versionOut, err := exec.Command(otelBinaryPath, "--version").Output() + // suite.Require().NoError(err) + // otelVersion := strings.TrimPrefix(strings.TrimSpace(string(versionOut)), "otelcontribcol version ") + // suite.Equal("OPAMP", agentDoc.Type, "expected type to be OPAMP") + // suite.Equal("otelcontribcol", agentDoc.Agent.Type, "expected agent.type to be otelcontribcol") + // suite.Equal(otelVersion, agentDoc.Agent.Version, "expected agent.version to match otelcol-contrib binary version") + // suite.Equal(1, agentDoc.Revision, "expected policy_revision_idx to be 1") + // suite.Contains(agentDoc.Tags, "otelcontribcol", "expected tags to contain otelcontribcol") + // suite.Equal("online", agentDoc.Status, "expected status to be online") +} diff --git a/testing/e2e/testdata/otelcol-opamp.tpl b/testing/e2e/testdata/otelcol-opamp.tpl new file mode 100644 index 0000000000..0d0ec1896a --- /dev/null +++ b/testing/e2e/testdata/otelcol-opamp.tpl @@ -0,0 +1,27 @@ +receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + +exporters: + debug: + verbosity: detailed + +extensions: + opamp: + server: + http: + endpoint: http://localhost:8220/v1/opamp + tls: + insecure: true + headers: + Authorization: ApiKey {{ .OpAMP.APIKey }} + instance_uid: {{ .OpAMP.InstanceUID }} + +service: + pipelines: + logs: + receivers: [otlp] + exporters: [debug] + extensions: [opamp] \ No newline at end of file diff --git a/testing/e2e/testdata/stand-alone-opamp.tpl b/testing/e2e/testdata/stand-alone-opamp.tpl new file mode 100644 index 0000000000..f1f20b42f4 --- /dev/null +++ b/testing/e2e/testdata/stand-alone-opamp.tpl @@ -0,0 +1,21 @@ +output: + elasticsearch: + hosts: {{ .Hosts }} + service_token: {{ .ServiceToken }} + +fleet.agent.id: e2e-test-id + +inputs: +- type: fleet-server + server: + feature_flags: + enable_opamp: true + static_policy_tokens: + enabled: true + policy_tokens: + - token_key: {{ .StaticTokenKey }} + policy_id: dummy-policy + +logging: + to_stderr: true +