From fc793fed09f4c6b557b859251a75cf9835a678f3 Mon Sep 17 00:00:00 2001 From: Jacobo de Vera Date: Tue, 19 May 2026 23:53:22 +0100 Subject: [PATCH 1/4] Add DogStatsD metrics exporter (not yet wired) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements the Metrics interface alongside graphiteMetrics and extensibleMetrics. Emits two packets per command invocation over UDP: duration:|ms|#launcher=,repo=,pkg=,cmd=,subcmd=,partition=,status= count:1|c|# Dimensions live in DogStatsD tags rather than the metric name, so the same emit code reaches the Datadog agent, prometheus statsd_exporter, the OTel collector, Telegraf, Vector, or a custom receiver — the receiver decides how to roll up. Off by default: an empty host returns a no-op collector that never opens a socket. Mirrors how Graphite is wired in today. No external dep. Single net.Dial("udp", ...) per Send; UDP fire-and-forget so the CLI doesn't block on a downed receiver. Tests cover format generation, tag sanitisation, end-to-end UDP loopback, and the exit-code → status mapping. Wired into the composite collector in a follow-up commit so this lands as a pure addition. --- cmd/metrics/statsd.go | 125 ++++++++++++++++++++++++++++++ cmd/metrics/statsd_test.go | 153 +++++++++++++++++++++++++++++++++++++ 2 files changed, 278 insertions(+) create mode 100644 cmd/metrics/statsd.go create mode 100644 cmd/metrics/statsd_test.go diff --git a/cmd/metrics/statsd.go b/cmd/metrics/statsd.go new file mode 100644 index 0000000..9843179 --- /dev/null +++ b/cmd/metrics/statsd.go @@ -0,0 +1,125 @@ +package metrics + +import ( + "fmt" + "net" + "strings" + "time" +) + +const ( + defaultStatsdPort = 8125 + defaultStatsdPrefix = "launcher." +) + +// statsdMetrics emits DogStatsD-format metrics over UDP. Dimensions live in +// tags rather than the metric name, so the same emit code talks to the +// Datadog agent, statsd_exporter, the OTel collector, Telegraf, Vector, or a +// hand-rolled receiver — receiver-side mapping decides how each dimension is +// rolled up or labelled. +// +// Off by default: a constructor call with an empty host returns a no-op +// implementation that never opens a socket. +type statsdMetrics struct { + addr string + prefix string + + launcher string + repo string + pkg string + cmd string + subcmd string + partition uint8 + startTimestamp time.Time +} + +// NewStatsdMetricsCollector returns a Metrics implementation that emits +// DogStatsD-format packets to host:port. host == "" disables the exporter +// entirely (Send becomes a no-op, no socket is opened). prefix is prepended +// to every metric name; defaults to "launcher." when empty. +func NewStatsdMetricsCollector(launcher, host string, port int, prefix string) Metrics { + if host == "" { + return &noopMetrics{} + } + if port == 0 { + port = defaultStatsdPort + } + if prefix == "" { + prefix = defaultStatsdPrefix + } + return &statsdMetrics{ + addr: fmt.Sprintf("%s:%d", host, port), + prefix: prefix, + launcher: launcher, + } +} + +func (m *statsdMetrics) Collect(uid uint8, repo, pkg, group, name string) error { + if group == "" { + return fmt.Errorf("unknown command") + } + m.repo = repo + m.pkg = pkg + m.cmd = group + m.subcmd = name + m.partition = uid + m.startTimestamp = time.Now() + return nil +} + +func (m *statsdMetrics) Send(cmdExitCode int, cmdError error) error { + status := "ok" + if cmdError != nil || cmdExitCode != 0 { + status = "ko" + } + durationMs := time.Since(m.startTimestamp).Milliseconds() + tags := m.formatTags(status) + + packets := []string{ + fmt.Sprintf("%sduration:%d|ms|#%s", m.prefix, durationMs, tags), + fmt.Sprintf("%scount:1|c|#%s", m.prefix, tags), + } + return sendUDP(m.addr, packets) +} + +func (m *statsdMetrics) formatTags(status string) string { + pairs := []string{ + "launcher:" + sanitizeTagValue(m.launcher), + "repo:" + sanitizeTagValue(m.repo), + "pkg:" + sanitizeTagValue(m.pkg), + "cmd:" + sanitizeTagValue(m.cmd), + "subcmd:" + sanitizeTagValue(m.subcmd), + fmt.Sprintf("partition:%d", m.partition), + "status:" + status, + } + return strings.Join(pairs, ",") +} + +// sanitizeTagValue strips characters that would break the DogStatsD wire +// format (commas, pipes, hashes, newlines) and replaces them with +// underscores. Tag keys are statically defined above so they don't need +// sanitising; values come from user-provided command/package names so they +// can in principle contain anything. +func sanitizeTagValue(v string) string { + r := strings.NewReplacer(",", "_", "|", "_", "#", "_", "\n", "_", " ", "_") + return r.Replace(v) +} + +func sendUDP(addr string, packets []string) error { + conn, err := net.Dial("udp", addr) + if err != nil { + return fmt.Errorf("statsd dial %s: %w", addr, err) + } + defer conn.Close() + for _, p := range packets { + if _, err := conn.Write([]byte(p)); err != nil { + return fmt.Errorf("statsd write: %w", err) + } + } + return nil +} + +type noopMetrics struct{} + +func (n *noopMetrics) Collect(uid uint8, repo, pkg, group, name string) error { return nil } +func (n *noopMetrics) Send(cmdExitCode int, cmdError error) error { return nil } diff --git a/cmd/metrics/statsd_test.go b/cmd/metrics/statsd_test.go new file mode 100644 index 0000000..1d60d75 --- /dev/null +++ b/cmd/metrics/statsd_test.go @@ -0,0 +1,153 @@ +package metrics + +import ( + "fmt" + "net" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestStatsd_DisabledWhenHostEmpty(t *testing.T) { + m := NewStatsdMetricsCollector("cola", "", 0, "") + _, ok := m.(*noopMetrics) + assert.True(t, ok, "empty host should yield a no-op collector") + + // no-op methods don't panic and don't try to dial. + assert.NoError(t, m.Collect(3, "default", "pkg", "grp", "cmd")) + assert.NoError(t, m.Send(0, nil)) +} + +func TestStatsd_TagFormatting(t *testing.T) { + m := &statsdMetrics{launcher: "cola"} + m.repo = "default" + m.pkg = "hotfix" + m.cmd = "hotfix" + m.subcmd = "create" + m.partition = 3 + + tags := m.formatTags("ok") + + for _, want := range []string{ + "launcher:cola", + "repo:default", + "pkg:hotfix", + "cmd:hotfix", + "subcmd:create", + "partition:3", + "status:ok", + } { + assert.Contains(t, tags, want) + } +} + +func TestStatsd_SanitizeTagValue(t *testing.T) { + for _, tc := range []struct { + in, out string + }{ + {"normal", "normal"}, + {"with space", "with_space"}, + {"with,comma", "with_comma"}, + {"with|pipe", "with_pipe"}, + {"with#hash", "with_hash"}, + {"with\nnewline", "with_newline"}, + } { + assert.Equal(t, tc.out, sanitizeTagValue(tc.in), "input: %q", tc.in) + } +} + +// TestStatsd_EndToEnd_UDP verifies the on-wire packet shape against a real +// UDP listener — no parsing library, just byte-level matching. Sanity check +// that Send writes well-formed DogStatsD lines and that Collect → Send +// preserves all the dimensions through the tag map. +func TestStatsd_EndToEnd_UDP(t *testing.T) { + conn, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}) + assert.NoError(t, err) + defer conn.Close() + + port := conn.LocalAddr().(*net.UDPAddr).Port + m := NewStatsdMetricsCollector("cola", "127.0.0.1", port, "test.") + + assert.NoError(t, m.Collect(7, "default", "hotfix", "hotfix", "create")) + assert.NoError(t, m.Send(0, nil)) + + packets := readPackets(t, conn, 2, 500*time.Millisecond) + assert.Len(t, packets, 2) + + hasDuration := false + hasCount := false + for _, p := range packets { + switch { + case strings.HasPrefix(p, "test.duration:") && strings.Contains(p, "|ms|#"): + hasDuration = true + assertCommonTags(t, p) + case strings.HasPrefix(p, "test.count:1|c|#"): + hasCount = true + assertCommonTags(t, p) + } + } + assert.True(t, hasDuration, "missing duration packet, got: %v", packets) + assert.True(t, hasCount, "missing count packet, got: %v", packets) +} + +func TestStatsd_StatusReflectsExitCode(t *testing.T) { + for _, tc := range []struct { + name string + exit int + err error + wantTag string + }{ + {"exit 0 + nil err", 0, nil, "status:ok"}, + {"exit 1", 1, nil, "status:ko"}, + {"non-nil err", 0, fmt.Errorf("boom"), "status:ko"}, + } { + t.Run(tc.name, func(t *testing.T) { + conn, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}) + assert.NoError(t, err) + defer conn.Close() + + port := conn.LocalAddr().(*net.UDPAddr).Port + m := NewStatsdMetricsCollector("cola", "127.0.0.1", port, "") + + assert.NoError(t, m.Collect(0, "r", "p", "c", "s")) + assert.NoError(t, m.Send(tc.exit, tc.err)) + + packets := readPackets(t, conn, 2, 500*time.Millisecond) + for _, p := range packets { + assert.Contains(t, p, tc.wantTag) + } + }) + } +} + +func assertCommonTags(t *testing.T, packet string) { + t.Helper() + for _, want := range []string{ + "launcher:cola", + "repo:default", + "pkg:hotfix", + "cmd:hotfix", + "subcmd:create", + "partition:7", + "status:ok", + } { + assert.Contains(t, packet, want, "missing tag in packet: %s", packet) + } +} + +func readPackets(t *testing.T, conn *net.UDPConn, want int, deadline time.Duration) []string { + t.Helper() + conn.SetReadDeadline(time.Now().Add(deadline)) + out := []string{} + buf := make([]byte, 4096) + for len(out) < want { + n, _, err := conn.ReadFromUDP(buf) + if err != nil { + break + } + out = append(out, string(buf[:n])) + } + return out +} From a7fdb9320096c55dcf1e6dce45daca5d32324021 Mon Sep 17 00:00:00 2001 From: Jacobo de Vera Date: Tue, 19 May 2026 23:55:08 +0100 Subject: [PATCH 2/4] Add config keys for the StatsD exporter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three new keys, all opt-in: METRIC_STATSD_HOST "" — set to enable; "" leaves the exporter disabled METRIC_STATSD_PORT 8125 — de-facto StatsD/DogStatsD port METRIC_STATSD_PREFIX "launcher." — namespace prefix for emitted metric names Adds a small setIntConfig helper for the port; uses existing string/duration helpers for the rest. Same disabled-by-default posture as Graphite — both exporters can be set independently and either, both, or neither will emit. --- internal/config/load.go | 3 +++ internal/config/settings.go | 22 ++++++++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/internal/config/load.go b/internal/config/load.go index eb25c9b..f84b842 100644 --- a/internal/config/load.go +++ b/internal/config/load.go @@ -91,6 +91,9 @@ func setDefaultConfig() { viper.SetDefault(USAGE_METRICS_ENABLED_KEY, false) viper.SetDefault(METRIC_GRAPHITE_HOST_KEY, "dummy") + viper.SetDefault(METRIC_STATSD_HOST_KEY, "") + viper.SetDefault(METRIC_STATSD_PORT_KEY, 8125) + viper.SetDefault(METRIC_STATSD_PREFIX_KEY, "launcher.") viper.SetDefault(INTERNAL_COMMAND_ENABLED_KEY, false) viper.SetDefault(EXPERIMENTAL_COMMAND_ENABLED_KEY, false) diff --git a/internal/config/settings.go b/internal/config/settings.go index 3055a6e..0687e8d 100644 --- a/internal/config/settings.go +++ b/internal/config/settings.go @@ -2,6 +2,7 @@ package config import ( "fmt" + "strconv" "strings" "time" @@ -21,6 +22,9 @@ const ( LOCAL_COMMAND_REPOSITORY_DIRNAME_KEY = "LOCAL_COMMAND_REPOSITORY_DIRNAME" USAGE_METRICS_ENABLED_KEY = "USAGE_METRICS_ENABLED" METRIC_GRAPHITE_HOST_KEY = "METRIC_GRAPHITE_HOST" + METRIC_STATSD_HOST_KEY = "METRIC_STATSD_HOST" // "" disables the StatsD exporter + METRIC_STATSD_PORT_KEY = "METRIC_STATSD_PORT" // defaults to 8125 + METRIC_STATSD_PREFIX_KEY = "METRIC_STATSD_PREFIX" // namespace prefix; defaults to "launcher." DEBUG_FLAGS_KEY = "DEBUG_FLAGS" DROPIN_FOLDER_KEY = "DROPIN_FOLDER" CI_ENABLED_KEY = "CI_ENABLED" @@ -69,6 +73,9 @@ func init() { LOCAL_COMMAND_REPOSITORY_DIRNAME_KEY, USAGE_METRICS_ENABLED_KEY, METRIC_GRAPHITE_HOST_KEY, + METRIC_STATSD_HOST_KEY, + METRIC_STATSD_PORT_KEY, + METRIC_STATSD_PREFIX_KEY, DEBUG_FLAGS_KEY, DROPIN_FOLDER_KEY, CI_ENABLED_KEY, @@ -111,6 +118,12 @@ func SetSettingValue(key string, value string) error { return setBooleanConfig(upperKey, value) case METRIC_GRAPHITE_HOST_KEY: return setStringConfig(upperKey, value) + case METRIC_STATSD_HOST_KEY: + return setStringConfig(upperKey, value) + case METRIC_STATSD_PORT_KEY: + return setIntConfig(upperKey, value) + case METRIC_STATSD_PREFIX_KEY: + return setStringConfig(upperKey, value) case DEBUG_FLAGS_KEY: return setStringConfig(upperKey, value) case DROPIN_FOLDER_KEY: @@ -265,6 +278,15 @@ func setStringConfig(key string, value string) error { return nil } +func setIntConfig(key string, value string) error { + n, err := strconv.Atoi(value) + if err != nil { + return fmt.Errorf("invalid format for int type: %s", value) + } + viper.Set(key, n) + return nil +} + func setLogLevelConfig(value string) error { _, err := log.ParseLevel(strings.ToLower(value)) if err == nil { From 1bdcfde6ef247d041acfd11a0e5367820f3dac19 Mon Sep 17 00:00:00 2001 From: Jacobo de Vera Date: Tue, 19 May 2026 23:56:19 +0100 Subject: [PATCH 3/4] Wire StatsD exporter into the metrics composite MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add the StatsD collector alongside Graphite + extensible in the composite. Passes the launcher's runtime app name as the launcher tag so the exporter doesn't need to reach into the context itself. No behaviour change at default config: METRIC_STATSD_HOST is empty by default, so the constructor returns a no-op. Setting the host activates the exporter without touching Graphite — both run side by side, emitting to whatever receivers you point them at. --- cmd/root.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/cmd/root.go b/cmd/root.go index 14f8fd0..09661ea 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -117,10 +117,16 @@ func preRun(cmd *cobra.Command, args []string) { } graphite := metrics.NewGraphiteMetricsCollector(viper.GetString(config.METRIC_GRAPHITE_HOST_KEY)) + statsd := metrics.NewStatsdMetricsCollector( + rootCtxt.appCtx.AppName(), + viper.GetString(config.METRIC_STATSD_HOST_KEY), + viper.GetInt(config.METRIC_STATSD_PORT_KEY), + viper.GetString(config.METRIC_STATSD_PREFIX_KEY), + ) extensible := metrics.NewExtensibleMetricsCollector( rootCtxt.backend.SystemCommand(repository.SYSTEM_METRICS_COMMAND), ) - rootCtxt.metrics = metrics.NewCompositeMetricsCollector(graphite, extensible) + rootCtxt.metrics = metrics.NewCompositeMetricsCollector(graphite, statsd, extensible) repo, pkg, group, name := cmdAndSubCmd(cmd) rootCtxt.metrics.Collect(rootCtxt.user.Partition, repo, pkg, group, name) } From 8d926707052305e342f5aab0829103aee536987d Mon Sep 17 00:00:00 2001 From: Jacobo de Vera Date: Fri, 22 May 2026 00:10:58 +0100 Subject: [PATCH 4/4] gofmt cleanup in the statsd files Tightens whitespace in two places gofmt flagged after the initial commits: a double-space in the noopMetrics.Send signature, and struct-field column alignment in TestStatsd_StatusReflectsExitCode. No behaviour change. --- cmd/metrics/statsd.go | 2 +- cmd/metrics/statsd_test.go | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/cmd/metrics/statsd.go b/cmd/metrics/statsd.go index 9843179..9c8420c 100644 --- a/cmd/metrics/statsd.go +++ b/cmd/metrics/statsd.go @@ -122,4 +122,4 @@ func sendUDP(addr string, packets []string) error { type noopMetrics struct{} func (n *noopMetrics) Collect(uid uint8, repo, pkg, group, name string) error { return nil } -func (n *noopMetrics) Send(cmdExitCode int, cmdError error) error { return nil } +func (n *noopMetrics) Send(cmdExitCode int, cmdError error) error { return nil } diff --git a/cmd/metrics/statsd_test.go b/cmd/metrics/statsd_test.go index 1d60d75..8fda865 100644 --- a/cmd/metrics/statsd_test.go +++ b/cmd/metrics/statsd_test.go @@ -94,10 +94,10 @@ func TestStatsd_EndToEnd_UDP(t *testing.T) { func TestStatsd_StatusReflectsExitCode(t *testing.T) { for _, tc := range []struct { - name string - exit int - err error - wantTag string + name string + exit int + err error + wantTag string }{ {"exit 0 + nil err", 0, nil, "status:ok"}, {"exit 1", 1, nil, "status:ko"},