Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 125 additions & 0 deletions cmd/metrics/statsd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package metrics

import (
"fmt"
"net"
"strings"
"time"
)

// Fallbacks applied by NewStatsdMetricsCollector when the caller leaves the
// corresponding argument at its zero value.
const (
	// defaultStatsdPort is used when the requested port is 0.
	defaultStatsdPort = 8125
	// defaultStatsdPrefix is prepended to metric names when no prefix is given.
	defaultStatsdPrefix = "launcher."
)

// statsdMetrics emits DogStatsD-format metrics over UDP. Dimensions live in
// tags rather than the metric name, so the same emit code talks to the
// Datadog agent, statsd_exporter, the OTel collector, Telegraf, Vector, or a
// hand-rolled receiver — receiver-side mapping decides how each dimension is
// rolled up or labelled.
//
// Off by default: a constructor call with an empty host returns a no-op
// implementation that never opens a socket.
type statsdMetrics struct {
	// addr is the "host:port" UDP target, fixed at construction time.
	addr string
	// prefix is prepended to every metric name emitted by Send.
	prefix string

	// launcher is fixed at construction time and emitted as the "launcher" tag.
	launcher string
	// repo, pkg, cmd, subcmd and partition are overwritten by each successful
	// Collect call and emitted as tags by Send.
	repo string
	pkg string
	cmd string
	subcmd string
	partition uint8
	// startTimestamp is set by Collect; Send reports the time elapsed since
	// then as the duration metric.
	startTimestamp time.Time
}

// NewStatsdMetricsCollector returns a Metrics implementation that emits
// DogStatsD-format packets over UDP to host:port.
//
//   - launcher is reported as the "launcher" tag on every metric.
//   - host == "" disables the exporter entirely: a no-op collector is
//     returned and no socket is ever opened. Otherwise host may be a name, an
//     IPv4 address, or an IPv6 address (bare or already wrapped in brackets).
//   - port == 0 falls back to defaultStatsdPort.
//   - prefix is prepended verbatim to every metric name, so it should carry
//     its own trailing separator; "" falls back to defaultStatsdPrefix.
func NewStatsdMetricsCollector(launcher, host string, port int, prefix string) Metrics {
	if host == "" {
		return &noopMetrics{}
	}
	if port == 0 {
		port = defaultStatsdPort
	}
	if prefix == "" {
		prefix = defaultStatsdPrefix
	}
	// Accept "[::1]" as well as "::1": drop optional brackets, then let
	// net.JoinHostPort add them back only when the host needs them. A plain
	// "%s:%d" would produce "::1:8125" for a bare IPv6 literal, which the
	// dialer rejects.
	host = strings.TrimSuffix(strings.TrimPrefix(host, "["), "]")
	return &statsdMetrics{
		addr:     net.JoinHostPort(host, fmt.Sprint(port)),
		prefix:   prefix,
		launcher: launcher,
	}
}

// Collect records the command that is about to run so that a later Send can
// tag and time it. The clock starts when Collect returns. An empty group is
// rejected with an error and leaves the collector untouched.
func (m *statsdMetrics) Collect(uid uint8, repo, pkg, group, name string) error {
	if len(group) == 0 {
		return fmt.Errorf("unknown command")
	}

	m.repo, m.pkg = repo, pkg
	m.cmd, m.subcmd = group, name
	m.partition = uid
	m.startTimestamp = time.Now()
	return nil
}

