Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/grafana/pyroscope-go"
"github.com/jonboulle/clockwork"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -66,6 +67,7 @@ import (
externalp2p "github.com/smartcontractkit/chainlink/v2/core/services/p2p/wrapper"
"github.com/smartcontractkit/chainlink/v2/core/services/periodicbackup"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/services/promotel"
"github.com/smartcontractkit/chainlink/v2/core/services/registrysyncer"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc"
Expand Down Expand Up @@ -277,6 +279,9 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
restrictedHTTPClient := opts.RestrictedHTTPClient
unrestrictedHTTPClient := opts.UnrestrictedHTTPClient

promForwarder := promotel.NewForwarder(globalLogger, prometheus.DefaultGatherer, otel.GetMeterProvider())
srvcs = append(srvcs, promForwarder)

if opts.CapabilitiesRegistry == nil {
// for tests only, in prod Registry should always be set at this point
opts.CapabilitiesRegistry = capabilities.NewRegistry(globalLogger)
Expand Down
81 changes: 81 additions & 0 deletions core/services/promotel/promotel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package promotel

import (
"context"
"time"

"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/metric"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/timeutil"
)

const period = 15 * time.Second

type Forwarder struct {
services.StateMachine
lggr logger.Logger
gatherer prometheus.Gatherer
meterProvider metric.MeterProvider
stopCh services.StopChan
done chan struct{}
}

func NewForwarder(lggr logger.Logger, gatherer prometheus.Gatherer, meterProvider metric.MeterProvider) *Forwarder {
return &Forwarder{
lggr: logger.Named(lggr, "PromOTELForwarder"),
gatherer: gatherer,
meterProvider: meterProvider,
stopCh: make(chan struct{}),
done: make(chan struct{}),
}
}

func (f *Forwarder) HealthReport() map[string]error { return map[string]error{f.Name(): f.Healthy()} }

func (f *Forwarder) Name() string { return f.lggr.Name() }

func (f *Forwarder) Start(context.Context) error {
go f.run()
return nil
}

func (f *Forwarder) run() {
defer close(f.done)
ctx, cancel := f.stopCh.NewCtx()
defer cancel()
ticker := timeutil.NewTicker(func() time.Duration { return period })
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
f.forward(ctx)
}
}
}

func (f *Forwarder) forward(ctx context.Context) {
mfs, err := f.gatherer.Gather()
if err != nil {
f.lggr.Errorw("Failed to gather prometheus metrics", "err", err)
}
for _, mf := range mfs {
for range mf.Metric {
if ctx.Err() != nil {
return
}

//TODO f.meterProvider.Meter()

Check failure on line 72 in core/services/promotel/promotel.go

View workflow job for this annotation

GitHub Actions / GolangCI Lint (.)

commentFormatting: put a space between `//` and comment text (gocritic)
}
}
}

func (f *Forwarder) Close() error {
close(f.stopCh)
<-f.done
return nil
}
Loading