From aec900a64ed57b8040116fec9edf94e0da546463 Mon Sep 17 00:00:00 2001
From: syntrust
Date: Mon, 26 Jan 2026 16:09:37 +0800
Subject: [PATCH 1/4] refactor

---
 cmd/es-node/config.go         |  7 ++++++-
 ethstorage/node/node.go       |  1 +
 ethstorage/scanner/config.go  | 10 +++++-----
 ethstorage/scanner/scanner.go | 24 ++++++++++++++++++------
 ethstorage/scanner/utils.go   |  2 +-
 ethstorage/scanner/worker.go  | 24 ++++++++++++------------
 6 files changed, 43 insertions(+), 25 deletions(-)

diff --git a/cmd/es-node/config.go b/cmd/es-node/config.go
index 3ca16e49..ce6f44de 100644
--- a/cmd/es-node/config.go
+++ b/cmd/es-node/config.go
@@ -85,6 +85,11 @@ func NewConfig(ctx *cli.Context, lg log.Logger) (*node.Config, error) {
 	// 	}
 	// 	l2SyncEndpoint := NewL2SyncEndpointConfig(ctx)
+
+	scannerConfig, err := scanner.NewConfig(ctx)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create scanner config: %w", err)
+	}
 	cfg := &node.Config{
 		L1:      *l1Endpoint,
 		ChainID: chainId,
@@ -122,7 +127,7 @@ func NewConfig(ctx *cli.Context, lg log.Logger) (*node.Config, error) {
 		Storage:  *storageConfig,
 		Mining:   minerConfig,
 		Archiver: archiverConfig,
-		Scanner:  scanner.NewConfig(ctx),
+		Scanner:  scannerConfig,
 	}
 	if err := cfg.Check(); err != nil {
 		return nil, err
diff --git a/ethstorage/node/node.go b/ethstorage/node/node.go
index e7aa2dd3..5b7a8a76 100644
--- a/ethstorage/node/node.go
+++ b/ethstorage/node/node.go
@@ -341,6 +341,7 @@ func (n *EsNode) initScanner(ctx context.Context, cfg *Config) {
 		n.feed,
 		n.lg,
 	)
+	n.scanner.Start()
 }
 
 func (n *EsNode) Start(ctx context.Context, cfg *Config) error {
diff --git a/ethstorage/scanner/config.go b/ethstorage/scanner/config.go
index 14057afb..a27483e6 100644
--- a/ethstorage/scanner/config.go
+++ b/ethstorage/scanner/config.go
@@ -58,20 +58,20 @@ func CLIFlags() []cli.Flag {
 	return flags
 }
 
-func NewConfig(ctx *cli.Context) *Config {
+func NewConfig(ctx *cli.Context) (*Config, error) {
 	mode := ctx.GlobalInt(ModeFlagName)
 	if mode == modeDisabled {
-		return nil
+		return nil, nil
 	}
 	if mode != modeCheckMeta && mode != modeCheckBlob {
-		panic(fmt.Sprintf("invalid scanner mode: %d", mode))
+		return nil, fmt.Errorf("invalid scanner mode: %d", mode)
 	}
 	if interval := ctx.GlobalInt(IntervalFlagName); interval < defaultInterval {
-		panic(fmt.Sprintf("scanner interval must be at least %d minutes", defaultInterval))
+		return nil, fmt.Errorf("scanner interval must be at least %d minutes", defaultInterval)
 	}
 	return &Config{
 		Mode:      mode,
 		BatchSize: ctx.GlobalInt(BatchSizeFlagName),
 		Interval:  ctx.GlobalInt(IntervalFlagName),
-	}
+	}, nil
 }
diff --git a/ethstorage/scanner/scanner.go b/ethstorage/scanner/scanner.go
index de3b7142..bbf02125 100644
--- a/ethstorage/scanner/scanner.go
+++ b/ethstorage/scanner/scanner.go
@@ -37,7 +37,7 @@ func New(
 	lg log.Logger,
 ) *Scanner {
 	cctx, cancel := context.WithCancel(ctx)
-	scanner := &Scanner{
+	return &Scanner{
 		worker:   NewWorker(sm, fetchBlob, l1, cfg, lg),
 		feed:     feed,
 		interval: time.Minute * time.Duration(cfg.Interval),
@@ -46,9 +46,17 @@ func New(
 		lg:        lg,
 		scanStats: ScanStats{0, 0},
 	}
-	scanner.wg.Add(1)
-	go scanner.update()
-	return scanner
+}
+
+// Start begins the scanner's background processing. Must be called after New().
+func (s *Scanner) Start() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.running {
+		return
+	}
+	s.wg.Add(1)
+	go s.update()
 }
 
 func (s *Scanner) update() {
@@ -141,6 +149,9 @@ func (s *Scanner) logStats(sts *stats) {
 }
 
 func (s *Scanner) GetScanState() *ScanStats {
+	if s == nil {
+		return &ScanStats{}
+	}
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	snapshot := s.scanStats // Make a copy
@@ -161,11 +172,12 @@ func (s *Scanner) Close() {
 		return
 	}
 	s.running = false
+	cancel := s.cancel
 	s.mu.Unlock()
 
-	s.cancel()
-	s.lg.Info("Scanner closed")
+	cancel()
 	s.wg.Wait()
+	s.lg.Info("Scanner closed")
 }
 
 func (s *Scanner) doWork(tracker mismatchTracker) (*stats, scanErrors, error) {
diff --git a/ethstorage/scanner/utils.go b/ethstorage/scanner/utils.go
index ae6f6d40..44a1bd46 100644
--- a/ethstorage/scanner/utils.go
+++ b/ethstorage/scanner/utils.go
@@ -16,7 +16,7 @@ func (s scanErrors) add(kvIndex uint64, err error) {
 	s[kvIndex] = err
 }
 
-func (s scanErrors) nil(kvIndex uint64) {
+func (s scanErrors) clearError(kvIndex uint64) {
 	s[kvIndex] = nil
 }
 
diff --git a/ethstorage/scanner/worker.go b/ethstorage/scanner/worker.go
index d754f491..c9478a34 100644
--- a/ethstorage/scanner/worker.go
+++ b/ethstorage/scanner/worker.go
@@ -53,7 +53,7 @@ func NewWorker(
 func (s *Worker) ScanBatch(ctx context.Context, mismatched mismatchTracker) (*stats, scanErrors, error) {
 	// Never return nil stats and nil scanErrors
 	sts := newStats()
-	scanErrors := make(scanErrors)
+	errs := make(scanErrors)
 
 	// Query local storage info
 	shards := s.sm.Shards()
@@ -61,7 +61,7 @@ func (s *Worker) ScanBatch(ctx context.Context, mismatched mismatchTracker) (*st
 	entryCount := s.sm.KvEntryCount()
 	if entryCount == 0 {
 		s.lg.Info("Scanner: no KV entries found in local storage")
-		return sts, scanErrors, nil
+		return sts, errs, nil
 	}
 	lastKvIdx := entryCount - 1
 	s.lg.Info("Scanner: local storage info", "lastKvIdx", lastKvIdx, "shards", shards, "kvEntriesPerShard", kvEntries)
@@ -76,7 +76,7 @@ func (s *Worker) ScanBatch(ctx context.Context, mismatched mismatchTracker) (*st
 	metas, err := s.l1.GetKvMetas(kvsInBatch, rpc.FinalizedBlockNumber.Int64())
 	if err != nil {
 		s.lg.Error("Scanner: failed to query KV metas", "error", err)
-		return sts, scanErrors, fmt.Errorf("failed to query KV metas: %w", err)
+		return sts, errs, fmt.Errorf("failed to query KV metas: %w", err)
 	}
 	s.lg.Debug("Scanner: query KV meta done", "kvsInBatch", shortPrt(kvsInBatch))
 
@@ -84,7 +84,7 @@ func (s *Worker) ScanBatch(ctx context.Context, mismatched mismatchTracker) (*st
 	select {
 	case <-ctx.Done():
 		s.lg.Warn("Scanner canceled, stopping scan", "ctx.Err", ctx.Err())
-		return sts, scanErrors, ctx.Err()
+		return sts, errs, ctx.Err()
 	default:
 	}
 
@@ -98,7 +98,7 @@ func (s *Worker) ScanBatch(ctx context.Context, mismatched mismatchTracker) (*st
 		metaLocal, found, err = s.sm.TryReadMeta(kvIndex)
 		if err != nil {
 			s.lg.Error("Scanner: failed to read meta", "kvIndex", kvIndex, "error", err)
-			scanErrors.add(kvIndex, fmt.Errorf("failed to read meta: %w", err))
+			errs.add(kvIndex, fmt.Errorf("failed to read meta: %w", err))
 			continue
 		}
 		err = es.CompareCommits(commit.Bytes(), metaLocal)
@@ -107,7 +107,7 @@ func (s *Worker) ScanBatch(ctx context.Context, mismatched mismatchTracker) (*st
 			_, found, err = s.sm.TryRead(kvIndex, int(s.sm.MaxKvSize()), commit)
 		} else {
 			s.lg.Error("Scanner: invalid scanner mode", "mode", s.cfg.Mode)
-			return sts, scanErrors, fmt.Errorf("invalid scanner mode: %d", s.cfg.Mode)
+			return sts, errs, fmt.Errorf("invalid scanner mode: %d", s.cfg.Mode)
 		}
 
 		if found && err == nil {
@@ -118,7 +118,7 @@ func (s *Worker) ScanBatch(ctx context.Context, mismatched mismatchTracker) (*st
 			case failed:
 				mismatched.markRecovered(kvIndex)
 				// Clear the error state
-				scanErrors.nil(kvIndex)
+				errs.clearError(kvIndex)
 				s.lg.Info("Scanner: previously failed KV recovered", "kvIndex", kvIndex)
 			case pending:
 				delete(mismatched, kvIndex)
@@ -133,7 +133,7 @@ func (s *Worker) ScanBatch(ctx context.Context, mismatched mismatchTracker) (*st
 
 		if !found {
 			// The shard is not stored locally
-			scanErrors.add(kvIndex, fmt.Errorf("shard not found locally: commit=%x", commit))
+			errs.add(kvIndex, fmt.Errorf("shard not found locally: commit=%x", commit))
 			s.lg.Error("Scanner: blob not found locally", "kvIndex", kvIndex, "commit", commit)
 			continue
 		}
@@ -149,11 +149,11 @@ func (s *Worker) ScanBatch(ctx context.Context, mismatched mismatchTracker) (*st
 			if fixErr := s.fixKv(kvIndex, commit); fixErr != nil {
 				mismatched.markFailed(kvIndex)
 				s.lg.Error("Scanner: failed to fix blob", "kvIndex", kvIndex, "error", fixErr)
-				scanErrors.add(kvIndex, fmt.Errorf("failed to fix blob: %w", fixErr))
+				errs.add(kvIndex, fmt.Errorf("failed to fix blob: %w", fixErr))
 			} else {
 				s.lg.Info("Scanner: blob fixed successfully", "kvIndex", kvIndex)
 				mismatched.markFixed(kvIndex)
-				scanErrors.nil(kvIndex)
+				errs.clearError(kvIndex)
 			}
 
 		} else {
@@ -163,7 +163,7 @@ func (s *Worker) ScanBatch(ctx context.Context, mismatched mismatchTracker) (*st
 			}
 		} else {
 			s.lg.Error("Scanner: unexpected error occurred", "kvIndex", kvIndex, "error", err)
-			scanErrors.add(kvIndex, fmt.Errorf("unexpected error: %w", err))
+			errs.add(kvIndex, fmt.Errorf("unexpected error: %w", err))
 		}
 	}
 }
@@ -175,7 +175,7 @@ func (s *Worker) ScanBatch(ctx context.Context, mismatched mismatchTracker) (*st
 
 	sts.mismatched = mismatched
 
-	return sts, scanErrors, nil
+	return sts, errs, nil
 }
 
 func (s *Worker) fixKv(kvIndex uint64, commit common.Hash) error {

From 5ae205acf19efe774ecc16102ac4a28254430a9b Mon Sep 17 00:00:00 2001
From: syntrust
Date: Tue, 27 Jan 2026 11:55:53 +0800
Subject: [PATCH 2/4] use existing blob method

---
 cmd/es-node/utils.go | 11 +++--------
 1 file changed, 3 insertions(+), 8 deletions(-)

diff --git a/cmd/es-node/utils.go b/cmd/es-node/utils.go
index 443fcd88..e89b96a2 100644
--- a/cmd/es-node/utils.go
+++ b/cmd/es-node/utils.go
@@ -6,7 +6,6 @@ package main
 import (
 	"bytes"
 	"context"
-	"crypto/sha256"
 	"fmt"
 	"math/big"
 	"net/http"
@@ -19,11 +18,11 @@ import (
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/common/hexutil"
 	"github.com/ethereum/go-ethereum/crypto"
-	"github.com/ethereum/go-ethereum/crypto/kzg4844"
 	"github.com/ethereum/go-ethereum/ethclient"
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/rpc"
 	es "github.com/ethstorage/go-ethstorage/ethstorage"
+	"github.com/ethstorage/go-ethstorage/ethstorage/blobs"
 	"github.com/ethstorage/go-ethstorage/ethstorage/flags"
 	"github.com/ethstorage/go-ethstorage/ethstorage/storage"
 	"github.com/urfave/cli"
@@ -261,15 +260,11 @@ func downloadBlobFromRPC(endpoint string, kvIndex uint64, hash common.Hash) ([]b
 		return nil, err
 	}
 
-	var blob kzg4844.Blob
-	copy(blob[:], result)
-	commitment, err := kzg4844.BlobToCommitment(&blob)
+	blobhash, err := blobs.BlobToVersionedHash(result)
 	if err != nil {
 		return nil, fmt.Errorf("blobToCommitment failed: %w", err)
 	}
-	blobhash := common.Hash(kzg4844.CalcBlobHashV1(sha256.New(), &commitment))
-	fmt.Printf("blobhash from blob: %x\n", blobhash)
-	if bytes.Compare(blobhash[:es.HashSizeInContract], hash[:es.HashSizeInContract]) != 0 {
+	if !bytes.Equal(blobhash[:es.HashSizeInContract], hash[:es.HashSizeInContract]) {
 		return nil, fmt.Errorf("invalid blobhash for %d want: %x, got: %x", kvIndex, hash, blobhash)
 	}
 

From 782b060fb849dc42aab556a62e93a9008b0daf62 Mon Sep 17 00:00:00 2001
From: syntrust
Date: Tue, 27 Jan 2026 15:24:15 +0800
Subject: [PATCH 3/4] refactors

---
 ethstorage/scanner/scanner.go |  7 +--
 ethstorage/scanner/utils.go   | 19 --------
 ethstorage/scanner/worker.go  | 82 +++++++++++++++++++++++++----------
 3 files changed, 64 insertions(+), 44 deletions(-)

diff --git a/ethstorage/scanner/scanner.go b/ethstorage/scanner/scanner.go
index bbf02125..147bc764 100644
--- a/ethstorage/scanner/scanner.go
+++ b/ethstorage/scanner/scanner.go
@@ -22,9 +22,10 @@ type Scanner struct {
 	cancel    context.CancelFunc
 	wg        sync.WaitGroup
 	running   bool
-	mu        sync.Mutex
+	mu        sync.Mutex // protects running
 	lg        log.Logger
 	scanStats ScanStats
+	statsMu   sync.Mutex // protects scanStats
 }
 
 func New(
@@ -152,8 +153,8 @@ func (s *Scanner) GetScanState() *ScanStats {
 	if s == nil {
 		return &ScanStats{}
 	}
-	s.mu.Lock()
-	defer s.mu.Unlock()
+	s.statsMu.Lock()
+	defer s.statsMu.Unlock()
 	snapshot := s.scanStats // Make a copy
 	return &snapshot        // Return a pointer to the copy
 }
diff --git a/ethstorage/scanner/utils.go b/ethstorage/scanner/utils.go
index 44a1bd46..dc63c8b8 100644
--- a/ethstorage/scanner/utils.go
+++ b/ethstorage/scanner/utils.go
@@ -162,25 +162,6 @@ func shortPrt(nums []uint64) string {
 	return strings.Join(res, ",")
 }
 
-func summaryLocalKvs(shards []uint64, kvEntries, lastKvIdx uint64) string {
-	var res []string
-	for _, shard := range shards {
-		if shard*kvEntries > lastKvIdx {
-			// skip empty shards
-			break
-		}
-		var lastEntry uint64
-		if shard == lastKvIdx/kvEntries {
-			lastEntry = lastKvIdx
-		} else {
-			lastEntry = (shard+1)*kvEntries - 1
-		}
-		shardView := fmt.Sprintf("shard%d%s", shard, formatRange(shard*kvEntries, lastEntry))
-		res = append(res, shardView)
-	}
-	return strings.Join(res, ",")
-}
-
 func formatRange(start, end uint64) string {
 	if start == end {
 		return fmt.Sprintf("[%d]", start)
diff --git a/ethstorage/scanner/worker.go b/ethstorage/scanner/worker.go
index c9478a34..0a646f76 100644
--- a/ethstorage/scanner/worker.go
+++ b/ethstorage/scanner/worker.go
@@ -7,6 +7,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"strings"
 
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/log"
@@ -67,10 +68,16 @@ func (s *Worker) ScanBatch(ctx context.Context, mismatched mismatchTracker) (*st
 	s.lg.Info("Scanner: local storage info", "lastKvIdx", lastKvIdx, "shards", shards, "kvEntriesPerShard", kvEntries)
 
 	// Determine the batch of KV indices to scan
-	kvsInBatch, totalEntries, batchEndExclusive := getKvsInBatch(shards, kvEntries, lastKvIdx, uint64(s.cfg.BatchSize), s.nextIndexOfKvIdx, s.lg)
+	kvsInBatch, batchEndExclusive := s.getKvsInBatch(uint64(s.cfg.BatchSize), s.nextIndexOfKvIdx)
 
-	sts.localKvs = summaryLocalKvs(shards, kvEntries, lastKvIdx)
-	sts.total = int(totalEntries)
+	total, localKvs := s.summaryLocalKvs()
+	sts.total = int(total)
+	sts.localKvs = localKvs
+
+	if len(kvsInBatch) == 0 {
+		s.lg.Info("Scanner: no KV entries to scan in this batch")
+		return sts, errs, nil
+	}
 
 	// Query the metas from the L1 contract
 	metas, err := s.l1.GetKvMetas(kvsInBatch, rpc.FinalizedBlockNumber.Int64())
@@ -185,28 +192,23 @@ func (s *Worker) fixKv(kvIndex uint64, commit common.Hash) error {
 	return nil
 }
 
-func getKvsInBatch(shards []uint64, kvEntries, lastKvIdx, batchSize, batchStartIndex uint64, lg log.Logger) ([]uint64, uint64, uint64) {
-	// Calculate the total number of KV entries stored locally
-	var totalEntries uint64
-	// Shard indices are sorted but may not be continuous: e.g. [0, 1, 3, 4] indicates shard 2 is missing
-	for _, shardIndex := range shards {
-		// The last shard may contain fewer than the full kvEntries
-		if shardIndex == lastKvIdx/kvEntries {
-			totalEntries += lastKvIdx%kvEntries + 1
-			break
-		}
-		// Complete shards
-		totalEntries += kvEntries
+func (s *Worker) getKvsInBatch(batchSize uint64, startIndexOfKvIdx uint64) ([]uint64, uint64) {
+	localKvCount, _ := s.summaryLocalKvs()
+	if localKvCount == 0 {
+		return []uint64{}, 0
 	}
-	lg.Debug("Scanner: KV entries stored locally", "totalKvStored", totalEntries)
+	shards := s.sm.Shards()
+	kvEntries := s.sm.KvEntries()
+	return getKvsInBatch(shards, kvEntries, localKvCount, batchSize, startIndexOfKvIdx, s.lg)
+}
 
+func getKvsInBatch(shards []uint64, kvEntries, localKvCount, batchSize, startKvIndex uint64, lg log.Logger) ([]uint64, uint64) {
 	// Determine batch start and end KV indices
-	startKvIndex := batchStartIndex
-	if startKvIndex >= totalEntries {
+	if startKvIndex >= localKvCount {
 		startKvIndex = 0
-		lg.Debug("Scanner: restarting scan from the beginning")
+		lg.Debug("Restarting scan from the beginning")
 	}
-	endKvIndexExclusive := min(startKvIndex+batchSize, totalEntries)
+	endKvIndexExclusive := min(startKvIndex+batchSize, localKvCount)
 
 	// The actual batch range is [startKvIndex, endKvIndexExclusive) or [startKvIndex, endIndex]
 	endIndex := endKvIndexExclusive - 1
@@ -235,6 +237,42 @@ func getKvsInBatch(shards []uint64, kvEntries, lastKvIdx, batchSize, batchStartI
 			kvsInBatch = append(kvsInBatch, shards[i]*kvEntries+k)
 		}
 	}
-	lg.Debug("Scanner: batch index range determined", "batchStart", startKvIndex, "batchEnd(exclusive)", endKvIndexExclusive, "kvsInBatch", shortPrt(kvsInBatch))
-	return kvsInBatch, totalEntries, endKvIndexExclusive
+	lg.Debug("Scan batch index range determined", "batchStart", startKvIndex, "batchEnd(exclusive)", endKvIndexExclusive, "kvsInBatch", shortPrt(kvsInBatch))
+	return kvsInBatch, endKvIndexExclusive
+}
+
+func (s *Worker) summaryLocalKvs() (uint64, string) {
+	kvEntryCountOnChain := s.sm.KvEntryCount()
+	if kvEntryCountOnChain == 0 {
+		s.lg.Info("No KV entries found in local storage")
+		return 0, "[]"
+	}
+	return summaryLocalKvs(s.sm.Shards(), s.sm.KvEntries(), kvEntryCountOnChain-1)
+}
+
+// Calculate the total number of KV entries stored locally
+func summaryLocalKvs(shards []uint64, kvEntries, lastKvIdx uint64) (uint64, string) {
+	var totalEntries uint64
+	var res []string
+	// Shard indices are sorted but may not be continuous: e.g. [0, 1, 3, 4] indicates shard 2 is missing
+	for _, shard := range shards {
+		shardOfLastKv := lastKvIdx / kvEntries
+		if shard > shardOfLastKv {
+			// Skip empty shards
+			break
+		}
+		var lastEntry uint64
+		// The last shard may contain fewer than the full kvEntries
+		if shard == shardOfLastKv {
+			totalEntries += lastKvIdx%kvEntries + 1
+			lastEntry = lastKvIdx
+		} else {
+			// Complete shards
+			totalEntries += kvEntries
+			lastEntry = (shard+1)*kvEntries - 1
+		}
+		shardView := fmt.Sprintf("shard%d%s", shard, formatRange(shard*kvEntries, lastEntry))
+		res = append(res, shardView)
+	}
+	return totalEntries, strings.Join(res, ",")
 }

From 948e8b373e0882488209e152721c4b8b4a7b0a98 Mon Sep 17 00:00:00 2001
From: syntrust
Date: Tue, 27 Jan 2026 16:16:27 +0800
Subject: [PATCH 4/4] tests

---
 ethstorage/scanner/worker_test.go | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/ethstorage/scanner/worker_test.go b/ethstorage/scanner/worker_test.go
index e8fe6c66..b6191e84 100644
--- a/ethstorage/scanner/worker_test.go
+++ b/ethstorage/scanner/worker_test.go
@@ -151,6 +151,17 @@ func TestGetKvsInBatch(t *testing.T) {
 			expectedTotal:    14,
 			expectedBatchEnd: 14,
 		},
+		{
+			name:             "Discontinuous shards missing current",
+			shards:           []uint64{0, 2},
+			kvEntries:        8,
+			lastKvIdx:        12,
+			batchSize:        100,
+			batchStartIndex:  0,
+			expectedKvs:      []uint64{0, 1, 2, 3, 4, 5, 6, 7},
+			expectedTotal:    8,
+			expectedBatchEnd: 8,
+		},
 		{
 			name:      "Boundary conditions 1 kv",
 			shards:    []uint64{0},
@@ -224,7 +235,8 @@ func TestGetKvsInBatch(t *testing.T) {
 		t.Run(tt.name, func(t *testing.T) {
 			lg := log.New()
 
-			kvs, total, batchEnd := getKvsInBatch(tt.shards, tt.kvEntries, tt.lastKvIdx, tt.batchSize, tt.batchStartIndex, lg)
+			total, _ := summaryLocalKvs(tt.shards, tt.kvEntries, tt.lastKvIdx)
+			kvs, batchEnd := getKvsInBatch(tt.shards, tt.kvEntries, total, tt.batchSize, tt.batchStartIndex, lg)
 
 			assert.Equal(t, tt.expectedKvs, kvs, "KV indices do not match")
 			assert.Equal(t, tt.expectedTotal, total, "Total entries do not match")