Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 4 additions & 14 deletions app/jobs/metricsjob/metricsjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@ func (mj *MetricsJob) Register(ctx context.Context, mp metrics.Pusher, mcred met
ctx, cancel := context.WithCancel(ctx)
mj.cancel = cancel
var creds []credential.Credential
var lastPgCred credential.Credential
var beatCount int
mj.wg.Add(1)
go func() {
defer mj.wg.Done()
mj.config.Trigger(ctx, func() (err error) {
beatCount++

// Determine if we should fetch credentials this beat
shouldFetch := len(creds) == 0 ||
(mj.config.CredFetchInterval > 0 && beatCount%mj.config.CredFetchInterval == 0)

Expand All @@ -61,26 +61,16 @@ func (mj *MetricsJob) Register(ctx context.Context, mp metrics.Pusher, mcred met
return err
}

if len(creds) == 0 {
// no op, waiting for the creds to be available
return nil
}

var pgcred credential.Credential
lastPgCred = credential.Credential{}
for _, cred := range creds {
// only support one db for now, first match will exit the finding of creds
if cred.Dialect == "postgresql" {
pgcred = cred
lastPgCred = cred
break
}
}

// Only push when we actually fetch credentials
return mp.Push(pgcred)
}

// If we didn't fetch this beat, return nil (no push)
return nil
return mp.Push(lastPgCred)
})
}()
return cancel
Expand Down
53 changes: 29 additions & 24 deletions app/jobs/metricsjob/metricsjob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,35 +72,40 @@ func TestRegister_CachesCredsWithinThreshold(t *testing.T) {
credFetchInterval int
totalBeats int
expectedCredCalls int
expectedPushCalls int
description string
}{
{
name: "SkipCredFetchBeat=0_FetchOnce",
credFetchInterval: 0,
totalBeats: 5,
expectedCredCalls: 1,
expectedPushCalls: 5,
description: "When passed zero, function called once on beat 1 only",
},
{
name: "SkipCredFetchBeat=1_FetchEveryBeat",
credFetchInterval: 1,
totalBeats: 6,
expectedCredCalls: 6,
expectedPushCalls: 6,
description: "Should call every beat: 1,2,3,4,5,6",
},
{
name: "SkipCredFetchBeat=2_FetchEverySecondBeat",
credFetchInterval: 2,
totalBeats: 8,
expectedCredCalls: 5, // Changed from 4
description: "Should call on beats 1,2,4,6,8",
expectedCredCalls: 5,
expectedPushCalls: 8,
description: "Should push every beat, fetch on beats 1,2,4,6,8",
},
{
name: "SkipCredFetchBeat=3_FetchEveryThirdBeat",
credFetchInterval: 3,
totalBeats: 9,
expectedCredCalls: 4, // Changed from 3
description: "Should call on beats 1,3,6,9",
expectedCredCalls: 4,
expectedPushCalls: 9,
description: "Should push every beat, fetch on beats 1,3,6,9",
},
}

Expand Down Expand Up @@ -135,16 +140,16 @@ func TestRegister_CachesCredsWithinThreshold(t *testing.T) {
tt.description, tt.expectedCredCalls, authGetter.calls)
}

// Verify Push was called same number of times as GetCreds
if len(pusher.pushCalls) != tt.expectedCredCalls {
// Push is called on every beat regardless of cred fetch
if len(pusher.pushCalls) != tt.expectedPushCalls {
t.Errorf("expected %d Push calls, got %d",
tt.expectedCredCalls, len(pusher.pushCalls))
tt.expectedPushCalls, len(pusher.pushCalls))
}
})
}
}

