diff --git a/cmd/metrics/statsd.go b/cmd/metrics/statsd.go
new file mode 100644
index 0000000..9c8420c
--- /dev/null
+++ b/cmd/metrics/statsd.go
@@ -0,0 +1,125 @@
+package metrics
+
+import (
+	"fmt"
+	"net"
+	"strings"
+	"time"
+)
+
+const (
+	defaultStatsdPort   = 8125        // used when the constructor receives port 0
+	defaultStatsdPrefix = "launcher." // used when the constructor receives an empty prefix
+)
+
+// statsdMetrics emits DogStatsD-format metrics over UDP. Dimensions live in
+// tags rather than the metric name, so the same emit code talks to the
+// Datadog agent, statsd_exporter, the OTel collector, Telegraf, Vector, or a
+// hand-rolled receiver — receiver-side mapping decides how each dimension is
+// rolled up or labelled.
+//
+// Off by default: a constructor call with an empty host returns a no-op
+// implementation that never opens a socket.
+type statsdMetrics struct {
+	addr   string // "host:port" target that Send hands to sendUDP
+	prefix string // prepended verbatim to every metric name
+
+	launcher       string    // tag value, fixed by the constructor
+	repo           string    // tag value, set by Collect
+	pkg            string    // tag value, set by Collect
+	cmd            string    // tag value, set by Collect from its group argument
+	subcmd         string    // tag value, set by Collect from its name argument
+	partition      uint8     // tag value, set by Collect from its uid argument
+	startTimestamp time.Time // set by Collect; Send reports the time elapsed since
+}
+
+// NewStatsdMetricsCollector returns a Metrics implementation that emits
+// DogStatsD-format packets to host:port. host == "" disables the exporter
+// entirely (Send becomes a no-op, no socket is opened). port == 0 selects
+// 8125. prefix is prepended to every metric name; empty means "launcher.".
+func NewStatsdMetricsCollector(launcher, host string, port int, prefix string) Metrics {
+	if host == "" {
+		return &noopMetrics{}
+	}
+	if port == 0 {
+		port = defaultStatsdPort
+	}
+	if prefix == "" {
+		prefix = defaultStatsdPrefix
+	}
+	// Accept both "::1" and "[::1]": drop caller-supplied brackets so that
+	// JoinHostPort adds exactly one pair. A plain "%s:%d" would turn a bare
+	// IPv6 literal into an address net.Dial cannot parse.
+	host = strings.TrimSuffix(strings.TrimPrefix(host, "["), "]")
+	return &statsdMetrics{
+		addr:     net.JoinHostPort(host, fmt.Sprint(port)),
+		prefix:   prefix,
+		launcher: launcher,
+	}
+}
+
+// Collect records the dimensions of the command about to run and starts the
+// duration clock. It returns an error, and records nothing, when group is
+// empty.
+func (m *statsdMetrics) Collect(uid uint8, repo, pkg, group, name string) error {
+	if group == "" {
+		return fmt.Errorf("unknown command")
+	}
+	m.repo = repo
+	m.pkg = pkg
+	m.cmd = group
+	m.subcmd = name
+	m.partition = uid
+	m.startTimestamp = time.Now()
+	return nil
+}
+
+// Send emits two datagrams tagged with the collected dimensions: a timer
+// named <prefix>duration holding the milliseconds elapsed since Collect, and
+// a counter named <prefix>count with value 1. The status tag is "ok" when
+// cmdError is nil and cmdExitCode is 0, "ko" otherwise.
+//
+// Send is a no-op when no successful Collect preceded it.
+func (m *statsdMetrics) Send(cmdExitCode int, cmdError error) error {
+	if m.startTimestamp.IsZero() {
+		// Collect never ran or rejected the command. time.Since on the zero
+		// Time saturates to the maximum Duration, so sending now would
+		// report a meaningless duration under empty command tags.
+		return nil
+	}
+
+	status := "ok"
+	if cmdError != nil || cmdExitCode != 0 {
+		status = "ko"
+	}
+	durationMs := time.Since(m.startTimestamp).Milliseconds()
+	tags := m.formatTags(status)
+
+	packets := []string{
+		fmt.Sprintf("%sduration:%d|ms|#%s", m.prefix, durationMs, tags),
+		fmt.Sprintf("%scount:1|c|#%s", m.prefix, tags),
+	}
+	return sendUDP(m.addr, packets)
+}
+
+// formatTags renders the tag section of a packet as comma-separated
+// key:value pairs. status is one of the fixed strings chosen by Send, so it
+// is the only string value that is not passed through sanitizeTagValue.
+func (m *statsdMetrics) formatTags(status string) string {
+	pairs := []string{
+		"launcher:" + sanitizeTagValue(m.launcher),
+		"repo:" + sanitizeTagValue(m.repo),
+		"pkg:" + sanitizeTagValue(m.pkg),
+		"cmd:" + sanitizeTagValue(m.cmd),
+		"subcmd:" + sanitizeTagValue(m.subcmd),
+		fmt.Sprintf("partition:%d", m.partition),
+		"status:" + status,
+	}
+	return strings.Join(pairs, ",")
+}
+
+// sanitizeTagValue replaces characters that would break the DogStatsD wire
+// format (commas, pipes, hashes, newlines and spaces) with underscores.
+// Tag keys are statically defined above so they don't need
+// sanitising; values come from user-provided command/package names so they
+// can in principle contain anything.
+func sanitizeTagValue(v string) string {
+	return tagValueSanitizer.Replace(v)
+}
+
+// tagValueSanitizer is built once at package level: a strings.Replacer is
+// safe for concurrent use, and building it inside sanitizeTagValue would
+// repeat that work for every tag value of every Send.
+var tagValueSanitizer = strings.NewReplacer(",", "_", "|", "_", "#", "_", "\n", "_", " ", "_")
+
+// statsdIOTimeout bounds name resolution and the datagram writes so that
+// best-effort telemetry cannot stall the launcher on a slow resolver.
+const statsdIOTimeout = time.Second
+
+// sendUDP writes each packet as its own datagram to addr. It returns the
+// first dial, deadline or write error wrapped with context. Nothing is
+// dialled when packets is empty.
+func sendUDP(addr string, packets []string) error {
+	if len(packets) == 0 {
+		return nil
+	}
+	// DialTimeout's limit includes hostname resolution, which is the only
+	// step of a UDP "dial" that can realistically block.
+	conn, err := net.DialTimeout("udp", addr, statsdIOTimeout)
+	if err != nil {
+		return fmt.Errorf("statsd dial %s: %w", addr, err)
+	}
+	defer conn.Close()
+	if err := conn.SetWriteDeadline(time.Now().Add(statsdIOTimeout)); err != nil {
+		return fmt.Errorf("statsd set deadline: %w", err)
+	}
+	for _, p := range packets {
+		if _, err := conn.Write([]byte(p)); err != nil {
+			return fmt.Errorf("statsd write: %w", err)
+		}
+	}
+	return nil
+}
+
+// noopMetrics is the collector returned when the exporter is disabled.
+type noopMetrics struct{}
+
+// Collect ignores its arguments and always succeeds.
+func (n *noopMetrics) Collect(uid uint8, repo, pkg, group, name string) error {
+	return nil
+}
+
+// Send ignores its arguments and always succeeds.
+func (n *noopMetrics) Send(cmdExitCode int, cmdError error) error {
+	return nil
+}
diff --git a/cmd/metrics/statsd_test.go b/cmd/metrics/statsd_test.go
new file mode 100644
index 0000000..8fda865
--- /dev/null
+++ b/cmd/metrics/statsd_test.go
@@ -0,0 +1,153 @@
+package metrics
+
+import (
+	"fmt"
+	"net"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestStatsd_DisabledWhenHostEmpty(t *testing.T) {
+	m := NewStatsdMetricsCollector("cola", "", 0, "")
+	_, ok := m.(*noopMetrics)
+	assert.True(t, ok, "empty host should yield a no-op collector")
+
+	// no-op methods don't panic and don't try to dial.
+	assert.NoError(t, m.Collect(3, "default", "pkg", "grp", "cmd"))
+	assert.NoError(t, m.Send(0, nil))
+}
+
+func TestStatsd_TagFormatting(t *testing.T) {
+	m := &statsdMetrics{launcher: "cola"}
+	m.repo = "default"
+	m.pkg = "hotfix"
+	m.cmd = "hotfix"
+	m.subcmd = "create"
+	m.partition = 3
+
+	tags := m.formatTags("ok")
+
+	for _, want := range []string{
+		"launcher:cola",
+		"repo:default",
+		"pkg:hotfix",
+		"cmd:hotfix",
+		"subcmd:create",
+		"partition:3",
+		"status:ok",
+	} {
+		assert.Contains(t, tags, want)
+	}
+}
+
+func TestStatsd_SanitizeTagValue(t *testing.T) {
+	for _, tc := range []struct {
+		in, out string
+	}{
+		{"normal", "normal"},
+		{"with space", "with_space"},
+		{"with,comma", "with_comma"},
+		{"with|pipe", "with_pipe"},
+		{"with#hash", "with_hash"},
+		{"with\nnewline", "with_newline"},
+	} {
+		assert.Equal(t, tc.out, sanitizeTagValue(tc.in), "input: %q", tc.in)
+	}
+}
+
+// TestStatsd_EndToEnd_UDP verifies the on-wire packet shape against a real
+// UDP listener — no parsing library, just byte-level matching. Sanity check
+// that Send writes well-formed DogStatsD lines and that Collect → Send
+// preserves all the dimensions through the tag list.
+func TestStatsd_EndToEnd_UDP(t *testing.T) {
+	conn, port := listenLoopbackUDP(t)
+	defer conn.Close()
+
+	m := NewStatsdMetricsCollector("cola", "127.0.0.1", port, "test.")
+
+	assert.NoError(t, m.Collect(7, "default", "hotfix", "hotfix", "create"))
+	assert.NoError(t, m.Send(0, nil))
+
+	packets := readPackets(t, conn, 2, 500*time.Millisecond)
+	assert.Len(t, packets, 2)
+
+	hasDuration := false
+	hasCount := false
+	for _, p := range packets {
+		switch {
+		case strings.HasPrefix(p, "test.duration:") && strings.Contains(p, "|ms|#"):
+			hasDuration = true
+			assertCommonTags(t, p)
+		case strings.HasPrefix(p, "test.count:1|c|#"):
+			hasCount = true
+			assertCommonTags(t, p)
+		}
+	}
+	assert.True(t, hasDuration, "missing duration packet, got: %v", packets)
+	assert.True(t, hasCount, "missing count packet, got: %v", packets)
+}
+
+// TestStatsd_StatusReflectsExitCode checks that every packet carries
+// status:ok only for exit code 0 with a nil error, and status:ko otherwise.
+func TestStatsd_StatusReflectsExitCode(t *testing.T) {
+	for _, tc := range []struct {
+		name    string
+		exit    int
+		err     error
+		wantTag string
+	}{
+		{"exit 0 + nil err", 0, nil, "status:ok"},
+		{"exit 1", 1, nil, "status:ko"},
+		{"non-nil err", 0, fmt.Errorf("boom"), "status:ko"},
+	} {
+		t.Run(tc.name, func(t *testing.T) {
+			conn, port := listenLoopbackUDP(t)
+			defer conn.Close()
+
+			m := NewStatsdMetricsCollector("cola", "127.0.0.1", port, "")
+
+			assert.NoError(t, m.Collect(0, "r", "p", "c", "s"))
+			assert.NoError(t, m.Send(tc.exit, tc.err))
+
+			packets := readPackets(t, conn, 2, 500*time.Millisecond)
+			// Without this check the loop below passes vacuously when
+			// nothing was received before the deadline.
+			if !assert.Len(t, packets, 2) {
+				return
+			}
+			for _, p := range packets {
+				assert.Contains(t, p, tc.wantTag)
+			}
+		})
+	}
+}
+
+// listenLoopbackUDP opens a UDP listener on an ephemeral 127.0.0.1 port and
+// returns it together with that port. It aborts the test when the listener
+// cannot be created: carrying on with a nil conn would panic on the
+// LocalAddr type assertion instead of reporting the real failure. The caller
+// closes the returned conn.
+func listenLoopbackUDP(t *testing.T) (*net.UDPConn, int) {
+	t.Helper()
+	conn, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0})
+	if err != nil {
+		t.Fatalf("listen udp: %v", err)
+	}
+	return conn, conn.LocalAddr().(*net.UDPAddr).Port
+}
+
+// assertCommonTags checks that packet carries every tag produced by the
+// Collect/Send pair in TestStatsd_EndToEnd_UDP.
+func assertCommonTags(t *testing.T, packet string) {
+	t.Helper()
+	for _, want := range []string{
+		"launcher:cola",
+		"repo:default",
+		"pkg:hotfix",
+		"cmd:hotfix",
+		"subcmd:create",
+		"partition:7",
+		"status:ok",
+	} {
+		assert.Contains(t, packet, want, "missing tag in packet: %s", packet)
+	}
+}
+
+// readPackets reads datagrams from conn until want of them have arrived or
+// deadline has elapsed, and returns whatever was received; callers assert on
+// the count.
+func readPackets(t *testing.T, conn *net.UDPConn, want int, deadline time.Duration) []string {
+	t.Helper()
+	if err := conn.SetReadDeadline(time.Now().Add(deadline)); err != nil {
+		t.Fatalf("set read deadline: %v", err)
+	}
+	out := make([]string, 0, want)
+	buf := make([]byte, 4096)
+	for len(out) < want {
+		n, _, err := conn.ReadFromUDP(buf)
+		if err != nil {
+			// Deadline reached or socket closed: stop and return what arrived.
+			break
+		}
+		out = append(out, string(buf[:n]))
+	}
+	return out
+}
diff --git a/cmd/root.go b/cmd/root.go index 14f8fd0..09661ea 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -117,10 +117,16 @@ func preRun(cmd *cobra.Command, args []string) { } graphite := metrics.NewGraphiteMetricsCollector(viper.GetString(config.METRIC_GRAPHITE_HOST_KEY)) + statsd := metrics.NewStatsdMetricsCollector( + rootCtxt.appCtx.AppName(), + viper.GetString(config.METRIC_STATSD_HOST_KEY), + viper.GetInt(config.METRIC_STATSD_PORT_KEY), + viper.GetString(config.METRIC_STATSD_PREFIX_KEY), + ) extensible := metrics.NewExtensibleMetricsCollector( rootCtxt.backend.SystemCommand(repository.SYSTEM_METRICS_COMMAND), ) - rootCtxt.metrics = metrics.NewCompositeMetricsCollector(graphite, extensible) + rootCtxt.metrics = metrics.NewCompositeMetricsCollector(graphite, statsd, extensible) repo, pkg, group, name := cmdAndSubCmd(cmd) rootCtxt.metrics.Collect(rootCtxt.user.Partition, repo, pkg, group, name) } diff --git a/internal/config/load.go b/internal/config/load.go index eb25c9b..f84b842 100644 --- a/internal/config/load.go +++ b/internal/config/load.go @@ -91,6 +91,9 @@ func setDefaultConfig() { viper.SetDefault(USAGE_METRICS_ENABLED_KEY, false) viper.SetDefault(METRIC_GRAPHITE_HOST_KEY, "dummy") + viper.SetDefault(METRIC_STATSD_HOST_KEY, "") + viper.SetDefault(METRIC_STATSD_PORT_KEY, 8125) + viper.SetDefault(METRIC_STATSD_PREFIX_KEY, "launcher.") viper.SetDefault(INTERNAL_COMMAND_ENABLED_KEY, false) viper.SetDefault(EXPERIMENTAL_COMMAND_ENABLED_KEY, false) diff --git a/internal/config/settings.go b/internal/config/settings.go index 
3055a6e..0687e8d 100644 --- a/internal/config/settings.go +++ b/internal/config/settings.go @@ -2,6 +2,7 @@ package config import ( "fmt" + "strconv" "strings" "time" @@ -21,6 +22,9 @@ const ( LOCAL_COMMAND_REPOSITORY_DIRNAME_KEY = "LOCAL_COMMAND_REPOSITORY_DIRNAME" USAGE_METRICS_ENABLED_KEY = "USAGE_METRICS_ENABLED" METRIC_GRAPHITE_HOST_KEY = "METRIC_GRAPHITE_HOST" + METRIC_STATSD_HOST_KEY = "METRIC_STATSD_HOST" // "" disables the StatsD exporter + METRIC_STATSD_PORT_KEY = "METRIC_STATSD_PORT" // defaults to 8125 + METRIC_STATSD_PREFIX_KEY = "METRIC_STATSD_PREFIX" // namespace prefix; defaults to "launcher." DEBUG_FLAGS_KEY = "DEBUG_FLAGS" DROPIN_FOLDER_KEY = "DROPIN_FOLDER" CI_ENABLED_KEY = "CI_ENABLED" @@ -69,6 +73,9 @@ func init() { LOCAL_COMMAND_REPOSITORY_DIRNAME_KEY, USAGE_METRICS_ENABLED_KEY, METRIC_GRAPHITE_HOST_KEY, + METRIC_STATSD_HOST_KEY, + METRIC_STATSD_PORT_KEY, + METRIC_STATSD_PREFIX_KEY, DEBUG_FLAGS_KEY, DROPIN_FOLDER_KEY, CI_ENABLED_KEY, @@ -111,6 +118,12 @@ func SetSettingValue(key string, value string) error { return setBooleanConfig(upperKey, value) case METRIC_GRAPHITE_HOST_KEY: return setStringConfig(upperKey, value) + case METRIC_STATSD_HOST_KEY: + return setStringConfig(upperKey, value) + case METRIC_STATSD_PORT_KEY: + return setIntConfig(upperKey, value) + case METRIC_STATSD_PREFIX_KEY: + return setStringConfig(upperKey, value) case DEBUG_FLAGS_KEY: return setStringConfig(upperKey, value) case DROPIN_FOLDER_KEY: @@ -265,6 +278,15 @@ func setStringConfig(key string, value string) error { return nil } 
+// setIntConfig parses value with strconv.Atoi and stores the resulting int
+// under key via viper.Set. When value is not a valid integer it returns an
+// error that names the offending value; the strconv error itself is not
+// wrapped.
+func setIntConfig(key string, value string) error {
+	n, err := strconv.Atoi(value)
+	if err != nil {
+		return fmt.Errorf("invalid format for int type: %s", value)
+	}
+	viper.Set(key, n)
+	return nil
+}
+
 func setLogLevelConfig(value string) error { _, err := log.ParseLevel(strings.ToLower(value)) if err == nil {