// Send emits the metrics for the command recorded by the last successful
// Collect call: a "<prefix>duration" timer holding the milliseconds elapsed
// since Collect, and a "<prefix>count" counter incremented by one. Both carry
// the same tag set. The status tag is "ok" only when cmdExitCode is 0 and
// cmdError is nil, and "ko" otherwise.
//
// When no command has been recorded (Collect was never called, or it rejected
// the command), Send emits nothing and returns nil: there is no start time to
// measure from, so the duration would be a meaningless, saturated value
// attached to empty tags.
//
// Any error from the UDP transport is returned to the caller.
func (m *statsdMetrics) Send(cmdExitCode int, cmdError error) error {
	if m.startTimestamp.IsZero() {
		return nil
	}

	status := "ok"
	if cmdError != nil || cmdExitCode != 0 {
		status = "ko"
	}
	durationMs := time.Since(m.startTimestamp).Milliseconds()
	tags := m.formatTags(status)

	packets := []string{
		fmt.Sprintf("%sduration:%d|ms|#%s", m.prefix, durationMs, tags),
		fmt.Sprintf("%scount:1|c|#%s", m.prefix, tags),
	}
	return sendUDP(m.addr, packets)
}

// formatTags renders the comma-separated tag list shared by every packet, in
// a fixed order: launcher, repo, pkg, cmd, subcmd, partition, status. The
// string-valued dimensions are passed through sanitizeTagValue; status is
// written as given.
func (m *statsdMetrics) formatTags(status string) string {
	sanitized := [...][2]string{
		{"launcher:", m.launcher},
		{"repo:", m.repo},
		{"pkg:", m.pkg},
		{"cmd:", m.cmd},
		{"subcmd:", m.subcmd},
	}

	var b strings.Builder
	for _, kv := range sanitized {
		b.WriteString(kv[0])
		b.WriteString(sanitizeTagValue(kv[1]))
		b.WriteByte(',')
	}
	b.WriteString(fmt.Sprintf("partition:%d", m.partition))
	b.WriteByte(',')
	b.WriteString("status:" + status)
	return b.String()
}

// tagValueSanitizer maps every character that would break, or be ambiguous
// in, a DogStatsD tag value to an underscore. It is built once and shared: a
// strings.Replacer is safe for concurrent use and comparatively costly to
// construct.
var tagValueSanitizer = strings.NewReplacer(
	",", "_", // tag separator
	"|", "_", // field separator
	"#", "_", // tag-section marker
	"\n", "_", // datagram line separator
	"\r", "_",
	"\t", "_",
	" ", "_",
)

// sanitizeTagValue replaces characters that would break the DogStatsD wire
// format (commas, pipes, hashes, newlines) as well as other whitespace
// (spaces, tabs, carriage returns) with underscores. All other characters,
// including colons, are kept as they are. Tag keys are static in this
// package and need no sanitising; values come from user-provided
// command/package names and can in principle contain anything.
func sanitizeTagValue(v string) string {
	return tagValueSanitizer.Replace(v)
}

// sendUDP writes each packet as its own UDP datagram to addr ("host:port").
//
// The whole operation is time-bounded: resolving and dialling addr, and then
// all writes together, each get at most a couple of seconds. Metrics are
// emitted around a CLI command, so a slow resolver or a wedged socket must
// not be able to stall the command indefinitely.
//
// An empty packet list is a no-op and opens no socket. Otherwise the first
// dial or write failure is returned, wrapped with context, and the remaining
// packets are not sent.
func sendUDP(addr string, packets []string) error {
	if len(packets) == 0 {
		return nil
	}

	const timeout = 2 * time.Second

	// DialTimeout's budget includes name resolution, which is the only part
	// of a UDP "dial" that can realistically block.
	conn, err := net.DialTimeout("udp", addr, timeout)
	if err != nil {
		return fmt.Errorf("statsd dial %s: %w", addr, err)
	}
	defer conn.Close()

	if err := conn.SetWriteDeadline(time.Now().Add(timeout)); err != nil {
		return fmt.Errorf("statsd set write deadline: %w", err)
	}
	for _, p := range packets {
		if _, err := conn.Write([]byte(p)); err != nil {
			return fmt.Errorf("statsd write: %w", err)
		}
	}
	return nil
}

// noopMetrics is the collector returned when the StatsD exporter is disabled.
// It holds no state and every method succeeds without doing any work.
type noopMetrics struct{}

