apps/evm/server/force_inclusion_test.go (9 additions, 0 deletions)
@@ -10,6 +10,7 @@ import (
    "time"

    "github.com/evstack/ev-node/pkg/config"
    blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc"
    da "github.com/evstack/ev-node/pkg/da/types"
    "github.com/evstack/ev-node/pkg/genesis"
    "github.com/rs/zerolog"
@@ -73,6 +74,14 @@ func (m *mockDA) HasForcedInclusionNamespace() bool {
    return true
}

func (m *mockDA) Subscribe(ctx context.Context, namespace []byte) (<-chan *blobrpc.SubscriptionResponse, error) {
    return nil, nil
}

func (m *mockDA) LocalHead(ctx context.Context) (uint64, error) {
    return 0, nil
}

func TestForceInclusionServer_handleSendRawTransaction_Success(t *testing.T) {
    testHeight := uint64(100)

block/internal/common/metrics.go (32 additions, 0 deletions)
@@ -69,6 +69,11 @@ type Metrics struct {
    // Forced inclusion metrics
    ForcedInclusionTxsInGracePeriod metrics.Gauge // Number of forced inclusion txs currently in grace period
    ForcedInclusionTxsMalicious metrics.Counter // Total number of forced inclusion txs marked as malicious

    // Sync mode metrics
    SyncMode metrics.Gauge // Current sync mode: 0=catchup, 1=follow
    SubscribeErrors metrics.Counter // Number of subscription failures
    ModeSwitches metrics.Counter // Number of catchup<->follow mode transitions
}

// PrometheusMetrics returns Metrics built using Prometheus client library
@@ -201,6 +206,28 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
        Help: "Total number of forced inclusion transactions marked as malicious (past grace boundary)",
    }, labels).With(labelsAndValues...)

    // Sync mode metrics
    m.SyncMode = prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
        Namespace: namespace,
        Subsystem: MetricsSubsystem,
        Name: "sync_mode",
        Help: "Current sync mode: 0=catchup (polling), 1=follow (subscription)",
    }, labels).With(labelsAndValues...)

    m.SubscribeErrors = prometheus.NewCounterFrom(stdprometheus.CounterOpts{
        Namespace: namespace,
        Subsystem: MetricsSubsystem,
        Name: "subscribe_errors_total",
        Help: "Total number of DA subscription failures",
    }, labels).With(labelsAndValues...)

    m.ModeSwitches = prometheus.NewCounterFrom(stdprometheus.CounterOpts{
        Namespace: namespace,
        Subsystem: MetricsSubsystem,
        Name: "mode_switches_total",
        Help: "Total number of sync mode transitions between catchup and follow",
    }, labels).With(labelsAndValues...)

    // DA Submitter metrics
    m.DASubmitterPendingBlobs = prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
        Namespace: namespace,
@@ -269,6 +296,11 @@ func NopMetrics() *Metrics {
        // Forced inclusion metrics
        ForcedInclusionTxsInGracePeriod: discard.NewGauge(),
        ForcedInclusionTxsMalicious: discard.NewCounter(),

        // Sync mode metrics
        SyncMode: discard.NewGauge(),
        SubscribeErrors: discard.NewCounter(),
        ModeSwitches: discard.NewCounter(),
    }

    // Initialize maps with no-op metrics
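As a reading aid, here is a minimal sketch of how these new metrics might be driven from a sync loop. The recordModeSwitch helper and the mode constants are illustrative assumptions; only the Metrics fields above come from this change.

// Illustrative only: the constants and helper below are not part of the diff.
const (
    syncModeCatchup = 0 // polling the DA layer height by height
    syncModeFollow  = 1 // consuming a live blob subscription
)

func recordModeSwitch(m *Metrics, follow bool) {
    if follow {
        m.SyncMode.Set(syncModeFollow)
    } else {
        m.SyncMode.Set(syncModeCatchup)
    }
    m.ModeSwitches.Add(1)
}

// A failed Subscribe call would be recorded as:
//    m.SubscribeErrors.Add(1)
//    recordModeSwitch(m, false) // fall back to catchup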
block/internal/common/subscription.go (24 additions, 0 deletions)
@@ -0,0 +1,24 @@
package common

import blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc"

// BlobsFromSubscription returns non-empty blob data from a subscription response.
func BlobsFromSubscription(resp *blobrpc.SubscriptionResponse) [][]byte {
    if resp == nil || len(resp.Blobs) == 0 {
        return nil
    }

    blobs := make([][]byte, 0, len(resp.Blobs))
    for _, blob := range resp.Blobs {
        if blob == nil {
            continue
        }
        data := blob.Data()
        if len(data) == 0 {
            continue
        }
        blobs = append(blobs, data)
    }

    return blobs
}
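For orientation, a hedged usage sketch of the helper above: draining a subscription channel and skipping responses that carry no usable data. The subCh variable and the handleBlobs callback are placeholders, not part of this change.

// Sketch of a consumer loop; subCh is assumed to come from a DA client's
// Subscribe call and handleBlobs is a placeholder for the syncer hand-off.
for resp := range subCh {
    blobs := BlobsFromSubscription(resp)
    if len(blobs) == 0 {
        continue // nil response, or only nil/empty blobs
    }
    handleBlobs(blobs)
}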
block/internal/da/async_block_retriever.go (63 additions, 0 deletions)
@@ -25,6 +25,7 @@ type AsyncBlockRetriever interface {
    Stop()
    GetCachedBlock(ctx context.Context, daHeight uint64) (*BlockData, error)
    UpdateCurrentHeight(height uint64)
    StoreBlock(ctx context.Context, height uint64, blobs [][]byte, timestamp time.Time)
}

// BlockData contains data retrieved from a single DA height
@@ -125,6 +126,68 @@ func (f *asyncBlockRetriever) UpdateCurrentHeight(height uint64) {
    }
}

// StoreBlock caches a block's blobs, favoring existing data to avoid churn.
func (f *asyncBlockRetriever) StoreBlock(ctx context.Context, height uint64, blobs [][]byte, timestamp time.Time) {
    if len(f.namespace) == 0 {
        return
    }
    if height < f.daStartHeight {
        return
    }
    if len(blobs) == 0 {
        return
    }

    filtered := make([][]byte, 0, len(blobs))
    for _, blob := range blobs {
        if len(blob) > 0 {
            filtered = append(filtered, blob)
        }
    }
    if len(filtered) == 0 {
        return
    }

    if timestamp.IsZero() {
        timestamp = time.Now().UTC()
    }

    key := newBlockDataKey(height)
    if existing, err := f.cache.Get(ctx, key); err == nil {
        var pbBlock pb.BlockData
        if err := proto.Unmarshal(existing, &pbBlock); err == nil && len(pbBlock.Blobs) > 0 {
            return
        }
    }

    pbBlock := &pb.BlockData{
        Height: height,
        Timestamp: timestamp.Unix(),
        Blobs: filtered,
    }
    data, err := proto.Marshal(pbBlock)
    if err != nil {
        f.logger.Error().
            Err(err).
            Uint64("height", height).
            Msg("failed to marshal block for caching")
        return
    }

    if err := f.cache.Put(ctx, key, data); err != nil {
        f.logger.Error().
            Err(err).
            Uint64("height", height).
            Msg("failed to cache block")
        return
    }

    f.logger.Debug().
        Uint64("height", height).
        Int("blob_count", len(filtered)).
        Msg("cached block from subscription")
}

func newBlockDataKey(height uint64) ds.Key {
    return ds.NewKey(fmt.Sprintf("/block/%d", height))
}
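To show how StoreBlock is likely meant to be fed, a hedged sketch of a follow-mode loop that pipes subscription responses into the retriever cache. The subscribe function parameter, the response's Height field, and the overall wiring are assumptions about the calling code rather than part of this diff.

// Illustrative wiring only. Assumes the subscription response exposes the DA
// height as resp.Height; StoreBlock and common.BlobsFromSubscription are the
// pieces added in this PR, everything else is a placeholder.
func followLoop(
    ctx context.Context,
    subscribe func(ctx context.Context, ns []byte) (<-chan *blobrpc.SubscriptionResponse, error),
    retriever AsyncBlockRetriever,
    namespace []byte,
) error {
    ch, err := subscribe(ctx, namespace)
    if err != nil {
        return err // caller would bump SubscribeErrors and fall back to catchup
    }
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case resp, ok := <-ch:
            if !ok {
                return nil // subscription closed: switch back to catchup mode
            }
            blobs := common.BlobsFromSubscription(resp)
            if len(blobs) == 0 {
                continue
            }
            retriever.StoreBlock(ctx, resp.Height, blobs, time.Now().UTC())
        }
    }
}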
block/internal/da/client.go (24 additions, 0 deletions)
@@ -442,3 +442,27 @@ func (c *client) Validate(ctx context.Context, ids []datypes.ID, proofs []datype

    return results, nil
}

// Subscribe subscribes to blobs in the specified namespace.
// Returns a channel that receives subscription responses as new blobs are included.
func (c *client) Subscribe(ctx context.Context, namespace []byte) (<-chan *blobrpc.SubscriptionResponse, error) {
    ns, err := share.NewNamespaceFromBytes(namespace)
    if err != nil {
        return nil, fmt.Errorf("invalid namespace: %w", err)
    }

    return c.blobAPI.Subscribe(ctx, ns)
}

// LocalHead returns the height of the locally synced DA head.
func (c *client) LocalHead(ctx context.Context) (uint64, error) {
    headCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout)
    defer cancel()

    header, err := c.headerAPI.LocalHead(headCtx)
    if err != nil {
        return 0, fmt.Errorf("failed to get local head: %w", err)
    }

    return header.Height, nil
}
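Finally, a small sketch of how LocalHead might feed the catchup-versus-follow decision: once the last processed DA height is within a small lag of the locally synced head, switching to the subscription becomes viable. The caughtUp helper and the lag threshold are assumptions, not part of this change.

// Illustrative only: caughtUp and maxLag are assumptions; LocalHead is the
// method added above.
func caughtUp(ctx context.Context, c *client, processedHeight uint64) (bool, error) {
    head, err := c.LocalHead(ctx)
    if err != nil {
        return false, err
    }
    const maxLag = 1 // heights behind the local head that still count as caught up
    return processedHeight+maxLag >= head, nil
}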