func TestRegister_NoPushAfterEmptyCreds(t *testing.T) {
func TestRegister_AlwaysPushesOnEveryBeat(t *testing.T) {
tests := []struct {
name string
credFetchInterval int
Expand All @@ -158,56 +163,56 @@ func TestRegister_NoPushAfterEmptyCreds(t *testing.T) {
credFetchInterval: 0,
totalBeats: 5,
emptyCredsAfter: 0,
expectedPushCalls: 1,
description: "With interval=0, creds emptied after beat 1, only 1 push (beat 1), no refetch",
expectedPushCalls: 5,
description: "Pushes every beat even after creds emptied",
},
{
name: "Interval=0_NeverEmpty",
credFetchInterval: 0,
totalBeats: 5,
emptyCredsAfter: -1,
expectedPushCalls: 1,
description: "With interval=0, creds never emptied, only 1 push (beat 1), never refetch",
expectedPushCalls: 5,
description: "Pushes every beat with same creds",
},
{
name: "Interval=1_EmptyAfterFirstBeat",
credFetchInterval: 1,
totalBeats: 5,
emptyCredsAfter: 0,
expectedPushCalls: 1,
description: "With interval=1, creds emptied after beat 1, only 1 push (beat 1)",
expectedPushCalls: 5,
description: "Pushes every beat even after creds emptied",
},
{
name: "Interval=2_EmptyAfterFirstBeat",
credFetchInterval: 2,
totalBeats: 5,
emptyCredsAfter: 0,
expectedPushCalls: 1,
description: "With interval=2, creds emptied after beat 1, only 1 push (beat 1)",
expectedPushCalls: 5,
description: "Pushes every beat even after creds emptied",
},
{
name: "Interval=3_EmptyAfterFirstBeat",
credFetchInterval: 3,
totalBeats: 6,
emptyCredsAfter: 0,
expectedPushCalls: 1,
description: "With interval=3, creds emptied after beat 1, only 1 push (beat 1)",
expectedPushCalls: 6,
description: "Pushes every beat even after creds emptied",
},
{
name: "Interval=1_EmptyAfterThirdBeat",
credFetchInterval: 1,
totalBeats: 5,
emptyCredsAfter: 2,
expectedPushCalls: 3,
description: "With interval=1, creds emptied after beat 3, pushes on beats 1,2,3",
expectedPushCalls: 5,
description: "Pushes every beat, creds change mid-way",
},
{
name: "Interval=2_NeverEmpty",
credFetchInterval: 2,
totalBeats: 8,
emptyCredsAfter: -1,
expectedPushCalls: 5,
description: "With interval=2, creds never emptied, pushes on beats 1,2,4,6,8",
expectedPushCalls: 8,
description: "Pushes every beat with same creds",
},
}

Expand All @@ -224,7 +229,6 @@ func TestRegister_NoPushAfterEmptyCreds(t *testing.T) {
for i := range tt.totalBeats {
_ = fn()
if i == tt.emptyCredsAfter {
// After specified beat, all subsequent calls return empty credentials
authGetter.creds = []credential.Credential{}
}
}
Expand Down Expand Up @@ -266,7 +270,8 @@ func TestRegister_HandlesGetCredsError(t *testing.T) {

job.Shutdown()

// Should not push when GetCreds fails
// GetCreds fails on first fetch attempt (shouldFetch=true, len(creds)==0)
// The error is returned, so Push is not called
if len(pusher.pushCalls) != 0 {
t.Errorf("expected 0 Push calls on error, got %d", len(pusher.pushCalls))
}
Expand Down
120 changes: 84 additions & 36 deletions app/services/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
domainmetrics "hostlink/domain/metrics"
"hostlink/internal/apiserver"
"hostlink/internal/crypto"
"hostlink/internal/dockerdiscovery"
"hostlink/internal/networkmetrics"
"hostlink/internal/pgbouncermetrics"
"hostlink/internal/pgmetrics"
Expand All @@ -30,15 +31,16 @@ type Pusher interface {
}

type metricspusher struct {
apiserver apiserver.MetricsOperations
agentstate agentstate.Operations
metricscollector pgmetrics.Collector
syscollector sysmetrics.Collector
netcollector networkmetrics.Collector
storagecollector storagemetrics.Collector
pgbouncercollector pgbouncermetrics.Collector
crypto crypto.Service
privateKeyPath string
apiserver apiserver.MetricsOperations
agentstate agentstate.Operations
metricscollector pgmetrics.Collector
syscollector sysmetrics.Collector
netcollector networkmetrics.Collector
storagecollector storagemetrics.Collector
pgbouncercollector pgbouncermetrics.Collector
dockerDiscoverer dockerdiscovery.Discoverer
crypto crypto.Service
privateKeyPath string
}

func NewWithConf() (*metricspusher, error) {
Expand All @@ -59,6 +61,7 @@ func NewWithConf() (*metricspusher, error) {
netcollector: networkmetrics.New(),
storagecollector: storagemetrics.New(),
pgbouncercollector: pgbouncermetrics.New(),
dockerDiscoverer: dockerdiscovery.New(),
crypto: crypto.NewService(),
privateKeyPath: appconf.AgentPrivateKeyPath(),
}, nil
Expand All @@ -77,6 +80,7 @@ func NewWithDependencies(
netcollector networkmetrics.Collector,
storagecollector storagemetrics.Collector,
pgbouncercollector pgbouncermetrics.Collector,
dockerDiscoverer dockerdiscovery.Discoverer,
crypto crypto.Service,
privateKeyPath string,
) *metricspusher {
Expand All @@ -88,6 +92,7 @@ func NewWithDependencies(
netcollector: netcollector,
storagecollector: storagecollector,
pgbouncercollector: pgbouncercollector,
dockerDiscoverer: dockerDiscoverer,
crypto: crypto,
privateKeyPath: privateKeyPath,
}
Expand Down Expand Up @@ -158,17 +163,46 @@ func (mp *metricspusher) Push(cred credential.Credential) error {
})
}

dbMetrics, err := mp.metricscollector.Collect(cred)
// Collect from control-plane credential (primary PG)
hasPrimaryCred := cred.Host != "" || cred.Port != 0
if hasPrimaryCred {
dbMetrics, err := mp.metricscollector.Collect(cred)
if err != nil {
log.Warnf("primary database metrics collection failed: %v", err)
dbMetrics = domainmetrics.PostgreSQLDatabaseMetrics{Up: false}
} else {
dbMetrics.Up = true
}
metricSets = append(metricSets, domainmetrics.MetricSet{
Type: domainmetrics.MetricTypePostgreSQLDatabase,
Metrics: dbMetrics,
})

pgbouncerMetrics, err := mp.pgbouncercollector.Collect(cred)
if err != nil {
pgbouncerMetrics = domainmetrics.PgBouncerMetrics{Up: false}
} else {
pgbouncerMetrics.Up = true
}
metricSets = append(metricSets, domainmetrics.MetricSet{
Type: domainmetrics.MetricTypePgBouncer,
Metrics: pgbouncerMetrics,
})
}

// Discover Docker containers and collect PG metrics from each
dockerDBs, err := mp.dockerDiscoverer.DiscoverDatabases(ctx)
if err != nil {
log.Warnf("database metrics collection failed: %v", err)
dbMetrics = domainmetrics.PostgreSQLDatabaseMetrics{Up: false}
} else {
dbMetrics.Up = true
log.Warnf("docker database discovery failed: %v", err)
}
for _, d := range dockerDBs {
switch d.Type {
case dockerdiscovery.DatabaseTypePostgreSQL:
mp.collectDockerPGMetrics(ctx, d, &metricSets)
default:
// MySQL and MongoDB collectors not yet implemented
}
}
metricSets = append(metricSets, domainmetrics.MetricSet{
Type: domainmetrics.MetricTypePostgreSQLDatabase,
Metrics: dbMetrics,
})

storageMetrics, err := mp.storagecollector.Collect(ctx)
if err != nil {
Expand All @@ -188,24 +222,6 @@ func (mp *metricspusher) Push(cred credential.Credential) error {
}
}

// PgBouncer stats — try-connect approach: silently skip when not running.
// The collector returns an error if PgBouncer is unreachable; we mark Up: false
// and still include the metric set so the server can track the pooler state.
pgbouncerMetrics, err := mp.pgbouncercollector.Collect(cred)
if err != nil {
pgbouncerMetrics = domainmetrics.PgBouncerMetrics{Up: false}
} else {
pgbouncerMetrics.Up = true
}
metricSets = append(metricSets, domainmetrics.MetricSet{
Type: domainmetrics.MetricTypePgBouncer,
Metrics: pgbouncerMetrics,
})

// If only the postgresql.database metric set exists (with up=false) and
// all other collectors failed, we still push so the server knows the agent
// is alive and PostgreSQL status is reported.

hostname, _ := os.Hostname()

payload := domainmetrics.MetricPayload{
Expand All @@ -220,3 +236,35 @@ func (mp *metricspusher) Push(cred credential.Credential) error {

return mp.apiserver.PushMetrics(ctx, payload)
}

func (mp *metricspusher) collectDockerPGMetrics(ctx context.Context, d dockerdiscovery.DiscoveredDatabase, metricSets *[]domainmetrics.MetricSet) {
dockerCred := credential.Credential{
Host: d.Host,
Port: int(d.Port),
Username: d.Username,
Database: d.Database,
Dialect: "postgresql",
}
if d.Password != "" {
dockerCred.Password = &d.Password
}

dbMetrics, err := mp.metricscollector.Collect(dockerCred)
if err != nil {
log.Warnf("docker PG metrics collection failed for %s: %v", d.ContainerName, err)
dbMetrics = domainmetrics.PostgreSQLDatabaseMetrics{Up: false}
} else {
dbMetrics.Up = true
}
*metricSets = append(*metricSets, domainmetrics.MetricSet{
Type: domainmetrics.MetricTypePostgreSQLDatabase,
Attributes: map[string]any{
"container_id": d.ContainerID[:12],
"container_name": d.ContainerName,
"database_name": d.Database,
"port": d.Port,
"source": "docker",
},
Metrics: dbMetrics,
})
}
Loading
Loading