diff --git a/examples/telemetry/main.go b/examples/telemetry/main.go new file mode 100644 index 00000000..080afc53 --- /dev/null +++ b/examples/telemetry/main.go @@ -0,0 +1,112 @@ +// SPDX-License-Identifier: MIT +// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co +// SPDX-FileContributor: mochi-co + +package main + +import ( + "encoding/json" + "flag" + "log" + "math/rand" + "os" + "os/signal" + "syscall" + "time" + + mqtt "github.com/mochi-mqtt/server/v2" + "github.com/mochi-mqtt/server/v2/hooks/auth" + prom "github.com/mochi-mqtt/server/v2/hooks/telemetry" + "github.com/mochi-mqtt/server/v2/listeners" + "gopkg.in/yaml.v3" +) + +// TagPayload represents the structure of the tag location data. +// This is just an example structure and can be modified as needed. +type TagPayload struct { + UUID string `json:"uuid"` + X float64 `json:"x"` + Y float64 `json:"y"` +} + +func main() { + metricsFile := flag.String("metrics", "metrics.yaml", "metrics configuration file") + flag.Parse() + + sigs := make(chan os.Signal, 1) + done := make(chan bool, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + go func() { + <-sigs + done <- true + }() + + // An example of configuring various server options... + options := &mqtt.Options{ + InlineClient: true, + } + + server := mqtt.New(options) + + // Exporter Hook + // Load metrics configuration from file + metricsBytes, err := os.ReadFile(*metricsFile) + if err != nil { + log.Fatal("failed to read metrics file:", err) + } + + var exporterOpts prom.Options + if err := yaml.Unmarshal(metricsBytes, &exporterOpts); err != nil { + log.Fatal("failed to parse metrics file:", err) + } + + _ = server.AddHook(new(prom.Hook), &exporterOpts) + + // For security reasons, the default implementation disallows all connections. + // If you want to allow all connections, you must specifically allow it. + err = server.AddHook(new(auth.AllowHook), nil) + if err != nil { + log.Fatal(err) + } + + tcp := listeners.NewTCP(listeners.Config{ + ID: "t1", + Address: ":1883", + }) + err = server.AddListener(tcp) + if err != nil { + log.Fatal(err) + } + + go func() { + err := server.Serve() + if err != nil { + log.Fatal(err) + } + }() + + // Simulate tag publishing location data every 5 seconds + go func() { + for range time.Tick(time.Second * 5) { + lat := 40.0 + rand.Float64()*(41.0-40.0) + lng := -74.0 + rand.Float64()*(-73.0+74.0) + + payload := TagPayload{ + UUID: "234e5678-e89b-12d3-a456-426614174000", + X: lng, + Y: lat, + } + data, _ := json.Marshal(payload) + err := server.Publish("tag/tunnel", data, false, 0) + if err != nil { + server.Log.Error("server.Publish", "error", err) + } + server.Log.Info("main.go issued direct message to tag/tunnel") + } + }() + + <-done + server.Log.Warn("caught signal, stopping...") + _ = server.Close() + server.Log.Info("main.go finished") +} diff --git a/examples/telemetry/metrics.yaml b/examples/telemetry/metrics.yaml new file mode 100644 index 00000000..9983fa59 --- /dev/null +++ b/examples/telemetry/metrics.yaml @@ -0,0 +1,10 @@ +port: 9091 +metrics: + - name: "location_x" + help: "X coordinate" + field: "x" + labels: ["uuid"] + - name: "location_y" + help: "Y coordinate" + field: "y" + labels: ["uuid"] \ No newline at end of file diff --git a/hooks/telemetry/prom.go b/hooks/telemetry/prom.go new file mode 100644 index 00000000..d947a824 --- /dev/null +++ b/hooks/telemetry/prom.go @@ -0,0 +1,155 @@ +// SPDX-License-Identifier: MIT +// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co +// SPDX-FileContributor: mochi-co + +package prom + +import ( + "bytes" + "encoding/json" + "log" + "log/slog" + "net/http" + + mqtt "github.com/mochi-mqtt/server/v2" + "github.com/mochi-mqtt/server/v2/packets" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +type MetricConfig struct { + Name string `yaml:"name" json:"name"` + Help string `yaml:"help" json:"help"` + Field string `yaml:"field" json:"field"` + Labels []string `yaml:"labels" json:"labels"` +} + +type Options struct { + Port string `yaml:"port" json:"port"` + Metrics []MetricConfig `yaml:"metrics" json:"metrics"` +} + +type Hook struct { + mqtt.HookBase + config *Options + server *http.Server + regist *prometheus.Registry + metric map[string]*prometheus.GaugeVec +} + +func (h *Hook) ID() string { + return "prometheus-exporter" +} + +func (h *Hook) Provides(b byte) bool { + return bytes.Contains([]byte{ + mqtt.OnPublished, + }, []byte{b}) +} + +func (h *Hook) Init(config any) error { + if _, ok := config.(*Options); !ok && config != nil { + return mqtt.ErrInvalidConfigType + } + + if config == nil { + config = new(Options) + } + + if h.Log == nil { + h.Log = slog.Default() + } + + h.config = config.(*Options) + h.regist = prometheus.NewRegistry() + h.metric = make(map[string]*prometheus.GaugeVec) + + // Create Prometheus Gauges for each metric + for _, m := range h.config.Metrics { + gauge := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: m.Name, + Help: m.Help, + }, + m.Labels, + ) + h.metric[m.Name] = gauge + if err := h.regist.Register(gauge); err != nil { + if are, ok := err.(prometheus.AlreadyRegisteredError); ok { + h.metric[m.Name] = are.ExistingCollector.(*prometheus.GaugeVec) + } else { + return err + } + } + } + + // Prometheus HTTP Server + addr := h.config.Port + if addr != "" && addr[0] != ':' { + addr = ":" + addr + } else { + addr = ":9090" + } + + h.server = &http.Server{Addr: addr} + mux := http.NewServeMux() + mux.Handle("/metrics", promhttp.HandlerFor(h.regist, promhttp.HandlerOpts{})) + h.server.Handler = mux + + go func() { + log.Printf("Prometheus metrics server starting on %s/metrics", addr) + if err := h.server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Printf("Error starting Prometheus HTTP server: %v", err) + } + }() + + return nil +} + +func (h *Hook) Stop() error { + if h.server != nil { + h.Log.Info("Stopping Prometheus metrics server") + return h.server.Close() + } + return nil +} + +func (h *Hook) OnPublished(cl *mqtt.Client, pk packets.Packet) { + var payload map[string]any + if err := json.Unmarshal(pk.Payload, &payload); err != nil { + h.Log.Info("failed to unmarshal payload", "error", err) + return + } + + for _, m := range h.config.Metrics { + metric, ok := h.metric[m.Name] + if !ok { + continue + } + + value, ok := payload[m.Field].(float64) + if !ok { + h.Log.Info("field not found or not a number", "field", m.Field) + continue + } + + labels := make(prometheus.Labels) + allLabelsPresent := true + for _, label := range m.Labels { + if labelValue, ok := payload[label].(string); ok { + labels[label] = labelValue + } else { + h.Log.Info("label not found or not a string", "label", label) + allLabelsPresent = false + break + } + } + + if !allLabelsPresent { + h.Log.Info("skipping metric update due to missing labels", "metric", m.Name) + continue + } + + metric.With(labels).Set(value) + } +} diff --git a/hooks/telemetry/prom_test.go b/hooks/telemetry/prom_test.go new file mode 100644 index 00000000..3f280a98 --- /dev/null +++ b/hooks/telemetry/prom_test.go @@ -0,0 +1,214 @@ +// SPDX-License-Identifier: MIT +// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co +// SPDX-FileContributor: mochi-co + +package prom + +import ( + "testing" + + mqtt "github.com/mochi-mqtt/server/v2" + "github.com/mochi-mqtt/server/v2/packets" + "github.com/stretchr/testify/require" +) + +func TestBasicID(t *testing.T) { + h := new(Hook) + require.Equal(t, "exporter", h.ID()) +} + +func TestBasicProvides(t *testing.T) { + h := new(Hook) + require.True(t, h.Provides(mqtt.OnPublished)) + require.False(t, h.Provides(mqtt.OnConnectAuthenticate)) +} + +func TestBasicInitBadConfig(t *testing.T) { + h := new(Hook) + + err := h.Init(map[string]any{}) + require.Error(t, err) +} + +func TestBasicInitDefaultConfig(t *testing.T) { + h := new(Hook) + + err := h.Init(nil) + require.NoError(t, err) +} + +func TestBasicInitWithConfig(t *testing.T) { + h := new(Hook) + + config := &Options{ + Port: "9100", + Metrics: []MetricConfig{ + { + Name: "test_metric", + Help: "This is a test metric", + Field: "value", + }, + }, + } + + err := h.Init(config) + require.NoError(t, err) +} + +func TestOnPublishedSuccess(t *testing.T) { + h := new(Hook) + h.SetOpts(nil, nil) + + config := &Options{ + Port: ":9091", + Metrics: []MetricConfig{ + { + Name: "temperature", + Help: "sensor temperature", + Field: "temp", + Labels: []string{"sensor_id", "local"}, + }, + }, + } + + err := h.Init(config) + require.NoError(t, err) + defer h.Stop() + + client := &mqtt.Client{ + Properties: mqtt.ClientProperties{ + Username: []byte("mochi"), + }, + } + + packet := packets.Packet{ + FixedHeader: packets.FixedHeader{ + Type: packets.Publish, + }, + TopicName: "test/topic", + Payload: []byte(`{"sensor_id": "001", "local": "sala", "temp": 25.5}`), + } + + h.OnPublished(client, packet) + + metric := h.metric["temperature"] + require.NotNil(t, metric) +} + +func TestOnPublishedInvalidJSON(t *testing.T) { + h := new(Hook) + h.SetOpts(nil, nil) + + err := h.Init(&Options{Port: ":9092"}) + require.NoError(t, err) + defer h.Stop() + + client := &mqtt.Client{} + packet := packets.Packet{ + Payload: []byte(`{invalid json}`), + } + + h.OnPublished(client, packet) +} + +func TestOnPublishedMissingField(t *testing.T) { + h := new(Hook) + h.SetOpts(nil, nil) + + config := &Options{ + Port: ":9093", + Metrics: []MetricConfig{ + { + Name: "temperature", + Help: "sensor temperature", + Field: "temp", + Labels: []string{"sensor_id"}, + }, + }, + } + + err := h.Init(config) + require.NoError(t, err) + defer h.Stop() + + client := &mqtt.Client{} + packet := packets.Packet{ + Payload: []byte(`{"sensor_id": "001", "another_field": 25.5}`), + } + + h.OnPublished(client, packet) +} + +func TestOnPublishedMissingLabel(t *testing.T) { + h := new(Hook) + h.SetOpts(nil, nil) + + config := &Options{ + Port: ":9094", + Metrics: []MetricConfig{ + { + Name: "temperature", + Help: "sensor temperature", + Field: "temp", + Labels: []string{"sensor_id", "local"}, + }, + }, + } + + err := h.Init(config) + require.NoError(t, err) + defer h.Stop() + + client := &mqtt.Client{} + packet := packets.Packet{ + // Missing "local" + Payload: []byte(`{"sensor_id": "001", "temp": 25.5}`), + } + + // Should not panic. Ignore the metric update. + h.OnPublished(client, packet) +} + +func TestOnPublishedFieldNotNumber(t *testing.T) { + h := new(Hook) + h.SetOpts(nil, nil) + + config := &Options{ + Port: ":9095", + Metrics: []MetricConfig{ + { + Name: "temperature", + Help: "sensor temperature", + Field: "temp", + Labels: []string{"sensor_id"}, + }, + }, + } + + err := h.Init(config) + require.NoError(t, err) + defer h.Stop() + + client := &mqtt.Client{} + packet := packets.Packet{ + Payload: []byte(`{"sensor_id": "001", "temp": "not a number"}`), + } + + h.OnPublished(client, packet) +} + +func TestOnPublishedEmptyPayload(t *testing.T) { + h := new(Hook) + h.SetOpts(nil, nil) + + err := h.Init(&Options{Port: ":9096"}) + require.NoError(t, err) + defer h.Stop() + + client := &mqtt.Client{} + packet := packets.Packet{ + Payload: nil, + } + + h.OnPublished(client, packet) +}