Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 112 additions & 0 deletions examples/telemetry/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileContributor: mochi-co

package main

import (
"encoding/json"
"flag"
"log"
"math/rand"
"os"
"os/signal"
"syscall"
"time"

mqtt "github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/hooks/auth"
prom "github.com/mochi-mqtt/server/v2/hooks/telemetry"
"github.com/mochi-mqtt/server/v2/listeners"
"gopkg.in/yaml.v3"
)

// TagPayload represents the structure of the tag location data.
// This is just an example structure and can be modified as needed.
type TagPayload struct {
UUID string `json:"uuid"`
X float64 `json:"x"`
Y float64 `json:"y"`
}

func main() {
metricsFile := flag.String("metrics", "metrics.yaml", "metrics configuration file")
flag.Parse()

sigs := make(chan os.Signal, 1)
done := make(chan bool, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigs
done <- true
}()

// An example of configuring various server options...
options := &mqtt.Options{
InlineClient: true,
}

server := mqtt.New(options)

// Exporter Hook
// Load metrics configuration from file
metricsBytes, err := os.ReadFile(*metricsFile)
if err != nil {
log.Fatal("failed to read metrics file:", err)
}

var exporterOpts prom.Options
if err := yaml.Unmarshal(metricsBytes, &exporterOpts); err != nil {
log.Fatal("failed to parse metrics file:", err)
}

_ = server.AddHook(new(prom.Hook), &exporterOpts)

// For security reasons, the default implementation disallows all connections.
// If you want to allow all connections, you must specifically allow it.
err = server.AddHook(new(auth.AllowHook), nil)
if err != nil {
log.Fatal(err)
}

tcp := listeners.NewTCP(listeners.Config{
ID: "t1",
Address: ":1883",
})
err = server.AddListener(tcp)
if err != nil {
log.Fatal(err)
}

go func() {
err := server.Serve()
if err != nil {
log.Fatal(err)
}
}()

// Simulate tag publishing location data every 5 seconds
go func() {
for range time.Tick(time.Second * 5) {
lat := 40.0 + rand.Float64()*(41.0-40.0)
lng := -74.0 + rand.Float64()*(-73.0+74.0)

payload := TagPayload{
UUID: "234e5678-e89b-12d3-a456-426614174000",
X: lng,
Y: lat,
}
data, _ := json.Marshal(payload)
err := server.Publish("tag/tunnel", data, false, 0)
if err != nil {
server.Log.Error("server.Publish", "error", err)
}
server.Log.Info("main.go issued direct message to tag/tunnel")
}
}()

<-done
server.Log.Warn("caught signal, stopping...")
_ = server.Close()
server.Log.Info("main.go finished")
}
10 changes: 10 additions & 0 deletions examples/telemetry/metrics.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
port: 9091
metrics:
- name: "location_x"
help: "X coordinate"
field: "x"
labels: ["uuid"]
- name: "location_y"
help: "Y coordinate"
field: "y"
labels: ["uuid"]
155 changes: 155 additions & 0 deletions hooks/telemetry/prom.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileContributor: mochi-co

package prom

import (
"bytes"
"encoding/json"
"log"
"log/slog"
"net/http"

mqtt "github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/packets"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

type MetricConfig struct {
Name string `yaml:"name" json:"name"`
Help string `yaml:"help" json:"help"`
Field string `yaml:"field" json:"field"`
Labels []string `yaml:"labels" json:"labels"`
}

type Options struct {
Port string `yaml:"port" json:"port"`
Metrics []MetricConfig `yaml:"metrics" json:"metrics"`
}

type Hook struct {
mqtt.HookBase
config *Options
server *http.Server
regist *prometheus.Registry
metric map[string]*prometheus.GaugeVec
}

func (h *Hook) ID() string {
return "prometheus-exporter"
}

func (h *Hook) Provides(b byte) bool {
return bytes.Contains([]byte{
mqtt.OnPublished,
}, []byte{b})
}

func (h *Hook) Init(config any) error {
if _, ok := config.(*Options); !ok && config != nil {
return mqtt.ErrInvalidConfigType
}

if config == nil {
config = new(Options)
}

if h.Log == nil {
h.Log = slog.Default()
}

h.config = config.(*Options)
h.regist = prometheus.NewRegistry()
h.metric = make(map[string]*prometheus.GaugeVec)

// Create Prometheus Gauges for each metric
for _, m := range h.config.Metrics {
gauge := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: m.Name,
Help: m.Help,
},
m.Labels,
)
h.metric[m.Name] = gauge
if err := h.regist.Register(gauge); err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
h.metric[m.Name] = are.ExistingCollector.(*prometheus.GaugeVec)
} else {
return err
}
}
}

// Prometheus HTTP Server
addr := h.config.Port
if addr != "" && addr[0] != ':' {
addr = ":" + addr
} else {
addr = ":9090"
}

h.server = &http.Server{Addr: addr}
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.HandlerFor(h.regist, promhttp.HandlerOpts{}))
h.server.Handler = mux

go func() {
log.Printf("Prometheus metrics server starting on %s/metrics", addr)
if err := h.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Printf("Error starting Prometheus HTTP server: %v", err)
}
}()

return nil
}

func (h *Hook) Stop() error {
if h.server != nil {
h.Log.Info("Stopping Prometheus metrics server")
return h.server.Close()
}
return nil
}

func (h *Hook) OnPublished(cl *mqtt.Client, pk packets.Packet) {
var payload map[string]any
if err := json.Unmarshal(pk.Payload, &payload); err != nil {
h.Log.Info("failed to unmarshal payload", "error", err)
return
}

for _, m := range h.config.Metrics {
metric, ok := h.metric[m.Name]
if !ok {
continue
}

value, ok := payload[m.Field].(float64)
if !ok {
h.Log.Info("field not found or not a number", "field", m.Field)
continue
}

labels := make(prometheus.Labels)
allLabelsPresent := true
for _, label := range m.Labels {
if labelValue, ok := payload[label].(string); ok {
labels[label] = labelValue
} else {
h.Log.Info("label not found or not a string", "label", label)
allLabelsPresent = false
break
}
}

if !allLabelsPresent {
h.Log.Info("skipping metric update due to missing labels", "metric", m.Name)
continue
}

metric.With(labels).Set(value)
}
}
Loading