// Collect ignores its arguments and always reports success.
func (*noopMetrics) Collect(uid uint8, repo, pkg, group, name string) error {
	return nil
}

// Send ignores the command outcome and always reports success.
func (*noopMetrics) Send(cmdExitCode int, cmdError error) error {
	return nil
}
153 changes: 153 additions & 0 deletions cmd/metrics/statsd_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package metrics

import (
"fmt"
"net"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

// TestStatsd_DisabledWhenHostEmpty checks that an empty host produces the
// no-op collector and that its methods can be called safely.
func TestStatsd_DisabledWhenHostEmpty(t *testing.T) {
	collector := NewStatsdMetricsCollector("cola", "", 0, "")

	_, isNoop := collector.(*noopMetrics)
	assert.True(t, isNoop, "empty host should yield a no-op collector")

	// Neither call may panic or fail; the no-op type has nothing to dial.
	collectErr := collector.Collect(3, "default", "pkg", "grp", "cmd")
	assert.NoError(t, collectErr)

	sendErr := collector.Send(0, nil)
	assert.NoError(t, sendErr)
}

// TestStatsd_TagFormatting checks that every dimension held by the collector
// shows up as a key:value pair in the rendered tag string.
func TestStatsd_TagFormatting(t *testing.T) {
	collector := &statsdMetrics{
		launcher:  "cola",
		repo:      "default",
		pkg:       "hotfix",
		cmd:       "hotfix",
		subcmd:    "create",
		partition: 3,
	}

	rendered := collector.formatTags("ok")

	expected := []string{
		"launcher:cola",
		"repo:default",
		"pkg:hotfix",
		"cmd:hotfix",
		"subcmd:create",
		"partition:3",
		"status:ok",
	}
	for i := range expected {
		assert.Contains(t, rendered, expected[i])
	}
}

func TestStatsd_SanitizeTagValue(t *testing.T) {
for _, tc := range []struct {
in, out string
}{
{"normal", "normal"},
{"with space", "with_space"},
{"with,comma", "with_comma"},
{"with|pipe", "with_pipe"},
{"with#hash", "with_hash"},
{"with\nnewline", "with_newline"},
} {
assert.Equal(t, tc.out, sanitizeTagValue(tc.in), "input: %q", tc.in)
}
}

// TestStatsd_EndToEnd_UDP verifies the on-wire packet shape against a real
// UDP listener — no parsing library, just byte-level matching. Sanity check
// that Send writes well-formed DogStatsD lines and that Collect → Send
// carries every dimension through to the tag list of both packets.
func TestStatsd_EndToEnd_UDP(t *testing.T) {
	conn, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0})
	// Without a listener conn is nil; stop here instead of dereferencing it.
	if !assert.NoError(t, err) {
		return
	}
	defer conn.Close()

	port := conn.LocalAddr().(*net.UDPAddr).Port
	m := NewStatsdMetricsCollector("cola", "127.0.0.1", port, "test.")

	assert.NoError(t, m.Collect(7, "default", "hotfix", "hotfix", "create"))
	assert.NoError(t, m.Send(0, nil))

	packets := readPackets(t, conn, 2, 500*time.Millisecond)
	assert.Len(t, packets, 2)

	hasDuration := false
	hasCount := false
	for _, p := range packets {
		switch {
		case strings.HasPrefix(p, "test.duration:") && strings.Contains(p, "|ms|#"):
			hasDuration = true
			assertCommonTags(t, p)
		case strings.HasPrefix(p, "test.count:1|c|#"):
			hasCount = true
			assertCommonTags(t, p)
		}
	}
	assert.True(t, hasDuration, "missing duration packet, got: %v", packets)
	assert.True(t, hasCount, "missing count packet, got: %v", packets)
}

