Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/slow-deer-walk.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Expanded `admin profile` to collect PPROF profiles from LOOP Plugins. Added `-vitals` flag for more granular profiling.
115 changes: 102 additions & 13 deletions core/cmd/admin_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package cmd

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"slices"
"strings"
"sync"
"time"
Expand All @@ -20,6 +22,7 @@ import (

"github.com/smartcontractkit/chainlink/v2/core/sessions"
"github.com/smartcontractkit/chainlink/v2/core/utils"
"github.com/smartcontractkit/chainlink/v2/core/web"
"github.com/smartcontractkit/chainlink/v2/core/web/presenters"
)

Expand Down Expand Up @@ -65,6 +68,10 @@ func initAdminSubCmds(s *Shell) []cli.Command {
Usage: "output directory of the captured profile",
Value: "/tmp/",
},
cli.StringSliceFlag{
Name: "vitals, v",
Usage: "vitals to collect, can be specified multiple times. Options: 'allocs', 'block', 'cmdline', 'goroutine', 'heap', 'mutex', 'profile', 'threadcreate', 'trace'",
},
},
},
{
Expand Down Expand Up @@ -319,16 +326,13 @@ func (s *Shell) Status(c *cli.Context) error {
// Profile will collect pprof metrics and store them in a folder.
func (s *Shell) Profile(c *cli.Context) error {
ctx := s.ctx()
seconds := c.Uint("seconds")
seconds := c.Int("seconds")
baseDir := c.String("output_dir")

genDir := filepath.Join(baseDir, "debuginfo-"+time.Now().Format(time.RFC3339))

if err := os.Mkdir(genDir, 0o755); err != nil {
return s.errorOut(err)
}
var wgPprof sync.WaitGroup
vitals := []string{
vitals := c.StringSlice("vitals")
allVitals := []string{
"allocs", // A sampling of all past memory allocations
"block", // Stack traces that led to blocking on synchronization primitives
"cmdline", // The command line invocation of the current program
Expand All @@ -339,15 +343,100 @@ func (s *Shell) Profile(c *cli.Context) error {
"threadcreate", // Stack traces that led to the creation of new OS threads
"trace", // A trace of execution of the current program.
}
wgPprof.Add(len(vitals))
s.Logger.Infof("Collecting profiles: %v", vitals)
if len(vitals) == 0 {
vitals = slices.Clone(allVitals)
} else if slices.ContainsFunc(vitals, func(s string) bool { return !slices.Contains(allVitals, s) }) {
return fmt.Errorf("invalid vitals: must be from the set: %v", allVitals)
}

plugins, err := s.discoverPlugins(ctx)
if err != nil {
return s.errorOut(err)
}
var names []string
for _, group := range plugins {
if name := group.Labels[web.LabelMetaPluginName]; name != "" {
names = append(names, name)
}
}

if len(names) == 0 {
s.Logger.Infof("Collecting profiles: %v", vitals)
} else {
s.Logger.Infof("Collecting profiles from host and %d plugins: %v", len(names), vitals)
}
s.Logger.Infof("writing debug info to %s", genDir)

var wg sync.WaitGroup
errs := make([]error, len(names)+1)
wg.Add(len(names) + 1)
go func() {
defer wg.Done()
errs[0] = s.profile(ctx, genDir, "", vitals, seconds)
}()
for i, name := range names {
go func() {
defer wg.Done()
errs[i] = s.profile(ctx, genDir, name, vitals, seconds)
}()
}
wg.Wait()

err = errors.Join(errs...)
if err != nil {
return s.errorOut(err)
}
return nil
}
func (s *Shell) discoverPlugins(ctx context.Context) (
got []struct {
Targets []string `yaml:"targets"`
Labels map[string]string `yaml:"labels"`
},
err error,
) {
resp, err := s.HTTP.Get(ctx, "/discovery")
if err != nil {
return
}
defer func() {
if resp.Body != nil {
resp.Body.Close()
}
}()
data, err := io.ReadAll(resp.Body)
if err != nil {
return
}

if err = json.Unmarshal(data, &got); err != nil {
s.Logger.Errorf("failed to unmarshal discovery response: %s", string(data))
return
}
return
}

func (s *Shell) profile(ctx context.Context, genDir string, name string, vitals []string, seconds int) error {
lggr := s.Logger
path := "/v2"
if name != "" {
genDir = filepath.Join(genDir, "plugins", name)
path += "/plugins/" + name
lggr = lggr.With("plugin", name)
}
if err := os.MkdirAll(genDir, 0o755); err != nil {
return fmt.Errorf("failed to create directory: %w", err)
}

errs := make(chan error, len(vitals))
var wgPprof sync.WaitGroup
wgPprof.Add(len(vitals))
for _, vt := range vitals {
go func(vt string) {
go func(ctx context.Context, vt string) {
defer wgPprof.Done()
uri := fmt.Sprintf("/v2/debug/pprof/%s?seconds=%d", vt, seconds)
ctx, cancel := context.WithTimeout(ctx, time.Duration(max(seconds, 0)+web.PPROFOverheadSeconds)*time.Second)
defer cancel()
uri := fmt.Sprintf(path+"/debug/pprof/%s?seconds=%d", vt, seconds)
resp, err := s.HTTP.Get(ctx, uri)
if err != nil {
errs <- fmt.Errorf("error collecting %s: %w", vt, err)
Expand Down Expand Up @@ -403,12 +492,12 @@ func (s *Shell) Profile(c *cli.Context) error {
errs <- fmt.Errorf("error closing file for %s: %w", vt, err)
return
}
}(vt)
}(ctx, vt)
}
wgPprof.Wait()
close(errs)
// Atmost one err is emitted per vital.
s.Logger.Infof("collected %d/%d profiles", len(vitals)-len(errs), len(vitals))
// At most one err is emitted per vital.
lggr.Infof("collected %d/%d profiles", len(vitals)-len(errs), len(vitals))
if len(errs) > 0 {
var merr error
for err := range errs {
Expand Down
4 changes: 4 additions & 0 deletions core/cmd/shell_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ func initLocalSubCmds(s *Shell, safe bool) []cli.Command {
Usage: "output directory of the captured profile",
Value: "/tmp/",
},
cli.StringSliceFlag{
Name: "vitals, v",
Usage: "vitals to collect, can be specified multiple times. Options: 'allocs', 'block', 'cmdline', 'goroutine', 'heap', 'mutex', 'profile', 'threadcreate', 'trace'",
},
},
Hidden: true,
Before: func(_ *cli.Context) error {
Expand Down
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ require (
github.com/smartcontractkit/chainlink-sui/deployment v0.0.0-20260217210647-11c42009ec1f // indirect
github.com/smartcontractkit/chainlink-testing-framework/framework/components/fake v0.10.0 // indirect
github.com/smartcontractkit/chainlink-testing-framework/parrot v0.6.2 // indirect
github.com/smartcontractkit/chainlink-ton v0.0.0-20260218110243-cd2592187c66 // indirect
github.com/smartcontractkit/chainlink-ton v0.0.0-20260218144352-f8d460be6125 // indirect
github.com/smartcontractkit/chainlink-ton/deployment v0.0.0-20260218110243-cd2592187c66 // indirect
github.com/smartcontractkit/chainlink-tron/relayer v0.0.11-0.20260218133534-cbd44da2856b // indirect
github.com/smartcontractkit/cre-sdk-go v1.3.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1690,8 +1690,8 @@ github.com/smartcontractkit/chainlink-testing-framework/parrot v0.6.2 h1:cWUHB6Q
github.com/smartcontractkit/chainlink-testing-framework/parrot v0.6.2/go.mod h1:Z4K5VJLjsfqIIaBcZ1Sfccxu0xsCxBjPa6zF+5gtQaM=
github.com/smartcontractkit/chainlink-testing-framework/seth v1.51.3 h1:TZ0Yk+vjAJpoWnfsPdftWkq/NwZTrk734a/H4RHKnY8=
github.com/smartcontractkit/chainlink-testing-framework/seth v1.51.3/go.mod h1:kHYJnZUqiPF7/xN5273prV+srrLJkS77GbBXHLKQpx0=
github.com/smartcontractkit/chainlink-ton v0.0.0-20260218110243-cd2592187c66 h1:6l+kMm8s9V3cOxAeMpzfvj7CgiscKAPUudTf9KpgKNs=
github.com/smartcontractkit/chainlink-ton v0.0.0-20260218110243-cd2592187c66/go.mod h1:Sy0O2HOmKJ+m0CpvCwye3CF8VwldBBFpqFhKai+p8/g=
github.com/smartcontractkit/chainlink-ton v0.0.0-20260218144352-f8d460be6125 h1:ptVEMET7sCJg6ToNkDwiqu3PqtceTHTlePQZxCeYHMg=
github.com/smartcontractkit/chainlink-ton v0.0.0-20260218144352-f8d460be6125/go.mod h1:FDDjLuc4vrfclu3JHkMaREg0XZz7Lw1MK47Z4jJ4U5Q=
github.com/smartcontractkit/chainlink-ton/deployment v0.0.0-20260218110243-cd2592187c66 h1:PDsujAkEXtdfpArKAlQs9OsBmmyAZpv6d7jOjxs6uJM=
github.com/smartcontractkit/chainlink-ton/deployment v0.0.0-20260218110243-cd2592187c66/go.mod h1:GNtjS/cwOMowWXLq9HFtY8JlZ98lK9UHt1bEuaev52c=
github.com/smartcontractkit/chainlink-tron/relayer v0.0.11-0.20260218133534-cbd44da2856b h1:0XLtETkgkzwnEgUIIgyO/oydkUpzDVVuuFLf6aBeNPg=
Expand Down
110 changes: 104 additions & 6 deletions core/web/loop_registry.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package web

import (
"bytes"
"context"
"encoding/json"
"fmt"
"html"
"io"
"net/http"
"net/url"
"os"
"strconv"
"time"

"github.com/gin-gonic/gin"
Expand All @@ -19,13 +23,15 @@ import (
"github.com/smartcontractkit/chainlink/v2/plugins"
)

const LabelMetaPluginName = "__meta_plugin_name"

type LoopRegistryServer struct {
exposedPromPort int
discoveryHostName string // discovery endpoint hostname. must be accessible to external prom for scraping
loopHostName string // internal hostname of loopps. used by node to forward external prom requests
registry *plugins.LoopRegistry
logger logger.SugaredLogger
client *http.Client
promClient *http.Client

jsonMarshalFn func(any) ([]byte, error)
}
Expand All @@ -39,7 +45,7 @@ func NewLoopRegistryServer(app chainlink.Application) *LoopRegistryServer {
jsonMarshalFn: json.Marshal,
discoveryHostName: discoveryHostName,
loopHostName: loopHostName,
client: &http.Client{Timeout: 1 * time.Second}, // some value much less than the prometheus poll interval will do there
promClient: &http.Client{Timeout: 1 * time.Second}, // some value much less than the prometheus poll interval will do there
}
}

Expand All @@ -49,11 +55,13 @@ func (l *LoopRegistryServer) discoveryHandler(w http.ResponseWriter, req *http.R
var groups []*targetgroup.Group

// add node metrics to service discovery
groups = append(groups, metricTarget(l.discoveryHostName, l.exposedPromPort, "/metrics"))
groups = append(groups, pluginGroup(l.discoveryHostName, l.exposedPromPort, "/metrics"))

// add all the plugins
for _, registeredPlugin := range l.registry.List() {
groups = append(groups, metricTarget(l.discoveryHostName, l.exposedPromPort, pluginMetricPath(registeredPlugin.Name)))
group := pluginGroup(l.discoveryHostName, l.exposedPromPort, pluginMetricPath(registeredPlugin.Name))
group.Labels[LabelMetaPluginName] = model.LabelValue(registeredPlugin.Name)
groups = append(groups, group)
}

b, err := l.jsonMarshalFn(groups)
Expand All @@ -72,7 +80,7 @@ func (l *LoopRegistryServer) discoveryHandler(w http.ResponseWriter, req *http.R
}
}

func metricTarget(hostName string, port int, path string) *targetgroup.Group {
func pluginGroup(hostName string, port int, path string) *targetgroup.Group {
return &targetgroup.Group{
Targets: []model.LabelSet{
// target address will be called by external prometheus
Expand All @@ -95,7 +103,12 @@ func (l *LoopRegistryServer) pluginMetricHandler(gc *gin.Context) {

// unlike discovery, this endpoint is internal btw the node and plugin
pluginURL := fmt.Sprintf("http://%s:%d/metrics", l.loopHostName, p.EnvCfg.PrometheusPort)
res, err := l.client.Get(pluginURL) //nolint
req, err := http.NewRequestWithContext(gc.Request.Context(), "GET", pluginURL, nil)
if err != nil {
gc.Data(http.StatusInternalServerError, "text/plain", fmt.Appendf(nil, "error creating plugin metrics request: %s", err))
return
}
res, err := l.promClient.Do(req)
if err != nil {
msg := "plugin metric handler failed to get plugin url " + html.EscapeString(pluginURL)
l.logger.Errorw(msg, "err", err)
Expand All @@ -114,6 +127,91 @@ func (l *LoopRegistryServer) pluginMetricHandler(gc *gin.Context) {
gc.Data(http.StatusOK, "text/plain", b)
}

const PPROFOverheadSeconds = 30

func pprofURLVals(gc *gin.Context) (urlVals url.Values, timeout time.Duration) {
if db, ok := gc.GetQuery("debug"); ok {
urlVals.Set("debug", db)
}
if gc, ok := gc.GetQuery("gc"); ok {
urlVals.Set("gc", gc)
}
timeout = PPROFOverheadSeconds * time.Second
if sec, ok := gc.GetQuery("seconds"); ok {
urlVals.Set("seconds", sec)
if i, err := strconv.Atoi(sec); err == nil {
timeout = time.Duration(i+PPROFOverheadSeconds) * time.Second
}
}
return
}

func (l *LoopRegistryServer) pluginPPROFHandler(gc *gin.Context) {
pluginName := gc.Param("name")
p, ok := l.registry.Get(pluginName)
if !ok {
gc.Data(http.StatusNotFound, "text/plain", fmt.Appendf(nil, "plugin %q does not exist", html.EscapeString(pluginName)))
return
}

// unlike discovery, this endpoint is internal btw the node and plugin
pluginURL := fmt.Sprintf("http://%s:%d/debug/pprof/"+gc.Param("profile"), l.loopHostName, p.EnvCfg.PrometheusPort)
urlVals, timeout := pprofURLVals(gc)
if s := urlVals.Encode(); s != "" {
pluginURL += "?" + s
}
l.doRequest(gc, "GET", pluginURL, nil, timeout, pluginName)
}

func (l *LoopRegistryServer) pluginPPROFPOSTSymbolHandler(gc *gin.Context) {
pluginName := gc.Param("name")
p, ok := l.registry.Get(pluginName)
if !ok {
gc.Data(http.StatusNotFound, "text/plain", fmt.Appendf(nil, "plugin %q does not exist", html.EscapeString(pluginName)))
return
}

// unlike discovery, this endpoint is internal btw the node and plugin
pluginURL := fmt.Sprintf("http://%s:%d/debug/pprof/symbol", l.loopHostName, p.EnvCfg.PrometheusPort)
urlVals, timeout := pprofURLVals(gc)
if s := urlVals.Encode(); s != "" {
pluginURL += "?" + s
}
body, err := io.ReadAll(gc.Request.Body)
if err != nil {
gc.Data(http.StatusInternalServerError, "text/plain", fmt.Appendf(nil, "error reading plugin pprof request body: %s", err))
return
}
l.doRequest(gc, "POST", pluginURL, bytes.NewReader(body), timeout, pluginName)
}

func (l *LoopRegistryServer) doRequest(gc *gin.Context, method string, url string, body io.Reader, timeout time.Duration, pluginName string) {
ctx, cancel := context.WithTimeout(gc.Request.Context(), timeout)
defer cancel()
req, err := http.NewRequestWithContext(ctx, method, url, body)
if err != nil {
gc.Data(http.StatusInternalServerError, "text/plain", fmt.Appendf(nil, "error creating plugin pprof request: %s", err))
return
}
res, err := http.DefaultClient.Do(req)
if err != nil {
msg := "plugin pprof handler failed to post plugin url " + html.EscapeString(url)
l.logger.Errorw(msg, "err", err)
gc.Data(http.StatusInternalServerError, "text/plain", fmt.Appendf(nil, "%s: %s", msg, err))
return
}
defer res.Body.Close()
b, err := io.ReadAll(res.Body)
if err != nil {
msg := fmt.Sprintf("error reading plugin %q pprof", html.EscapeString(pluginName))
l.logger.Errorw(msg, "err", err)
gc.Data(http.StatusInternalServerError, "text/plain", fmt.Appendf(nil, "%s: %s", msg, err))
return
}

gc.Data(http.StatusOK, "text/plain", b)
}

func initHostNames() (discoveryHost, loopHost string) {
var exists bool
discoveryHost, exists = env.PrometheusDiscoveryHostName.Lookup()
Expand Down
Loading
Loading