From fe16cad233759aa7484cf92083dcdf97cc4ed8fe Mon Sep 17 00:00:00 2001 From: Pavel <177363085+pkcll@users.noreply.github.com> Date: Thu, 13 Feb 2025 00:16:14 -0500 Subject: [PATCH] core/services/promotel: add Forwarder service --- core/services/chainlink/application.go | 5 ++ core/services/promotel/promotel.go | 81 ++++++++++++++++++++++++++ 2 files changed, 86 insertions(+) create mode 100644 core/services/promotel/promotel.go diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index b8b2178780b..47cf585996a 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -15,6 +15,7 @@ import ( "github.com/grafana/pyroscope-go" "github.com/jonboulle/clockwork" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -66,6 +67,7 @@ import ( externalp2p "github.com/smartcontractkit/chainlink/v2/core/services/p2p/wrapper" "github.com/smartcontractkit/chainlink/v2/core/services/periodicbackup" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" + "github.com/smartcontractkit/chainlink/v2/core/services/promotel" "github.com/smartcontractkit/chainlink/v2/core/services/registrysyncer" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc" @@ -277,6 +279,9 @@ func NewApplication(opts ApplicationOpts) (Application, error) { restrictedHTTPClient := opts.RestrictedHTTPClient unrestrictedHTTPClient := opts.UnrestrictedHTTPClient + promForwarder := promotel.NewForwarder(globalLogger, prometheus.DefaultGatherer, otel.GetMeterProvider()) + srvcs = append(srvcs, promForwarder) + if opts.CapabilitiesRegistry == nil { // for tests only, in prod Registry should always be set at this point opts.CapabilitiesRegistry = capabilities.NewRegistry(globalLogger) diff --git a/core/services/promotel/promotel.go b/core/services/promotel/promotel.go new file mode 100644 index 00000000000..4e66f59ef5b --- /dev/null +++ b/core/services/promotel/promotel.go @@ -0,0 +1,81 @@ +package promotel + +import ( + "context" + "time" + + "github.com/prometheus/client_golang/prometheus" + "go.opentelemetry.io/otel/metric" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/timeutil" +) + +const period = 15 * time.Second + +type Forwarder struct { + services.StateMachine + lggr logger.Logger + gatherer prometheus.Gatherer + meterProvider metric.MeterProvider + stopCh services.StopChan + done chan struct{} +} + +func NewForwarder(lggr logger.Logger, gatherer prometheus.Gatherer, meterProvider metric.MeterProvider) *Forwarder { + return &Forwarder{ + lggr: logger.Named(lggr, "PromOTELForwarder"), + gatherer: gatherer, + meterProvider: meterProvider, + stopCh: make(chan struct{}), + done: make(chan struct{}), + } +} + +func (f *Forwarder) HealthReport() map[string]error { return map[string]error{f.Name(): f.Healthy()} } + +func (f *Forwarder) Name() string { return f.lggr.Name() } + +func (f *Forwarder) Start(context.Context) error { + go f.run() + return nil +} + +func (f *Forwarder) run() { + defer close(f.done) + ctx, cancel := f.stopCh.NewCtx() + defer cancel() + ticker := timeutil.NewTicker(func() time.Duration { return period }) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + f.forward(ctx) + } + } +} + +func (f *Forwarder) forward(ctx context.Context) { + mfs, err := f.gatherer.Gather() + if err != nil { + f.lggr.Errorw("Failed to gather prometheus metrics", "err", err) + } + for _, mf := range mfs { + for range mf.Metric { + if ctx.Err() != nil { + return + } + + //TODO f.meterProvider.Meter() + } + } +} + +func (f *Forwarder) Close() error { + close(f.stopCh) + <-f.done + return nil +}