// TestStatsd_StatusReflectsExitCode checks that the status tag on both
// emitted packets is "ok" only for a zero exit code with a nil error, and
// "ko" otherwise.
func TestStatsd_StatusReflectsExitCode(t *testing.T) {
	for _, tc := range []struct {
		name    string
		exit    int
		err     error
		wantTag string
	}{
		{"exit 0 + nil err", 0, nil, "status:ok"},
		{"exit 1", 1, nil, "status:ko"},
		{"non-nil err", 0, fmt.Errorf("boom"), "status:ko"},
	} {
		t.Run(tc.name, func(t *testing.T) {
			conn, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0})
			// Without a listener conn is nil; stop here instead of dereferencing it.
			if !assert.NoError(t, err) {
				return
			}
			defer conn.Close()

			port := conn.LocalAddr().(*net.UDPAddr).Port
			m := NewStatsdMetricsCollector("cola", "127.0.0.1", port, "")

			assert.NoError(t, m.Collect(0, "r", "p", "c", "s"))
			assert.NoError(t, m.Send(tc.exit, tc.err))

			packets := readPackets(t, conn, 2, 500*time.Millisecond)
			// Guard against a vacuous pass: with no packets received the loop
			// below would assert nothing at all.
			assert.Len(t, packets, 2)
			for _, p := range packets {
				assert.Contains(t, p, tc.wantTag)
			}
		})
	}
}

// assertCommonTags asserts that packet carries every tag expected for the
// fixture used by the end-to-end test (launcher "cola", partition 7, the
// "hotfix create" command, status "ok").
func assertCommonTags(t *testing.T, packet string) {
	t.Helper()

	expected := [...]string{
		"launcher:cola",
		"repo:default",
		"pkg:hotfix",
		"cmd:hotfix",
		"subcmd:create",
		"partition:7",
		"status:ok",
	}
	for i := range expected {
		assert.Contains(t, packet, expected[i], "missing tag in packet: %s", packet)
	}
}

// readPackets reads up to want datagrams from conn and returns their payloads
// as strings. It stops early at the first read error, which includes the read
// deadline (now + deadline) expiring, so the result may hold fewer than want
// entries; callers are expected to assert on its length.
//
// Problems other than the expected timeout are logged on t so that a short
// result can be diagnosed. If the read deadline cannot be set, nothing is
// read at all, because the loop could otherwise block forever.
func readPackets(t *testing.T, conn *net.UDPConn, want int, deadline time.Duration) []string {
	t.Helper()
	out := []string{}
	if err := conn.SetReadDeadline(time.Now().Add(deadline)); err != nil {
		t.Logf("readPackets: set read deadline: %v", err)
		return out
	}
	// Each datagram is read into buf; anything beyond 4096 bytes is cut off.
	buf := make([]byte, 4096)
	for len(out) < want {
		n, _, err := conn.ReadFromUDP(buf)
		if err != nil {
			// A timeout just means no more packets arrived in time.
			if ne, ok := err.(net.Error); !ok || !ne.Timeout() {
				t.Logf("readPackets: read: %v", err)
			}
			break
		}
		out = append(out, string(buf[:n]))
	}
	return out
}
8 changes: 7 additions & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,16 @@ func preRun(cmd *cobra.Command, args []string) {
}

