Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 45 additions & 5 deletions testing/e2e/scaffold/scaffold.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,14 +228,14 @@ func (s *Scaffold) FleetServerStatusCondition(ctx context.Context, url string, c
}
}

// AgentIsOnline will check Kibana if the agent specified by the passed id has the online status.
// AgentIsOnline polls Kibana's Fleet API until the agent with the given ID has the online status.
// The test is marked as failed if the passed context terminates before that.
func (s *Scaffold) AgentIsOnline(ctx context.Context, id string) {
timer := time.NewTimer(time.Second)
for {
select {
case <-ctx.Done():
s.Require().NoError(ctx.Err(), "context expired before agent reported online")
s.Require().NoError(ctx.Err(), "context expired before agent reached expected status")
return
case <-timer.C:
req, err := http.NewRequestWithContext(ctx, "GET", "http://localhost:5601/api/fleet/agents/"+id, nil)
Expand Down Expand Up @@ -308,9 +308,49 @@ type KibanaAgent struct {
}

type ESAgentDoc struct {
Revision int `json:"policy_revision_idx"`
PolicyID string `json:"policy_id"`
AgentPolicyID string `json:"agent_policy_id"`
Revision int `json:"policy_revision_idx"`
PolicyID string `json:"policy_id"`
AgentPolicyID string `json:"agent_policy_id"`
Type string `json:"type"`
Status string `json:"status"`
Tags []string `json:"tags"`
Agent struct {
ID string `json:"id"`
Version string `json:"version"`
Type string `json:"type"`
} `json:"agent"`
}

func (s *Scaffold) WaitForAgentDoc(ctx context.Context, id string) ESAgentDoc {
timer := time.NewTimer(time.Second)
for {
select {
case <-ctx.Done():
s.Require().NoError(ctx.Err(), "context expired before agent document appeared in .fleet-agents")
return ESAgentDoc{}
case <-timer.C:
req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://localhost:9200/.fleet-agents/_doc/"+id, nil)
s.Require().NoError(err)
req.SetBasicAuth(s.ElasticUser, s.ElasticPass)
resp, err := s.Client.Do(req)
if err != nil {
timer.Reset(time.Second)
continue
}
if resp.StatusCode != http.StatusOK {
resp.Body.Close()
timer.Reset(time.Second)
continue
}
var obj struct {
Source ESAgentDoc `json:"_source"`
}
err = json.NewDecoder(resp.Body).Decode(&obj)
resp.Body.Close()
s.Require().NoError(err)
return obj.Source
}
}
}

func (s *Scaffold) GetAgent(ctx context.Context, id string) ESAgentDoc {
Expand Down
129 changes: 129 additions & 0 deletions testing/e2e/stand_alone_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,3 +579,132 @@ func (suite *StandAloneSuite) TestAPMInstrumentation() {
cancel()
cmd.Wait()
}

// TestOpAMP ensures that the OpAMP endpoint in Fleet Server works as expected by installing
// an OTel Collector, configuring it with the OpAMP extension, and having it connect to Fleet
// Server using OpAMP, and verifying that Fleet Server responds to this request with an HTTP
// 200 OK status response.
func (suite *StandAloneSuite) TestOpAMP() {
// Create a config file from a template in the test temp dir
dir := suite.T().TempDir()
tpl, err := template.ParseFiles(filepath.Join("testdata", "stand-alone-opamp.tpl"))
suite.Require().NoError(err)
f, err := os.Create(filepath.Join(dir, "config.yml"))
suite.Require().NoError(err)
err = tpl.Execute(f, map[string]interface{}{
"Hosts": suite.ESHosts,
"ServiceToken": suite.ServiceToken,
"StaticTokenKey": "opamp-e2e-test-key",
})
f.Close()
suite.Require().NoError(err)

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
defer cancel()

// Run the fleet-server binary
cmd := exec.CommandContext(ctx, suite.binaryPath, "-c", filepath.Join(dir, "config.yml"))
cmd.Cancel = func() error {
return cmd.Process.Signal(syscall.SIGTERM)
}
cmd.Env = []string{"GOCOVERDIR=" + suite.CoverPath}
err = cmd.Start()
suite.Require().NoError(err)
defer cmd.Wait()

suite.FleetServerStatusOK(ctx, "http://localhost:8220")

apiKey := suite.GetEnrollmentTokenForPolicyID(ctx, "dummy-policy")

// Make sure the OpAMP endpoint works.
req, err := http.NewRequestWithContext(ctx, "POST", "http://localhost:8220/v1/opamp", nil)
suite.Require().NoError(err)
req.Header.Set("Authorization", "ApiKey "+apiKey)
req.Header.Set("Content-Type", "application/x-protobuf")

resp, err := suite.Client.Do(req)
suite.Require().NoError(err)
resp.Body.Close()
suite.Require().Equal(http.StatusOK, resp.StatusCode)

// Enroll a dummy agent to initialize the .fleet-agents index before the OTel Collector connects.
// Without this, findEnrolledAgent fails with index_not_found_exception when the OTel Collector
// sends its first AgentToServer message, because .fleet-agents doesn't exist yet in a fresh
// standalone fleet-server environment (unlike agent-managed fleet-server which self-enrolls).
tester := api_version.NewClientAPITesterCurrent(suite.Scaffold, "http://localhost:8220", apiKey)
tester.Enroll(ctx, apiKey)

// Clone OTel Collector contrib repository (shallow clone of main branch)
cloneDir := filepath.Join(dir, "opentelemetry-collector-contrib")
suite.T().Logf("Cloning opentelemetry-collector-contrib (main) to %s", cloneDir)
cloneCmd := exec.CommandContext(ctx,
"git", "clone",
"--depth", "1",
"https://github.com/open-telemetry/opentelemetry-collector-contrib",
cloneDir,
)
cloneCmd.Stdout = os.Stdout
cloneCmd.Stderr = os.Stderr
err = cloneCmd.Run()
suite.Require().NoError(err)

// Build the OTel Collector binary
suite.T().Log("Building otelcol-contrib binary via make otelcontribcol")
makeCmd := exec.CommandContext(ctx, "make", "otelcontribcol")
makeCmd.Dir = cloneDir
makeCmd.Stdout = os.Stdout
makeCmd.Stderr = os.Stderr
err = makeCmd.Run()
suite.Require().NoError(err)

// The make target places the binary under bin/; move it to the expected path.
builtBinary := filepath.Join(cloneDir, "bin", fmt.Sprintf("otelcontribcol_%s_%s", runtime.GOOS, runtime.GOARCH))
otelBinaryPath := filepath.Join(dir, "otelcol-contrib")
err = os.Rename(builtBinary, otelBinaryPath)
suite.Require().NoError(err)

// Configure it with the OpAMP extension
instanceUID := "019b8d7a-2da8-7657-b52d-492a9de33319"
suite.T().Logf("Configuring OTel Collector with OpAMP extension (instanceUID=%s)", instanceUID)
tpl, err = template.ParseFiles(filepath.Join("testdata", "otelcol-opamp.tpl"))
suite.Require().NoError(err)
f, err = os.Create(filepath.Join(dir, "otelcol.yml"))
suite.Require().NoError(err)
err = tpl.Execute(f, map[string]interface{}{
"OpAMP": map[string]string{
"InstanceUID": instanceUID,
"APIKey": apiKey,
},
})
f.Close()
suite.Require().NoError(err)

// Start OTel Collector
suite.T().Log("Starting OTel Collector")
otelCmd := exec.CommandContext(ctx, otelBinaryPath, "--config", filepath.Join(dir, "otelcol.yml"))
otelCmd.Cancel = func() error {
return otelCmd.Process.Signal(syscall.SIGTERM)
}
otelCmd.Stdout = os.Stdout
otelCmd.Stderr = os.Stderr
err = otelCmd.Start()
suite.Require().NoError(err)
defer otelCmd.Wait()

// Verify that the OTel Collector was enrolled in Fleet by fetching its document from
// .fleet-agents and asserting on its contents.
suite.T().Logf("Waiting for agent %s to appear in .fleet-agents", instanceUID)
agentDoc := suite.WaitForAgentDoc(ctx, instanceUID)

suite.Equal(instanceUID, agentDoc.Agent.ID, "expected agent.id to match instanceUID")
// TODO: uncomment once https://github.com/elastic/fleet-server/pull/6400 is merged
// versionOut, err := exec.Command(otelBinaryPath, "--version").Output()
// suite.Require().NoError(err)
// otelVersion := strings.TrimPrefix(strings.TrimSpace(string(versionOut)), "otelcontribcol version ")
// suite.Equal("OPAMP", agentDoc.Type, "expected type to be OPAMP")
// suite.Equal("otelcontribcol", agentDoc.Agent.Type, "expected agent.type to be otelcontribcol")
// suite.Equal(otelVersion, agentDoc.Agent.Version, "expected agent.version to match otelcol-contrib binary version")
// suite.Equal(1, agentDoc.Revision, "expected policy_revision_idx to be 1")
// suite.Contains(agentDoc.Tags, "otelcontribcol", "expected tags to contain otelcontribcol")
// suite.Equal("online", agentDoc.Status, "expected status to be online")
}
27 changes: 27 additions & 0 deletions testing/e2e/testdata/otelcol-opamp.tpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
receivers:
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4317

exporters:
debug:
verbosity: detailed

extensions:
opamp:
server:
http:
endpoint: http://localhost:8220/v1/opamp
tls:
insecure: true
headers:
Authorization: ApiKey {{ .OpAMP.APIKey }}
instance_uid: {{ .OpAMP.InstanceUID }}

service:
pipelines:
logs:
receivers: [otlp]
exporters: [debug]
extensions: [opamp]
21 changes: 21 additions & 0 deletions testing/e2e/testdata/stand-alone-opamp.tpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
output:
elasticsearch:
hosts: {{ .Hosts }}
service_token: {{ .ServiceToken }}

fleet.agent.id: e2e-test-id

inputs:
- type: fleet-server
server:
feature_flags:
enable_opamp: true
static_policy_tokens:
enabled: true
policy_tokens:
- token_key: {{ .StaticTokenKey }}
policy_id: dummy-policy

logging:
to_stderr: true