From 5adb9b28e0f409a407cd3640996b6746ade74069 Mon Sep 17 00:00:00 2001 From: acud <12988138+acud@users.noreply.github.com> Date: Fri, 29 May 2026 08:15:34 -0600 Subject: [PATCH] chore: revert #5343 --- pkg/node/snapshot.go | 4 -- pkg/node/snapshot_test.go | 82 ------------------------ pkg/postage/batchservice/batchservice.go | 42 +++--------- pkg/postage/listener/listener.go | 14 +--- 4 files changed, 12 insertions(+), 130 deletions(-) diff --git a/pkg/node/snapshot.go b/pkg/node/snapshot.go index b235b31e11f..f0e3d2fdf23 100644 --- a/pkg/node/snapshot.go +++ b/pkg/node/snapshot.go @@ -49,10 +49,6 @@ func NewSnapshotLogFilterer(logger log.Logger, getter SnapshotGetter) *SnapshotL } } -func (f *SnapshotLogFilterer) GetBatchSnapshot() []byte { - return f.getter.GetBatchSnapshot() -} - // loadSnapshot is responsible for loading and processing the snapshot data. // It is intended to be called exactly once by initOnce.Do. func (f *SnapshotLogFilterer) loadSnapshot() error { diff --git a/pkg/node/snapshot_test.go b/pkg/node/snapshot_test.go index abdb7b9e4e0..d52382cc748 100644 --- a/pkg/node/snapshot_test.go +++ b/pkg/node/snapshot_test.go @@ -20,8 +20,6 @@ import ( "github.com/ethersphere/bee/v2/pkg/postage/listener" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - - archive "github.com/ethersphere/batch-archive" ) type mockSnapshotGetter struct { @@ -36,12 +34,6 @@ func (m mockSnapshotGetter) GetBatchSnapshot() []byte { return m.data } -type realSnapshotGetter struct{} - -func (r realSnapshotGetter) GetBatchSnapshot() []byte { - return archive.GetBatchSnapshot() -} - func makeSnapshotData(logs []types.Log) []byte { var buf bytes.Buffer gz := gzip.NewWriter(&buf) @@ -158,77 +150,3 @@ func TestNewSnapshotLogFilterer(t *testing.T) { assert.Equal(t, 0, res[3].Topics[0].Cmp(common.HexToHash("0xa4"))) }) } - -func TestSnapshotLogFilterer_RealSnapshot(t *testing.T) { - t.Parallel() - - getter := realSnapshotGetter{} - filterer := node.NewSnapshotLogFilterer(log.Noop, getter) - - t.Run("block number", func(t *testing.T) { - blockNumber, err := filterer.BlockNumber(context.Background()) - assert.NoError(t, err) - assert.Greater(t, blockNumber, uint64(0)) - }) - - t.Run("filter range", func(t *testing.T) { - // arbitrary range that should exist in the snapshot - from := big.NewInt(20000000) - to := big.NewInt(20001000) - res, err := filterer.FilterLogs(context.Background(), ethereum.FilterQuery{ - FromBlock: from, - ToBlock: to, - }) - require.NoError(t, err) - for _, l := range res { - assert.GreaterOrEqual(t, l.BlockNumber, from.Uint64()) - assert.LessOrEqual(t, l.BlockNumber, to.Uint64()) - } - }) - - t.Run("filter address mismatch", func(t *testing.T) { - // random address that should not match the postage stamp contract - addr := common.HexToAddress("0x1234567890123456789012345678901234567890") - res, err := filterer.FilterLogs(context.Background(), ethereum.FilterQuery{ - Addresses: []common.Address{addr}, - }) - require.NoError(t, err) - assert.Empty(t, res) - }) -} - -func BenchmarkNewSnapshotLogFilterer_Load(b *testing.B) { - getter := realSnapshotGetter{} - - for b.Loop() { - filterer := node.NewSnapshotLogFilterer(log.Noop, getter) - _, err := filterer.BlockNumber(context.Background()) - if err != nil { - b.Fatal(err) - } - } -} - -func BenchmarkSnapshotLogFilterer(b *testing.B) { - getter := realSnapshotGetter{} - filterer := node.NewSnapshotLogFilterer(log.Noop, getter) - // ensure loaded - if _, err := filterer.BlockNumber(context.Background()); err != nil { - b.Fatal(err) - } - - b.Run("FilterLogs", func(b *testing.B) { - b.ResetTimer() - for i := 0; i < b.N; i++ { - from := big.NewInt(20000000) - to := big.NewInt(20001000) - _, err := filterer.FilterLogs(context.Background(), ethereum.FilterQuery{ - FromBlock: from, - ToBlock: to, - }) - if err != nil { - b.Fatal(err) - } - } - }) -} diff --git a/pkg/postage/batchservice/batchservice.go b/pkg/postage/batchservice/batchservice.go index 1726b1c88dd..eb99d458421 100644 --- a/pkg/postage/batchservice/batchservice.go +++ b/pkg/postage/batchservice/batchservice.go @@ -40,8 +40,6 @@ type batchService struct { checksum hash.Hash // checksum hasher resync bool - - pendingChainState *postage.ChainState } type Interface interface { @@ -97,22 +95,15 @@ func New( } } - return &batchService{stateStore: stateStore, storer: storer, logger: logger.WithName(loggerName).Register(), listener: listener, owner: owner, batchListener: batchListener, checksum: sum, resync: resync}, nil -} - -func (svc *batchService) getChainState() *postage.ChainState { - if svc.pendingChainState != nil { - return svc.pendingChainState - } - return svc.storer.GetChainState() + return &batchService{stateStore, storer, logger.WithName(loggerName).Register(), listener, owner, batchListener, sum, resync}, nil } // Create will create a new batch with the given ID, owner value and depth and // stores it in the BatchedStore. func (svc *batchService) Create(id, owner []byte, totalAmout, normalisedBalance *big.Int, depth, bucketDepth uint8, immutable bool, txHash common.Hash) error { - // dont add batches which have value which equals total cumulative + // don't add batches which have value which equals total cumulative // payout or that are going to expire already within the next couple of blocks - val := big.NewInt(0).Add(svc.getChainState().TotalAmount, svc.getChainState().CurrentPrice) + val := big.NewInt(0).Add(svc.storer.GetChainState().TotalAmount, svc.storer.GetChainState().CurrentPrice) if normalisedBalance.Cmp(val) <= 0 { // don't do anything return fmt.Errorf("batch service: batch %x: %w", id, ErrZeroValueBatch) @@ -121,7 +112,7 @@ func (svc *batchService) Create(id, owner []byte, totalAmout, normalisedBalance ID: id, Owner: owner, Value: normalisedBalance, - Start: svc.getChainState().Block, + Start: svc.storer.GetChainState().Block, Depth: depth, BucketDepth: bucketDepth, Immutable: immutable, @@ -205,13 +196,10 @@ func (svc *batchService) UpdateDepth(id []byte, depth uint8, normalisedBalance * // UpdatePrice implements the EventUpdater interface. It sets the current // price from the chain in the service chain state. func (svc *batchService) UpdatePrice(price *big.Int, txHash common.Hash) error { - cs := svc.getChainState() + cs := svc.storer.GetChainState() cs.CurrentPrice = price - - if svc.pendingChainState == nil { - if err := svc.storer.PutChainState(cs); err != nil { - return fmt.Errorf("put chain state: %w", err) - } + if err := svc.storer.PutChainState(cs); err != nil { + return fmt.Errorf("put chain state: %w", err) } sum, err := svc.updateChecksum(txHash) @@ -224,7 +212,7 @@ func (svc *batchService) UpdatePrice(price *big.Int, txHash common.Hash) error { } func (svc *batchService) UpdateBlockNumber(blockNumber uint64) error { - cs := svc.getChainState() + cs := svc.storer.GetChainState() if blockNumber == cs.Block { return nil } @@ -235,11 +223,8 @@ func (svc *batchService) UpdateBlockNumber(blockNumber uint64) error { cs.TotalAmount.Add(cs.TotalAmount, diff.Mul(diff, cs.CurrentPrice)) cs.Block = blockNumber - - if svc.pendingChainState == nil { - if err := svc.storer.PutChainState(cs); err != nil { - return fmt.Errorf("put chain state: %w", err) - } + if err := svc.storer.PutChainState(cs); err != nil { + return fmt.Errorf("put chain state: %w", err) } svc.logger.Debug("block height updated", "new_block", blockNumber) @@ -247,17 +232,10 @@ func (svc *batchService) UpdateBlockNumber(blockNumber uint64) error { } func (svc *batchService) TransactionStart() error { - svc.pendingChainState = svc.storer.GetChainState() return svc.stateStore.Put(dirtyDBKey, true) } func (svc *batchService) TransactionEnd() error { - if svc.pendingChainState != nil { - if err := svc.storer.PutChainState(svc.pendingChainState); err != nil { - return fmt.Errorf("put chain state: %w", err) - } - svc.pendingChainState = nil - } return svc.stateStore.Delete(dirtyDBKey) } diff --git a/pkg/postage/listener/listener.go b/pkg/postage/listener/listener.go index 349b6a05a14..42949ca2ec4 100644 --- a/pkg/postage/listener/listener.go +++ b/pkg/postage/listener/listener.go @@ -29,7 +29,6 @@ const loggerName = "listener" const ( blockPage = 5000 // how many blocks to sync every time we page - blockPageSnapshot = 50000 // how many blocks to sync every time from snapshot tailSize = 4 // how many blocks to tail from the tip of the chain defaultBatchFactor = uint64(5) // minimal number of blocks to sync at once ) @@ -233,15 +232,6 @@ func (l *listener) Listen(ctx context.Context, from uint64, updater postage.Even l.logger.Debug("batch factor", "value", batchFactor) - // Type assertion to detect if backend is SnapshotLogFilterer - pageSize := uint64(blockPage) - if _, isSnapshot := l.ev.(interface{ GetBatchSnapshot() []byte }); isSnapshot { - pageSize = blockPageSnapshot - l.logger.Debug("using snapshot page size", "page_size", pageSize) - } else { - l.logger.Debug("using standard page size", "page_size", pageSize) - } - synced := make(chan error) closeOnce := new(sync.Once) paged := true @@ -322,9 +312,9 @@ func (l *listener) Listen(ctx context.Context, from uint64, updater postage.Even } // do some paging (sub-optimal) - if to-from >= pageSize { + if to-from >= blockPage { paged = true - to = from + pageSize - 1 + to = from + blockPage - 1 } else { closeOnce.Do(func() { synced <- nil }) }