graphite := metrics.NewGraphiteMetricsCollector(viper.GetString(config.METRIC_GRAPHITE_HOST_KEY))
statsd := metrics.NewStatsdMetricsCollector(
rootCtxt.appCtx.AppName(),
viper.GetString(config.METRIC_STATSD_HOST_KEY),
viper.GetInt(config.METRIC_STATSD_PORT_KEY),
viper.GetString(config.METRIC_STATSD_PREFIX_KEY),
)
extensible := metrics.NewExtensibleMetricsCollector(
rootCtxt.backend.SystemCommand(repository.SYSTEM_METRICS_COMMAND),
)
rootCtxt.metrics = metrics.NewCompositeMetricsCollector(graphite, extensible)
rootCtxt.metrics = metrics.NewCompositeMetricsCollector(graphite, statsd, extensible)
repo, pkg, group, name := cmdAndSubCmd(cmd)
rootCtxt.metrics.Collect(rootCtxt.user.Partition, repo, pkg, group, name)
}
Expand Down
3 changes: 3 additions & 0 deletions internal/config/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ func setDefaultConfig() {

viper.SetDefault(USAGE_METRICS_ENABLED_KEY, false)
viper.SetDefault(METRIC_GRAPHITE_HOST_KEY, "dummy")
viper.SetDefault(METRIC_STATSD_HOST_KEY, "")
viper.SetDefault(METRIC_STATSD_PORT_KEY, 8125)
viper.SetDefault(METRIC_STATSD_PREFIX_KEY, "launcher.")

viper.SetDefault(INTERNAL_COMMAND_ENABLED_KEY, false)
viper.SetDefault(EXPERIMENTAL_COMMAND_ENABLED_KEY, false)
Expand Down
22 changes: 22 additions & 0 deletions internal/config/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package config

import (
"fmt"
"strconv"
"strings"
"time"

Expand All @@ -21,6 +22,9 @@ const (
LOCAL_COMMAND_REPOSITORY_DIRNAME_KEY = "LOCAL_COMMAND_REPOSITORY_DIRNAME"
USAGE_METRICS_ENABLED_KEY = "USAGE_METRICS_ENABLED"
METRIC_GRAPHITE_HOST_KEY = "METRIC_GRAPHITE_HOST"
METRIC_STATSD_HOST_KEY = "METRIC_STATSD_HOST" // "" disables the StatsD exporter
METRIC_STATSD_PORT_KEY = "METRIC_STATSD_PORT" // defaults to 8125
METRIC_STATSD_PREFIX_KEY = "METRIC_STATSD_PREFIX" // namespace prefix; defaults to "launcher."
DEBUG_FLAGS_KEY = "DEBUG_FLAGS"
DROPIN_FOLDER_KEY = "DROPIN_FOLDER"
CI_ENABLED_KEY = "CI_ENABLED"
Expand Down Expand Up @@ -69,6 +73,9 @@ func init() {
LOCAL_COMMAND_REPOSITORY_DIRNAME_KEY,
USAGE_METRICS_ENABLED_KEY,
METRIC_GRAPHITE_HOST_KEY,
METRIC_STATSD_HOST_KEY,
METRIC_STATSD_PORT_KEY,
METRIC_STATSD_PREFIX_KEY,
DEBUG_FLAGS_KEY,
DROPIN_FOLDER_KEY,
CI_ENABLED_KEY,
Expand Down Expand Up @@ -111,6 +118,12 @@ func SetSettingValue(key string, value string) error {
return setBooleanConfig(upperKey, value)
case METRIC_GRAPHITE_HOST_KEY:
return setStringConfig(upperKey, value)
case METRIC_STATSD_HOST_KEY:
return setStringConfig(upperKey, value)
case METRIC_STATSD_PORT_KEY:
return setIntConfig(upperKey, value)
case METRIC_STATSD_PREFIX_KEY:
return setStringConfig(upperKey, value)
case DEBUG_FLAGS_KEY:
return setStringConfig(upperKey, value)
case DROPIN_FOLDER_KEY:
Expand Down Expand Up @@ -265,6 +278,15 @@ func setStringConfig(key string, value string) error {
return nil
}

// setIntConfig parses value as a base-10 integer and stores it in viper under
// key. A value that does not parse is rejected with an error and nothing is
// stored.
func setIntConfig(key string, value string) error {
	parsed, parseErr := strconv.Atoi(value)
	if parseErr == nil {
		viper.Set(key, parsed)
		return nil
	}
	return fmt.Errorf("invalid format for int type: %s", value)
}

func setLogLevelConfig(value string) error {
_, err := log.ParseLevel(strings.ToLower(value))
if err == nil {
Expand Down
Loading