diff --git a/testing/e2e/scaffold/scaffold.go b/testing/e2e/scaffold/scaffold.go
index 8b40f77b28..2998d97fe6 100644
--- a/testing/e2e/scaffold/scaffold.go
+++ b/testing/e2e/scaffold/scaffold.go
@@ -228,14 +228,14 @@ func (s *Scaffold) FleetServerStatusCondition(ctx context.Context, url string, c
 	}
 }
 
-// AgentIsOnline will check Kibana if the agent specified by the passed id has the online status.
+// AgentIsOnline polls Kibana's Fleet API until the agent with the given ID has the online status.
 // The test is marked as failed if the passed context terminates before that.
 func (s *Scaffold) AgentIsOnline(ctx context.Context, id string) {
 	timer := time.NewTimer(time.Second)
 	for {
 		select {
 		case <-ctx.Done():
-			s.Require().NoError(ctx.Err(), "context expired before agent reported online")
+			s.Require().NoError(ctx.Err(), "context expired before agent reached expected status")
 			return
 		case <-timer.C:
 			req, err := http.NewRequestWithContext(ctx, "GET", "http://localhost:5601/api/fleet/agents/"+id, nil)
@@ -308,9 +308,55 @@ type KibanaAgent struct {
 }
 
 type ESAgentDoc struct {
-	Revision      int    `json:"policy_revision_idx"`
-	PolicyID      string `json:"policy_id"`
-	AgentPolicyID string `json:"agent_policy_id"`
+	Revision      int      `json:"policy_revision_idx"`
+	PolicyID      string   `json:"policy_id"`
+	AgentPolicyID string   `json:"agent_policy_id"`
+	Type          string   `json:"type"`
+	Status        string   `json:"status"`
+	Tags          []string `json:"tags"`
+	Agent         struct {
+		ID      string `json:"id"`
+		Version string `json:"version"`
+		Type    string `json:"type"`
+	} `json:"agent"`
+}
+
+// WaitForAgentDoc polls Elasticsearch until the document with the given ID can be fetched from
+// the .fleet-agents index, and returns the decoded _source of that document.
+// The test is marked as failed if the passed context terminates before that, or if the
+// response body cannot be decoded.
+func (s *Scaffold) WaitForAgentDoc(ctx context.Context, id string) ESAgentDoc {
+	timer := time.NewTimer(time.Second)
+	for {
+		select {
+		case <-ctx.Done():
+			s.Require().NoError(ctx.Err(), "context expired before agent document appeared in .fleet-agents")
+			return ESAgentDoc{}
+		case <-timer.C:
+			req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://localhost:9200/.fleet-agents/_doc/"+id, nil)
+			s.Require().NoError(err)
+			req.SetBasicAuth(s.ElasticUser, s.ElasticPass)
+			resp, err := s.Client.Do(req)
+			if err != nil {
+				// Request failed; try again on the next tick.
+				timer.Reset(time.Second)
+				continue
+			}
+			if resp.StatusCode != http.StatusOK {
+				// Any non-200 response is treated as "not there yet" and retried.
+				resp.Body.Close()
+				timer.Reset(time.Second)
+				continue
+			}
+			var obj struct {
+				Source ESAgentDoc `json:"_source"`
+			}
+			err = json.NewDecoder(resp.Body).Decode(&obj)
+			resp.Body.Close()
+			s.Require().NoError(err)
+			return obj.Source
+		}
+	}
 }
 
 func (s *Scaffold) GetAgent(ctx context.Context, id string) ESAgentDoc {
diff --git a/testing/e2e/stand_alone_test.go b/testing/e2e/stand_alone_test.go
index 3a37dac034..dde0e7a1b5 100644
--- a/testing/e2e/stand_alone_test.go
+++ b/testing/e2e/stand_alone_test.go
@@ -579,3 +579,143 @@ func (suite *StandAloneSuite) TestAPMInstrumentation() {
 	cancel()
 	cmd.Wait()
 }
+
+// TestOpAMP ensures that the OpAMP endpoint in Fleet Server works as expected. It first checks
+// that a bare POST to /v1/opamp is answered with HTTP 200 OK, then builds an OTel Collector,
+// configures it with the OpAMP extension pointing at Fleet Server, and verifies that a document
+// for the collector's instance UID appears in the .fleet-agents index.
+func (suite *StandAloneSuite) TestOpAMP() {
+	// Create a config file from a template in the test temp dir
+	dir := suite.T().TempDir()
+	tpl, err := template.ParseFiles(filepath.Join("testdata", "stand-alone-opamp.tpl"))
+	suite.Require().NoError(err)
+	f, err := os.Create(filepath.Join(dir, "config.yml"))
+	suite.Require().NoError(err)
+	err = tpl.Execute(f, map[string]interface{}{
+		"Hosts":          suite.ESHosts,
+		"ServiceToken":   suite.ServiceToken,
+		"StaticTokenKey": "opamp-e2e-test-key",
+	})
+	f.Close()
+	suite.Require().NoError(err)
+
+	// NOTE(review): this deadline also bounds the git clone and "make otelcontribcol" steps
+	// below, not just the fleet-server run — confirm 3 minutes is enough on CI.
+	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
+	defer cancel()
+
+	// Run the fleet-server binary
+	cmd := exec.CommandContext(ctx, suite.binaryPath, "-c", filepath.Join(dir, "config.yml"))
+	cmd.Cancel = func() error {
+		return cmd.Process.Signal(syscall.SIGTERM)
+	}
+	cmd.Env = []string{"GOCOVERDIR=" + suite.CoverPath}
+	err = cmd.Start()
+	suite.Require().NoError(err)
+	// Deferred calls run LIFO, so "defer cancel()" above would only fire after this Wait.
+	// Cancel explicitly first so the process gets SIGTERM now instead of at the timeout.
+	defer func() {
+		cancel()
+		cmd.Wait()
+	}()
+
+	suite.FleetServerStatusOK(ctx, "http://localhost:8220")
+
+	apiKey := suite.GetEnrollmentTokenForPolicyID(ctx, "dummy-policy")
+
+	// Make sure the OpAMP endpoint works.
+	req, err := http.NewRequestWithContext(ctx, "POST", "http://localhost:8220/v1/opamp", nil)
+	suite.Require().NoError(err)
+	req.Header.Set("Authorization", "ApiKey "+apiKey)
+	req.Header.Set("Content-Type", "application/x-protobuf")
+
+	resp, err := suite.Client.Do(req)
+	suite.Require().NoError(err)
+	resp.Body.Close()
+	suite.Require().Equal(http.StatusOK, resp.StatusCode)
+
+	// Enroll a dummy agent to initialize the .fleet-agents index before the OTel Collector connects.
+	// Without this, findEnrolledAgent fails with index_not_found_exception when the OTel Collector
+	// sends its first AgentToServer message, because .fleet-agents doesn't exist yet in a fresh
+	// standalone fleet-server environment (unlike agent-managed fleet-server which self-enrolls).
+	tester := api_version.NewClientAPITesterCurrent(suite.Scaffold, "http://localhost:8220", apiKey)
+	tester.Enroll(ctx, apiKey)
+
+	// Clone OTel Collector contrib repository (shallow clone of main branch)
+	cloneDir := filepath.Join(dir, "opentelemetry-collector-contrib")
+	suite.T().Logf("Cloning opentelemetry-collector-contrib (main) to %s", cloneDir)
+	cloneCmd := exec.CommandContext(ctx,
+		"git", "clone",
+		"--depth", "1",
+		"https://github.com/open-telemetry/opentelemetry-collector-contrib",
+		cloneDir,
+	)
+	cloneCmd.Stdout = os.Stdout
+	cloneCmd.Stderr = os.Stderr
+	err = cloneCmd.Run()
+	suite.Require().NoError(err)
+
+	// Build the OTel Collector binary
+	suite.T().Log("Building otelcol-contrib binary via make otelcontribcol")
+	makeCmd := exec.CommandContext(ctx, "make", "otelcontribcol")
+	makeCmd.Dir = cloneDir
+	makeCmd.Stdout = os.Stdout
+	makeCmd.Stderr = os.Stderr
+	err = makeCmd.Run()
+	suite.Require().NoError(err)
+
+	// The make target places the binary under bin/; move it to the expected path.
+	builtBinary := filepath.Join(cloneDir, "bin", fmt.Sprintf("otelcontribcol_%s_%s", runtime.GOOS, runtime.GOARCH))
+	otelBinaryPath := filepath.Join(dir, "otelcol-contrib")
+	err = os.Rename(builtBinary, otelBinaryPath)
+	suite.Require().NoError(err)
+
+	// Configure it with the OpAMP extension
+	instanceUID := "019b8d7a-2da8-7657-b52d-492a9de33319"
+	suite.T().Logf("Configuring OTel Collector with OpAMP extension (instanceUID=%s)", instanceUID)
+	tpl, err = template.ParseFiles(filepath.Join("testdata", "otelcol-opamp.tpl"))
+	suite.Require().NoError(err)
+	f, err = os.Create(filepath.Join(dir, "otelcol.yml"))
+	suite.Require().NoError(err)
+	err = tpl.Execute(f, map[string]interface{}{
+		"OpAMP": map[string]string{
+			"InstanceUID": instanceUID,
+			"APIKey":      apiKey,
+		},
+	})
+	f.Close()
+	suite.Require().NoError(err)
+
+	// Start OTel Collector
+	suite.T().Log("Starting OTel Collector")
+	otelCmd := exec.CommandContext(ctx, otelBinaryPath, "--config", filepath.Join(dir, "otelcol.yml"))
+	otelCmd.Cancel = func() error {
+		return otelCmd.Process.Signal(syscall.SIGTERM)
+	}
+	otelCmd.Stdout = os.Stdout
+	otelCmd.Stderr = os.Stderr
+	err = otelCmd.Start()
+	suite.Require().NoError(err)
+	// As above: cancel before waiting so Wait does not block until the context times out.
+	defer func() {
+		cancel()
+		otelCmd.Wait()
+	}()
+
+	// Verify that the OTel Collector was enrolled in Fleet by fetching its document from
+	// .fleet-agents and asserting on its contents.
+	suite.T().Logf("Waiting for agent %s to appear in .fleet-agents", instanceUID)
+	agentDoc := suite.WaitForAgentDoc(ctx, instanceUID)
+
+	suite.Equal(instanceUID, agentDoc.Agent.ID, "expected agent.id to match instanceUID")
+	// TODO: uncomment once https://github.com/elastic/fleet-server/pull/6400 is merged
+	// versionOut, err := exec.Command(otelBinaryPath, "--version").Output()
+	// suite.Require().NoError(err)
+	// otelVersion := strings.TrimPrefix(strings.TrimSpace(string(versionOut)), "otelcontribcol version ")
+	// suite.Equal("OPAMP", agentDoc.Type, "expected type to be OPAMP")
+	// suite.Equal("otelcontribcol", agentDoc.Agent.Type, "expected agent.type to be otelcontribcol")
+	// suite.Equal(otelVersion, agentDoc.Agent.Version, "expected agent.version to match otelcol-contrib binary version")
+	// suite.Equal(1, agentDoc.Revision, "expected policy_revision_idx to be 1")
+	// suite.Contains(agentDoc.Tags, "otelcontribcol", "expected tags to contain otelcontribcol")
+	// suite.Equal("online", agentDoc.Status, "expected status to be online")
+}
diff --git a/testing/e2e/testdata/otelcol-opamp.tpl b/testing/e2e/testdata/otelcol-opamp.tpl
new file mode 100644
index 0000000000..0d0ec1896a
--- /dev/null
+++ b/testing/e2e/testdata/otelcol-opamp.tpl
@@ -0,0 +1,27 @@
+receivers:
+  otlp:
+    protocols:
+      grpc:
+        endpoint: 0.0.0.0:4317
+
+exporters:
+  debug:
+    verbosity: detailed
+
+extensions:
+  opamp:
+    server:
+      http:
+        endpoint: http://localhost:8220/v1/opamp
+        tls:
+          insecure: true
+        headers:
+          Authorization: ApiKey {{ .OpAMP.APIKey }}
+    instance_uid: {{ .OpAMP.InstanceUID }}
+
+service:
+  pipelines:
+    logs:
+      receivers: [otlp]
+      exporters: [debug]
+  extensions: [opamp]
\ No newline at end of file
diff --git a/testing/e2e/testdata/stand-alone-opamp.tpl b/testing/e2e/testdata/stand-alone-opamp.tpl
new file mode 100644
index 0000000000..f1f20b42f4
--- /dev/null
+++ b/testing/e2e/testdata/stand-alone-opamp.tpl
@@ -0,0 +1,21 @@
+output:
+  elasticsearch:
+    hosts: {{ .Hosts }}
+    service_token: {{ .ServiceToken }}
+
+fleet.agent.id: e2e-test-id
+
+inputs:
+- type: fleet-server
+  server:
+    feature_flags:
+      enable_opamp: true
+    static_policy_tokens:
+      enabled: true
+      policy_tokens:
+        - token_key: {{ .StaticTokenKey }}
+          policy_id: dummy-policy
+
+logging:
+  to_stderr: true
+