diff --git a/.github/workflows/sui-ccip-test.yml b/.github/workflows/sui-ccip-test.yml index c082592b6..f0d9ccfa8 100644 --- a/.github/workflows/sui-ccip-test.yml +++ b/.github/workflows/sui-ccip-test.yml @@ -23,8 +23,8 @@ jobs: sui_version: mainnet-v1.60.1 - test_name: Test_CCIP_Messaging_EVM2Sui sui_version: mainnet-v1.60.1 - - test_name: Test_CCIP_EVM2Sui_ZeroReceiver - sui_version: mainnet-v1.60.1 + # - test_name: Test_CCIP_EVM2Sui_ZeroReceiver + # sui_version: mainnet-v1.60.1 - test_name: Test_CCIPTokenTransfer_Sui2EVM_LockReleaseTokenPool sui_version: mainnet-v1.60.1 - test_name: Test_CCIPTokenTransfer_Sui2EVM_BurnMintTokenPool diff --git a/go.mod b/go.mod index 7f6283f19..3a4ea2e70 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ go 1.25.5 replace github.com/fbsobreira/gotron-sdk => github.com/smartcontractkit/chainlink-tron/relayer/gotron-sdk v0.0.4 require ( - github.com/aptos-labs/aptos-go-sdk v1.7.1-0.20250602153733-bb1facae1d43 + github.com/aptos-labs/aptos-go-sdk v1.11.0 github.com/aptos-labs/tree-sitter-move-on-aptos v0.0.0-20250321090037-c820eb4716e1 github.com/block-vision/sui-go-sdk v1.1.3 github.com/ethereum/go-ethereum v1.16.2 @@ -19,7 +19,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 github.com/smacker/go-tree-sitter v0.0.0-20240827094217-dd81d9e9be82 - github.com/smartcontractkit/chainlink-aptos v0.0.0-20250905094443-ac02b032b32b + github.com/smartcontractkit/chainlink-aptos v0.0.0-20251024142440-51f2ad2652a2 github.com/smartcontractkit/chainlink-ccip v0.0.0-20250805210128-7f8a0f403c3a github.com/smartcontractkit/chainlink-common v0.9.6-0.20260209153333-67bf1aaa3e1e github.com/stretchr/testify v1.11.1 @@ -47,7 +47,7 @@ require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.16.1 // indirect github.com/cloudevents/sdk-go/v2 v2.16.1 // indirect - github.com/coder/websocket v1.8.13 // indirect + github.com/coder/websocket v1.8.14 // indirect github.com/cosmos/go-bip39 v1.0.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect @@ -71,7 +71,7 @@ require ( github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 // indirect github.com/hashicorp/go-hclog v1.6.3 // indirect github.com/hashicorp/yamux v0.1.2 // indirect - github.com/hasura/go-graphql-client v0.13.1 // indirect + github.com/hasura/go-graphql-client v0.14.5 // indirect github.com/hdevalence/ed25519consensus v0.2.0 // indirect github.com/holiman/uint256 v1.3.2 // indirect github.com/invopop/jsonschema v0.13.0 // indirect diff --git a/go.sum b/go.sum index 75a5a5273..8ab51e004 100644 --- a/go.sum +++ b/go.sum @@ -14,8 +14,8 @@ github.com/apache/arrow-go/v18 v18.3.1 h1:oYZT8FqONiK74JhlH3WKVv+2NKYoyZ7C2ioD4D github.com/apache/arrow-go/v18 v18.3.1/go.mod h1:12QBya5JZT6PnBihi5NJTzbACrDGXYkrgjujz3MRQXU= github.com/apache/thrift v0.21.0 h1:tdPmh/ptjE1IJnhbhrcl2++TauVjy242rkV/UzJChnE= github.com/apache/thrift v0.21.0/go.mod h1:W1H8aR/QRtYNvrPeFXBtobyRkd0/YVhTc6i07XIAgDw= -github.com/aptos-labs/aptos-go-sdk v1.7.1-0.20250602153733-bb1facae1d43 h1:Mn2LI+fa8QzVmXQ/30MT76GYTQIxpSB3lzos0hat4qc= -github.com/aptos-labs/aptos-go-sdk v1.7.1-0.20250602153733-bb1facae1d43/go.mod h1:vYm/yHr6cQpoUBMw/Q93SRR1IhP0mPTBrEGjShwUvXc= +github.com/aptos-labs/aptos-go-sdk v1.11.0 h1:vIL1hpjECUiu7zMl9Wz6VV8ttXsrDqKUj0HxoeaIER4= +github.com/aptos-labs/aptos-go-sdk v1.11.0/go.mod h1:8YvYwRg93UcG6pTStCpZdYiscCtKh51sYfeLgIy/41c= github.com/aptos-labs/tree-sitter-move-on-aptos v0.0.0-20250321090037-c820eb4716e1 h1:KD231JW9jSiu5m0J/w//3qJyGRdvrdabKAF+Fbwvzgo= github.com/aptos-labs/tree-sitter-move-on-aptos v0.0.0-20250321090037-c820eb4716e1/go.mod h1:+WZUlAOW0a0+7CrPgFVwmflo1LHH61uw4WSJtboIk48= github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk= @@ -51,8 +51,8 @@ github.com/cloudevents/sdk-go/v2 v2.16.1/go.mod h1:v/kVOaWjNfbvc6tkhhlkhvLapj8Aa github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I= github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= -github.com/coder/websocket v1.8.13 h1:f3QZdXy7uGVz+4uCJy2nTZyM0yTBj8yANEHhqlXZ9FE= -github.com/coder/websocket v1.8.13/go.mod h1:LNVeNrXQZfe5qhS9ALED3uA+l5pPqvwXg3CKoDBB2gs= +github.com/coder/websocket v1.8.14 h1:9L0p0iKiNOibykf283eHkKUHHrpG7f65OE3BhhO7v9g= +github.com/coder/websocket v1.8.14/go.mod h1:NX3SzP+inril6yawo5CQXx8+fk145lPDC6pumgx0mVg= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/cosmos/go-bip39 v1.0.0 h1:pcomnQdrdH22njcAatO0yWojsUnCO3y2tNoV1cb6hHY= @@ -60,8 +60,8 @@ github.com/cosmos/go-bip39 v1.0.0/go.mod h1:RNJv0H/pOIVgxw6KS7QeX2a0Uo0aKUlfhZ4x github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/cucumber/gherkin/go/v26 v26.2.0 h1:EgIjePLWiPeslwIWmNQ3XHcypPsWAHoMCz/YEBKP4GI= github.com/cucumber/gherkin/go/v26 v26.2.0/go.mod h1:t2GAPnB8maCT4lkHL99BDCVNzCh1d7dBhCLt150Nr/0= -github.com/cucumber/godog v0.15.0 h1:51AL8lBXF3f0cyA5CV4TnJFCTHpgiy+1x1Hb3TtZUmo= -github.com/cucumber/godog v0.15.0/go.mod h1:FX3rzIDybWABU4kuIXLZ/qtqEe1Ac5RdXmqvACJOces= +github.com/cucumber/godog v0.15.1 h1:rb/6oHDdvVZKS66hrhpjFQFHjthFSrQBCOI1LwshNTI= +github.com/cucumber/godog v0.15.1/go.mod h1:qju+SQDewOljHuq9NSM66s0xEhogx0q30flfxL4WUk8= github.com/cucumber/messages/go/v21 v21.0.1 h1:wzA0LxwjlWQYZd32VTlAVDTkW6inOFmSM+RuOwHZiMI= github.com/cucumber/messages/go/v21 v21.0.1/go.mod h1:zheH/2HS9JLVFukdrsPWoPdmUtmYQAQPLk7w5vWsk5s= github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -169,8 +169,8 @@ github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iP github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/yamux v0.1.2 h1:XtB8kyFOyHXYVFnwT5C3+Bdo8gArse7j2AQ0DA0Uey8= github.com/hashicorp/yamux v0.1.2/go.mod h1:C+zze2n6e/7wshOZep2A70/aQU6QBRWJO/G6FT1wIns= -github.com/hasura/go-graphql-client v0.13.1 h1:kKbjhxhpwz58usVl+Xvgah/TDha5K2akNTRQdsEHN6U= -github.com/hasura/go-graphql-client v0.13.1/go.mod h1:k7FF7h53C+hSNFRG3++DdVZWIuHdCaTbI7siTJ//zGQ= +github.com/hasura/go-graphql-client v0.14.5 h1:M9HxxGLCcDZnxJGYyWXAzDYEpommgjW+sUW3V8EaGms= +github.com/hasura/go-graphql-client v0.14.5/go.mod h1:jfSZtBER3or+88Q9vFhWHiFMPppfYILRyl+0zsgPIIw= github.com/hdevalence/ed25519consensus v0.2.0 h1:37ICyZqdyj0lAZ8P4D1d1id3HqbbG1N3iBb1Tb4rdcU= github.com/hdevalence/ed25519consensus v0.2.0/go.mod h1:w3BHWjwJbFU29IRHL1Iqkw3sus+7FctEyM4RqDxYNzo= github.com/holiman/uint256 v1.3.2 h1:a9EgMPSC1AAaj1SZL5zIQD3WbwTuHrMGOerLjGmM/TA= diff --git a/relayer/chainreader/indexer/events_indexer.go b/relayer/chainreader/indexer/events_indexer.go index 7f665d486..da5ac0697 100644 --- a/relayer/chainreader/indexer/events_indexer.go +++ b/relayer/chainreader/indexer/events_indexer.go @@ -36,6 +36,9 @@ type EventsIndexer struct { // a map of event handles to the last processed cursor lastProcessedCursors map[string]*models.EventId cursorMutex sync.RWMutex + + // Optional callback for recording successful sync operations + onSyncSuccess func(ctx context.Context) } type EventsIndexerApi interface { @@ -44,6 +47,7 @@ type EventsIndexerApi interface { SyncEvent(ctx context.Context, selector *client.EventSelector) error AddEventSelector(ctx context.Context, selector *client.EventSelector) error SetEventOffsetOverrides(ctx context.Context, offsetOverrides map[string]client.EventId) error + SetOnSyncSuccess(callback func(ctx context.Context)) Ready() error Close() error } @@ -91,6 +95,10 @@ func (eIndexer *EventsIndexer) Start(ctx context.Context) error { eIndexer.logger.Warnw("EventSync timed out", "duration", elapsed) } else { eIndexer.logger.Debugw("Event sync completed successfully", "duration", elapsed) + // Record successful sync for health metrics + if eIndexer.onSyncSuccess != nil { + eIndexer.onSyncSuccess(ctx) + } } cancel() @@ -443,3 +451,8 @@ func (eIndexer *EventsIndexer) Close() error { // TODO: implement return nil } + +// SetOnSyncSuccess sets a callback function that is called after a successful sync operation. +func (eIndexer *EventsIndexer) SetOnSyncSuccess(callback func(ctx context.Context)) { + eIndexer.onSyncSuccess = callback +} diff --git a/relayer/chainreader/indexer/indexer.go b/relayer/chainreader/indexer/indexer.go index 280a02308..4bb451502 100644 --- a/relayer/chainreader/indexer/indexer.go +++ b/relayer/chainreader/indexer/indexer.go @@ -7,6 +7,8 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" + + "github.com/smartcontractkit/chainlink-sui/relayer/monitor" ) type Indexer struct { @@ -22,6 +24,9 @@ type Indexer struct { transactionIndexerErr atomic.Value // stores error from transaction indexer goroutine wg sync.WaitGroup // wait for both indexer goroutines to exit + + // Health metrics for monitoring (optional) + healthMetrics *monitor.HealthMetrics } type IndexerApi interface { @@ -54,6 +59,16 @@ func (i *Indexer) Name() string { func (i *Indexer) Start(_ context.Context) error { return i.starter.StartOnce(i.Name(), func() error { + // Set up health metrics callbacks if health metrics are configured + if i.healthMetrics != nil { + i.eventsIndexer.SetOnSyncSuccess(func(ctx context.Context) { + i.RecordEventsIndexerSuccess(ctx) + }) + i.transactionIndexer.SetOnSyncSuccess(func(ctx context.Context) { + i.RecordTransactionsIndexerSuccess(ctx) + }) + } + // Events indexer eventsIndexerCtx, eventsIndexerCancel := context.WithCancel(context.Background()) i.eventsIndexerCancel = &eventsIndexerCancel @@ -158,3 +173,23 @@ func (i *Indexer) GetTransactionIndexer() TransactionsIndexerApi { } return i.transactionIndexer } + +// SetHealthMetrics sets the health metrics instance for the indexer. +// This should be called after creating the indexer to enable health metrics reporting. +func (i *Indexer) SetHealthMetrics(hm *monitor.HealthMetrics) { + i.healthMetrics = hm +} + +// RecordEventsIndexerSuccess records a successful events indexer sync operation. +func (i *Indexer) RecordEventsIndexerSuccess(ctx context.Context) { + if i.healthMetrics != nil { + i.healthMetrics.RecordLastSuccess(ctx, monitor.ComponentEventsIndexer) + } +} + +// RecordTransactionsIndexerSuccess records a successful transactions indexer sync operation. +func (i *Indexer) RecordTransactionsIndexerSuccess(ctx context.Context) { + if i.healthMetrics != nil { + i.healthMetrics.RecordLastSuccess(ctx, monitor.ComponentTransactionsIndexer) + } +} diff --git a/relayer/chainreader/indexer/transactions_indexer.go b/relayer/chainreader/indexer/transactions_indexer.go index 7658819d8..e323e4835 100644 --- a/relayer/chainreader/indexer/transactions_indexer.go +++ b/relayer/chainreader/indexer/transactions_indexer.go @@ -50,11 +50,15 @@ type TransactionsIndexer struct { mu sync.RWMutex offrampPackageIdReady chan struct{} offrampPackageOnce sync.Once + + // Optional callback for recording successful sync operations + onSyncSuccess func(ctx context.Context) } type TransactionsIndexerApi interface { Start(ctx context.Context) error SetOffRampPackage(pkg string, latestPkg string) + SetOnSyncSuccess(callback func(ctx context.Context)) Ready() error Close() error } @@ -115,6 +119,10 @@ func (tIndexer *TransactionsIndexer) Start(ctx context.Context) error { tIndexer.logger.Warnw("Transaction sync timed out", "duration", elapsed) } else { tIndexer.logger.Debugw("Transaction sync completed successfully", "duration", elapsed) + // Record successful sync for health metrics + if tIndexer.onSyncSuccess != nil { + tIndexer.onSyncSuccess(ctx) + } } cancel() @@ -837,3 +845,8 @@ func (tIndexer *TransactionsIndexer) Close() error { // TODO: implement return nil } + +// SetOnSyncSuccess sets a callback function that is called after a successful sync operation. +func (tIndexer *TransactionsIndexer) SetOnSyncSuccess(callback func(ctx context.Context)) { + tIndexer.onSyncSuccess = callback +} diff --git a/relayer/chainreader/reader/chainreader.go b/relayer/chainreader/reader/chainreader.go index eefc77c99..82c2c3e44 100644 --- a/relayer/chainreader/reader/chainreader.go +++ b/relayer/chainreader/reader/chainreader.go @@ -28,6 +28,7 @@ import ( "github.com/smartcontractkit/chainlink-sui/relayer/client" "github.com/smartcontractkit/chainlink-sui/relayer/codec" "github.com/smartcontractkit/chainlink-sui/relayer/common" + "github.com/smartcontractkit/chainlink-sui/relayer/monitor" "github.com/smartcontractkit/chainlink-common/pkg/logger" pkgtypes "github.com/smartcontractkit/chainlink-common/pkg/types" @@ -58,6 +59,9 @@ type suiChainReader struct { // Value: parent object ID parentObjectIDs map[string]string parentObjectIDsMutex sync.RWMutex + + // Health metrics for monitoring (optional) + healthMetrics *monitor.HealthMetrics } var _ pkgtypes.ContractTypeProvider = &suiChainReader{} @@ -155,6 +159,19 @@ func (s *suiChainReader) Close() error { }) } +// SetHealthMetrics sets the health metrics instance for the chain reader. +// This should be called after creating the chain reader to enable health metrics reporting. +func (s *suiChainReader) SetHealthMetrics(hm *monitor.HealthMetrics) { + s.healthMetrics = hm +} + +// recordLastSuccess records a successful operation to the health metrics. +func (s *suiChainReader) recordLastSuccess(ctx context.Context) { + if s.healthMetrics != nil { + s.healthMetrics.RecordLastSuccess(ctx, monitor.ComponentChainReader) + } +} + func (s *suiChainReader) Bind(ctx context.Context, bindings []pkgtypes.BoundContract) error { offrampPackageAddress := "" diff --git a/relayer/chainwriter/chainwriter.go b/relayer/chainwriter/chainwriter.go index 8a4c9dec4..667911a3d 100644 --- a/relayer/chainwriter/chainwriter.go +++ b/relayer/chainwriter/chainwriter.go @@ -14,6 +14,7 @@ import ( cwConfig "github.com/smartcontractkit/chainlink-sui/relayer/chainwriter/config" "github.com/smartcontractkit/chainlink-sui/relayer/chainwriter/ptb" "github.com/smartcontractkit/chainlink-sui/relayer/chainwriter/ptb/offramp" + "github.com/smartcontractkit/chainlink-sui/relayer/monitor" "github.com/smartcontractkit/chainlink-sui/relayer/txm" ) @@ -29,6 +30,9 @@ type SuiChainWriter struct { simulate bool ptbFactory *ptb.PTBConstructor services.StateMachine + + // Health metrics for monitoring (optional) + healthMetrics *monitor.HealthMetrics } func NewSuiChainWriter(lggr logger.Logger, txManager txm.TxManager, config cwConfig.ChainWriterConfig, simulate bool) (*SuiChainWriter, error) { @@ -140,6 +144,8 @@ func (s *SuiChainWriter) SubmitTransaction(ctx context.Context, contractName str } s.lggr.Infow("Transaction enqueued", "transactionID", tx.TransactionID, "functionName", method) + s.recordLastSuccess(ctx) + return nil } @@ -197,6 +203,19 @@ func (s *SuiChainWriter) Start(ctx context.Context) error { }) } +// SetHealthMetrics sets the health metrics instance for the chain writer. +// This should be called after creating the chain writer to enable health metrics reporting. +func (s *SuiChainWriter) SetHealthMetrics(hm *monitor.HealthMetrics) { + s.healthMetrics = hm +} + +// recordLastSuccess records a successful operation to the health metrics. +func (s *SuiChainWriter) recordLastSuccess(ctx context.Context) { + if s.healthMetrics != nil { + s.healthMetrics.RecordLastSuccess(ctx, monitor.ComponentChainWriter) + } +} + func (s *SuiChainWriter) EstimateGasBudgetFromCCIPExecuteMessage(ctx context.Context, arguments map[string]any, meta *commonTypes.TxMeta) (*big.Int, error) { offrampArgs, err := offramp.DecodeOffRampExecCallArgs(arguments) if err != nil { diff --git a/relayer/config/chain_config.go b/relayer/config/chain_config.go index f86bc9644..04b9cb364 100644 --- a/relayer/config/chain_config.go +++ b/relayer/config/chain_config.go @@ -23,13 +23,6 @@ const ( DefaultTransactionRetentionSecs = uint64(10) ) -type ChainInfo struct { - ChainFamilyName string - ChainID string - NetworkName string - NetworkNameFull string -} - type NodeConfig struct { Name *string URL *config.URL diff --git a/relayer/monitor/balance.go b/relayer/monitor/balance.go index 486f2a6be..634ba53d3 100644 --- a/relayer/monitor/balance.go +++ b/relayer/monitor/balance.go @@ -9,6 +9,7 @@ import ( "github.com/smartcontractkit/chainlink-sui/relayer/client" aptosBalanceMonitor "github.com/smartcontractkit/chainlink-aptos/relayer/monitor" + aptosTypes "github.com/smartcontractkit/chainlink-aptos/relayer/types" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/types/core" @@ -20,7 +21,7 @@ const BalanceCheckTimeout = 30 * time.Second // BalanceMonitorOpts contains the options for creating a new Sui account balance monitor. type BalanceMonitorOpts struct { - ChainInfo aptosBalanceMonitor.ChainInfo + ChainInfo aptosTypes.ChainInfo Config aptosBalanceMonitor.GenericBalanceConfig Logger logger.Logger diff --git a/relayer/monitor/health_metrics.go b/relayer/monitor/health_metrics.go new file mode 100644 index 000000000..b7eb227bd --- /dev/null +++ b/relayer/monitor/health_metrics.go @@ -0,0 +1,207 @@ +package monitor + +import ( + "context" + "fmt" + "sync" + "time" + + aptosTypes "github.com/smartcontractkit/chainlink-aptos/relayer/types" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + + "github.com/smartcontractkit/chainlink-common/pkg/beholder" + + "github.com/smartcontractkit/chainlink-aptos/relayer/monitoring/metric/utils" +) + +// Component names for health metrics +const ( + ComponentSuiRelayer = "SuiRelayer" + ComponentTxM = "TxM" + ComponentEventsIndexer = "EventsIndexer" + ComponentTransactionsIndexer = "TransactionsIndexer" + ComponentChainReader = "ChainReader" + ComponentChainWriter = "ChainWriter" +) + +// HealthMetrics provides per-component health metrics that are pushed to Grafana. +// It tracks component status, flip-flop counts, and processing lag for monitoring +// and alerting purposes. +type HealthMetrics struct { + chainInfo aptosTypes.ChainInfo + + // Metrics + statusGauge metric.Int64Gauge + flipFlopCounter metric.Int64Counter + lastSuccessTimestampGauge metric.Int64Gauge + processingLagGauge metric.Float64Gauge + + // Internal state for tracking flip-flops + mu sync.RWMutex + lastHealthStatus map[string]bool + lastSuccessTime map[string]time.Time +} + +// NewHealthMetrics creates a new HealthMetrics instance and pre-registers all metrics. +// Metrics are registered at startup so they are always present for alerting. +func NewHealthMetrics(chainInfo aptosTypes.ChainInfo) (*HealthMetrics, error) { + meter := beholder.GetMeter() + + statusGauge, err := meter.Int64Gauge( + "sui_component_status", + metric.WithDescription("Component health status: 1=healthy, 0=unhealthy"), + metric.WithUnit("1"), + ) + if err != nil { + return nil, fmt.Errorf("failed to create status gauge: %w", err) + } + + flipFlopCounter, err := meter.Int64Counter( + "sui_component_flip_flops_total", + metric.WithDescription("Number of healthy/unhealthy state transitions"), + metric.WithUnit("1"), + ) + if err != nil { + return nil, fmt.Errorf("failed to create flip-flop counter: %w", err) + } + + lastSuccessTimestampGauge, err := meter.Int64Gauge( + "sui_component_last_success_timestamp", + metric.WithDescription("Unix timestamp of last successful operation"), + metric.WithUnit("s"), + ) + if err != nil { + return nil, fmt.Errorf("failed to create last success timestamp gauge: %w", err) + } + + processingLagGauge, err := meter.Float64Gauge( + "sui_component_processing_lag_seconds", + metric.WithDescription("Time since last successful operation in seconds"), + metric.WithUnit("s"), + ) + if err != nil { + return nil, fmt.Errorf("failed to create processing lag gauge: %w", err) + } + + hm := &HealthMetrics{ + chainInfo: chainInfo, + statusGauge: statusGauge, + flipFlopCounter: flipFlopCounter, + lastSuccessTimestampGauge: lastSuccessTimestampGauge, + processingLagGauge: processingLagGauge, + lastHealthStatus: make(map[string]bool), + lastSuccessTime: make(map[string]time.Time), + } + + // Pre-register all component metrics with initial values + components := []string{ + ComponentSuiRelayer, + ComponentTxM, + ComponentEventsIndexer, + ComponentTransactionsIndexer, + ComponentChainReader, + ComponentChainWriter, + } + + ctx := context.Background() + now := time.Now() + for _, component := range components { + // Initialize with unknown/unhealthy state + hm.lastHealthStatus[component] = false + hm.lastSuccessTime[component] = now + + // Record initial metrics + attrs := hm.getAttributes(component) + hm.statusGauge.Record(ctx, 0, metric.WithAttributeSet(attrs)) + hm.lastSuccessTimestampGauge.Record(ctx, now.Unix(), metric.WithAttributeSet(attrs)) + hm.processingLagGauge.Record(ctx, 0, metric.WithAttributeSet(attrs)) + } + + return hm, nil +} + +// RecordHealth records the health status of a component and tracks flip-flops. +func (hm *HealthMetrics) RecordHealth(ctx context.Context, component string, healthy bool) { + hm.mu.Lock() + defer hm.mu.Unlock() + + attrs := hm.getAttributes(component) + + // Check for flip-flop (state change) + if lastStatus, exists := hm.lastHealthStatus[component]; exists && lastStatus != healthy { + hm.flipFlopCounter.Add(ctx, 1, metric.WithAttributeSet(attrs)) + } + hm.lastHealthStatus[component] = healthy + + // Record current status + var statusValue int64 + if healthy { + statusValue = 1 + } + hm.statusGauge.Record(ctx, statusValue, metric.WithAttributeSet(attrs)) +} + +// RecordLastSuccess records the timestamp of a successful operation and updates +// the processing lag metric. +func (hm *HealthMetrics) RecordLastSuccess(ctx context.Context, component string) { + hm.mu.Lock() + defer hm.mu.Unlock() + + now := time.Now() + hm.lastSuccessTime[component] = now + + attrs := hm.getAttributes(component) + hm.lastSuccessTimestampGauge.Record(ctx, now.Unix(), metric.WithAttributeSet(attrs)) + hm.processingLagGauge.Record(ctx, 0, metric.WithAttributeSet(attrs)) +} + +// UpdateProcessingLag updates the processing lag for all components based on their +// last success time. This should be called periodically. +func (hm *HealthMetrics) UpdateProcessingLag(ctx context.Context) { + hm.mu.RLock() + defer hm.mu.RUnlock() + + now := time.Now() + for component, lastSuccess := range hm.lastSuccessTime { + lag := now.Sub(lastSuccess).Seconds() + attrs := hm.getAttributes(component) + hm.processingLagGauge.Record(ctx, lag, metric.WithAttributeSet(attrs)) + } +} + +// RecordHealthFromReport processes a HealthReport map and records metrics for all components. +func (hm *HealthMetrics) RecordHealthFromReport(ctx context.Context, report map[string]error) { + for component, err := range report { + healthy := err == nil + hm.RecordHealth(ctx, component, healthy) + } +} + +// getAttributes returns the OpenTelemetry attributes for a component. +func (hm *HealthMetrics) getAttributes(component string) attribute.Set { + return attribute.NewSet( + attribute.String("component", component), + attribute.String("chain_family_name", utils.ValOrUnknown(hm.chainInfo.ChainFamilyName)), + attribute.String("chain_id", utils.ValOrUnknown(hm.chainInfo.ChainID)), + attribute.String("network_name", utils.ValOrUnknown(hm.chainInfo.NetworkName)), + attribute.String("network_name_full", utils.ValOrUnknown(hm.chainInfo.NetworkNameFull)), + ) +} + +// GetLastSuccessTime returns the last success time for a component. +func (hm *HealthMetrics) GetLastSuccessTime(component string) (time.Time, bool) { + hm.mu.RLock() + defer hm.mu.RUnlock() + t, ok := hm.lastSuccessTime[component] + return t, ok +} + +// GetHealthStatus returns the last known health status for a component. +func (hm *HealthMetrics) GetHealthStatus(component string) (bool, bool) { + hm.mu.RLock() + defer hm.mu.RUnlock() + status, ok := hm.lastHealthStatus[component] + return status, ok +} diff --git a/relayer/monitor/health_metrics_test.go b/relayer/monitor/health_metrics_test.go new file mode 100644 index 000000000..f5f1da73c --- /dev/null +++ b/relayer/monitor/health_metrics_test.go @@ -0,0 +1,270 @@ +package monitor + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + aptosTypes "github.com/smartcontractkit/chainlink-aptos/relayer/types" +) + +func TestNewHealthMetrics(t *testing.T) { + t.Parallel() + + chainInfo := aptosTypes.ChainInfo{ + ChainFamilyName: "sui", + ChainID: "test-chain-id", + NetworkName: "testnet", + NetworkNameFull: "Sui Testnet", + } + + hm, err := NewHealthMetrics(chainInfo) + require.NoError(t, err) + require.NotNil(t, hm) + + // Verify all component metrics are initialized + components := []string{ + ComponentSuiRelayer, + ComponentTxM, + ComponentEventsIndexer, + ComponentTransactionsIndexer, + ComponentChainReader, + ComponentChainWriter, + } + + for _, component := range components { + // Check that initial health status is set to false (unhealthy) + status, exists := hm.GetHealthStatus(component) + assert.True(t, exists, "Component %s should have initial status", component) + assert.False(t, status, "Initial status for %s should be unhealthy", component) + + // Check that initial last success time is set + lastSuccess, exists := hm.GetLastSuccessTime(component) + assert.True(t, exists, "Component %s should have initial last success time", component) + assert.False(t, lastSuccess.IsZero(), "Last success time for %s should not be zero", component) + } +} + +func TestRecordHealth(t *testing.T) { + t.Parallel() + + chainInfo := aptosTypes.ChainInfo{ + ChainFamilyName: "sui", + ChainID: "test-chain-id", + NetworkName: "testnet", + NetworkNameFull: "Sui Testnet", + } + + hm, err := NewHealthMetrics(chainInfo) + require.NoError(t, err) + + ctx := context.Background() + component := ComponentTxM + + // Initial state should be unhealthy + status, exists := hm.GetHealthStatus(component) + assert.True(t, exists) + assert.False(t, status) + + // Record healthy status + hm.RecordHealth(ctx, component, true) + status, exists = hm.GetHealthStatus(component) + assert.True(t, exists) + assert.True(t, status) + + // Record unhealthy status (flip-flop) + hm.RecordHealth(ctx, component, false) + status, exists = hm.GetHealthStatus(component) + assert.True(t, exists) + assert.False(t, status) + + // Record healthy again (another flip-flop) + hm.RecordHealth(ctx, component, true) + status, exists = hm.GetHealthStatus(component) + assert.True(t, exists) + assert.True(t, status) +} + +func TestRecordLastSuccess(t *testing.T) { + t.Parallel() + + chainInfo := aptosTypes.ChainInfo{ + ChainFamilyName: "sui", + ChainID: "test-chain-id", + NetworkName: "testnet", + NetworkNameFull: "Sui Testnet", + } + + hm, err := NewHealthMetrics(chainInfo) + require.NoError(t, err) + + ctx := context.Background() + component := ComponentEventsIndexer + + // Get initial last success time + initialTime, exists := hm.GetLastSuccessTime(component) + require.True(t, exists) + + // Wait a bit to ensure time difference + time.Sleep(10 * time.Millisecond) + + // Record a new success + hm.RecordLastSuccess(ctx, component) + + // Verify last success time was updated + newTime, exists := hm.GetLastSuccessTime(component) + require.True(t, exists) + assert.True(t, newTime.After(initialTime), "Last success time should be updated") +} + +func TestRecordHealthFromReport(t *testing.T) { + t.Parallel() + + chainInfo := aptosTypes.ChainInfo{ + ChainFamilyName: "sui", + ChainID: "test-chain-id", + NetworkName: "testnet", + NetworkNameFull: "Sui Testnet", + } + + hm, err := NewHealthMetrics(chainInfo) + require.NoError(t, err) + + ctx := context.Background() + + // Simulate a health report with mixed statuses + report := map[string]error{ + ComponentSuiRelayer: nil, // healthy + ComponentTxM: nil, // healthy + ComponentEventsIndexer: assert.AnError, // unhealthy + ComponentChainReader: nil, // healthy + } + + hm.RecordHealthFromReport(ctx, report) + + // Verify statuses were recorded correctly + status, exists := hm.GetHealthStatus(ComponentSuiRelayer) + assert.True(t, exists) + assert.True(t, status) + + status, exists = hm.GetHealthStatus(ComponentTxM) + assert.True(t, exists) + assert.True(t, status) + + status, exists = hm.GetHealthStatus(ComponentEventsIndexer) + assert.True(t, exists) + assert.False(t, status) + + status, exists = hm.GetHealthStatus(ComponentChainReader) + assert.True(t, exists) + assert.True(t, status) +} + +func TestUpdateProcessingLag(t *testing.T) { + t.Parallel() + + chainInfo := aptosTypes.ChainInfo{ + ChainFamilyName: "sui", + ChainID: "test-chain-id", + NetworkName: "testnet", + NetworkNameFull: "Sui Testnet", + } + + hm, err := NewHealthMetrics(chainInfo) + require.NoError(t, err) + + ctx := context.Background() + + // Record a success for a component + hm.RecordLastSuccess(ctx, ComponentTxM) + + // Wait a bit + time.Sleep(50 * time.Millisecond) + + // Update processing lag - this should not panic and should complete + hm.UpdateProcessingLag(ctx) + + // Verify the component still has a valid last success time + lastSuccess, exists := hm.GetLastSuccessTime(ComponentTxM) + assert.True(t, exists) + assert.False(t, lastSuccess.IsZero()) +} + +func TestConcurrentAccess(t *testing.T) { + t.Parallel() + + chainInfo := aptosTypes.ChainInfo{ + ChainFamilyName: "sui", + ChainID: "test-chain-id", + NetworkName: "testnet", + NetworkNameFull: "Sui Testnet", + } + + hm, err := NewHealthMetrics(chainInfo) + require.NoError(t, err) + + ctx := context.Background() + done := make(chan struct{}) + + // Start multiple goroutines that access health metrics concurrently + for i := 0; i < 10; i++ { + go func() { + for j := 0; j < 100; j++ { + hm.RecordHealth(ctx, ComponentTxM, j%2 == 0) + hm.RecordLastSuccess(ctx, ComponentEventsIndexer) + hm.UpdateProcessingLag(ctx) + _, _ = hm.GetHealthStatus(ComponentTxM) + _, _ = hm.GetLastSuccessTime(ComponentEventsIndexer) + } + done <- struct{}{} + }() + } + + // Wait for all goroutines to complete + for i := 0; i < 10; i++ { + <-done + } + + // If we got here without panics or deadlocks, the test passes +} + +func TestFlipFlopDetection(t *testing.T) { + t.Parallel() + + chainInfo := aptosTypes.ChainInfo{ + ChainFamilyName: "sui", + ChainID: "test-chain-id", + NetworkName: "testnet", + NetworkNameFull: "Sui Testnet", + } + + hm, err := NewHealthMetrics(chainInfo) + require.NoError(t, err) + + ctx := context.Background() + component := ComponentSuiRelayer + + // Initial state is unhealthy (false) + // Recording unhealthy again should NOT be a flip-flop + hm.RecordHealth(ctx, component, false) + status, _ := hm.GetHealthStatus(component) + assert.False(t, status) + + // Recording healthy IS a flip-flop + hm.RecordHealth(ctx, component, true) + status, _ = hm.GetHealthStatus(component) + assert.True(t, status) + + // Recording healthy again should NOT be a flip-flop + hm.RecordHealth(ctx, component, true) + status, _ = hm.GetHealthStatus(component) + assert.True(t, status) + + // Recording unhealthy IS a flip-flop + hm.RecordHealth(ctx, component, false) + status, _ = hm.GetHealthStatus(component) + assert.False(t, status) +} diff --git a/relayer/monitor/metrics.go b/relayer/monitor/metrics.go index 5d10c9f40..4a4172f6c 100644 --- a/relayer/monitor/metrics.go +++ b/relayer/monitor/metrics.go @@ -4,7 +4,7 @@ import ( "context" "fmt" - "github.com/smartcontractkit/chainlink-sui/relayer/config" + aptosTypes "github.com/smartcontractkit/chainlink-aptos/relayer/types" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" @@ -31,12 +31,12 @@ func NewGaugeAccBalance(unitStr string) (*GaugeAccBalance, error) { return &GaugeAccBalance{gauge}, nil } -func (g *GaugeAccBalance) Record(ctx context.Context, balance float64, account string, chainInfo config.ChainInfo) { +func (g *GaugeAccBalance) Record(ctx context.Context, balance float64, account string, chainInfo aptosTypes.ChainInfo) { oAttrs := metric.WithAttributeSet(g.GetAttributes(account, chainInfo)) g.gauge.Record(ctx, balance, oAttrs) } -func (g *GaugeAccBalance) GetAttributes(account string, chainInfo config.ChainInfo) attribute.Set { +func (g *GaugeAccBalance) GetAttributes(account string, chainInfo aptosTypes.ChainInfo) attribute.Set { return attribute.NewSet( attribute.String("account", account), diff --git a/relayer/plugin/relayer.go b/relayer/plugin/relayer.go index 0b84ec5cc..db6a93d82 100644 --- a/relayer/plugin/relayer.go +++ b/relayer/plugin/relayer.go @@ -22,6 +22,8 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/types/core" aptosBalanceMonitor "github.com/smartcontractkit/chainlink-aptos/relayer/monitor" + aptosTypes "github.com/smartcontractkit/chainlink-aptos/relayer/types" + chainreaderConfig "github.com/smartcontractkit/chainlink-sui/relayer/chainreader/config" chainreader "github.com/smartcontractkit/chainlink-sui/relayer/chainreader/reader" "github.com/smartcontractkit/chainlink-sui/relayer/chainwriter" @@ -30,6 +32,9 @@ import ( "github.com/smartcontractkit/chainlink-sui/relayer/txm" ) +// HealthMetricsPollInterval defines how often health metrics are published +const HealthMetricsPollInterval = 15 * time.Second + type SuiRelayer struct { types.UnimplementedRelayer services.StateMachine @@ -46,6 +51,11 @@ type SuiRelayer struct { balanceMonitor services.Service indexer *indexer.Indexer + + // Health metrics for monitoring + healthMetrics *monitor.HealthMetrics + healthStopChan chan struct{} + healthDone chan struct{} } var _ types.Relayer = &SuiRelayer{} @@ -169,7 +179,7 @@ func NewRelayer(cfg *config.TOMLConfig, lggr logger.Logger, keystore core.Keysto return nil, fmt.Errorf("error in NewRelayer (monitor) - invalid balance poll period: %w", err) } balanceMonitorService, err := monitor.NewBalanceMonitor(monitor.BalanceMonitorOpts{ - ChainInfo: aptosBalanceMonitor.ChainInfo{ + ChainInfo: aptosTypes.ChainInfo{ ChainFamilyName: "sui", ChainID: *cfg.ChainID, NetworkName: *cfg.NetworkName, @@ -188,6 +198,22 @@ func NewRelayer(cfg *config.TOMLConfig, lggr logger.Logger, keystore core.Keysto return nil, fmt.Errorf("error in NewRelayer (monitor) - failed to create new balance monitor: %w", err) } + // Initialize health metrics for monitoring + chainInfo := aptosTypes.ChainInfo{ + ChainFamilyName: "sui", + ChainID: *cfg.ChainID, + NetworkName: *cfg.NetworkName, + NetworkNameFull: *cfg.NetworkNameFull, + } + healthMetrics, err := monitor.NewHealthMetrics(chainInfo) + if err != nil { + return nil, fmt.Errorf("error in NewRelayer - failed to create health metrics: %w", err) + } + + // Set health metrics on components that support it + txManager.SetHealthMetrics(healthMetrics) + indexerInstance.SetHealthMetrics(healthMetrics) + return &SuiRelayer{ chainId: id, chainIdNum: idNum, @@ -198,6 +224,9 @@ func NewRelayer(cfg *config.TOMLConfig, lggr logger.Logger, keystore core.Keysto balanceMonitor: balanceMonitorService, db: db, indexer: indexerInstance, + healthMetrics: healthMetrics, + healthStopChan: make(chan struct{}), + healthDone: make(chan struct{}), }, nil } @@ -218,7 +247,14 @@ func (r *SuiRelayer) Start(ctx context.Context) error { r.lggr.Debug("Starting Sui Relayer") var ms services.MultiStart - return ms.Start(ctx, r.txm, r.indexer, r.balanceMonitor) + if err := ms.Start(ctx, r.txm, r.indexer, r.balanceMonitor); err != nil { + return err + } + + // Start health metrics publishing goroutine + go r.healthMetricsLoop() + + return nil }) } @@ -226,6 +262,10 @@ func (r *SuiRelayer) Close() error { return r.StopOnce("SuiRelayer", func() error { r.lggr.Debug("Stopping Sui Relayer") + // Stop health metrics goroutine + close(r.healthStopChan) + <-r.healthDone + return services.CloseAll(r.txm, r.indexer, r.balanceMonitor) }) } @@ -242,10 +282,52 @@ func (r *SuiRelayer) Ready() error { func (r *SuiRelayer) HealthReport() map[string]error { report := map[string]error{r.Name(): r.Healthy()} services.CopyHealth(report, r.txm.HealthReport()) + services.CopyHealth(report, r.indexer.HealthReport()) return report } +// healthMetricsLoop periodically publishes health metrics for all components. +func (r *SuiRelayer) healthMetricsLoop() { + defer close(r.healthDone) + + ticker := time.NewTicker(HealthMetricsPollInterval) + defer ticker.Stop() + + r.lggr.Info("Starting health metrics publishing loop") + + for { + select { + case <-r.healthStopChan: + r.lggr.Info("Health metrics loop stopped") + return + case <-ticker.C: + r.publishHealthMetrics() + } + } +} + +// publishHealthMetrics collects and publishes health metrics for all components. +func (r *SuiRelayer) publishHealthMetrics() { + ctx := context.Background() + + // Collect health reports from all components + report := r.HealthReport() + + // Record health status for each component + r.healthMetrics.RecordHealthFromReport(ctx, report) + + // Update processing lag for all components + r.healthMetrics.UpdateProcessingLag(ctx) + + r.lggr.Debugw("Published health metrics", "report", report) +} + +// GetHealthMetrics returns the health metrics instance for external access. +func (r *SuiRelayer) GetHealthMetrics() *monitor.HealthMetrics { + return r.healthMetrics +} + // ChainService interface func (r *SuiRelayer) GetChainStatus(ctx context.Context) (types.ChainStatus, error) { toml, err := r.cfg.TOMLString() diff --git a/relayer/txm/confirmer.go b/relayer/txm/confirmer.go index ff5c327c4..b3701121a 100644 --- a/relayer/txm/confirmer.go +++ b/relayer/txm/confirmer.go @@ -83,7 +83,7 @@ func checkConfirmations(loopCtx context.Context, txm *SuiTxm) { switch resp.Status { case success: - if err := handleSuccess(txm, tx); err != nil { + if err := handleSuccess(loopCtx, txm, tx); err != nil { txm.lggr.Errorw("Error handling successful transaction", "transactionID", tx.TransactionID, "error", err) } case failure: @@ -96,13 +96,16 @@ func checkConfirmations(loopCtx context.Context, txm *SuiTxm) { } } -func handleSuccess(txm *SuiTxm, tx SuiTx) error { +func handleSuccess(ctx context.Context, txm *SuiTxm, tx SuiTx) error { if err := txm.transactionRepository.ChangeState(tx.TransactionID, StateFinalized); err != nil { txm.lggr.Errorw("Failed to update transaction state", "transactionID", tx.TransactionID, "error", err) return err } txm.lggr.Infow("Transaction finalized", "transactionID", tx.TransactionID) + // Record successful transaction in health metrics + txm.recordLastSuccess(ctx) + if err := txm.coinManager.ReleaseCoins(tx.TransactionID); err != nil { // This error is not critical, can be safely ignored as the coins will auto-release after the default TTL txm.lggr.Debugw("Failed to release coins", "transactionID", tx.TransactionID, "error", err) diff --git a/relayer/txm/txm.go b/relayer/txm/txm.go index 28b9caaf1..1535c80f5 100644 --- a/relayer/txm/txm.go +++ b/relayer/txm/txm.go @@ -14,6 +14,7 @@ import ( "github.com/block-vision/sui-go-sdk/transaction" "github.com/smartcontractkit/chainlink-sui/relayer/client" + "github.com/smartcontractkit/chainlink-sui/relayer/monitor" ) const numberGoroutines = 3 @@ -39,6 +40,9 @@ type SuiTxm struct { done sync.WaitGroup broadcastChannel chan string stopChannel chan struct{} + + // Health metrics for monitoring (optional) + healthMetrics *monitor.HealthMetrics } func NewSuiTxm( @@ -193,4 +197,17 @@ func (txm *SuiTxm) GetGasManager() GasManager { return txm.gasManager } +// SetHealthMetrics sets the health metrics instance for the transaction manager. +// This should be called after creating the TxM to enable health metrics reporting. +func (txm *SuiTxm) SetHealthMetrics(hm *monitor.HealthMetrics) { + txm.healthMetrics = hm +} + +// recordLastSuccess records a successful operation to the health metrics. +func (txm *SuiTxm) recordLastSuccess(ctx context.Context) { + if txm.healthMetrics != nil { + txm.healthMetrics.RecordLastSuccess(ctx, monitor.ComponentTxM) + } +} + var _ TxManager = (*SuiTxm)(nil)