diff --git a/pkg/postage/batchservice/batchservice_test.go b/pkg/postage/batchservice/batchservice_test.go index 57a7e5aef82..48cf360d33d 100644 --- a/pkg/postage/batchservice/batchservice_test.go +++ b/pkg/postage/batchservice/batchservice_test.go @@ -7,17 +7,23 @@ package batchservice_test import ( "bytes" "context" + "encoding/hex" "errors" + "fmt" "hash" "math/big" + "math/rand" + "reflect" "testing" "github.com/ethereum/go-ethereum/common" "github.com/ethersphere/bee/v2/pkg/log" "github.com/ethersphere/bee/v2/pkg/postage" "github.com/ethersphere/bee/v2/pkg/postage/batchservice" + "github.com/ethersphere/bee/v2/pkg/postage/batchstore" "github.com/ethersphere/bee/v2/pkg/postage/batchstore/mock" postagetesting "github.com/ethersphere/bee/v2/pkg/postage/testing" + "github.com/ethersphere/bee/v2/pkg/statestore/leveldb" mocks "github.com/ethersphere/bee/v2/pkg/statestore/mock" "github.com/ethersphere/bee/v2/pkg/storage" "github.com/ethersphere/bee/v2/pkg/util/testutil" @@ -509,6 +515,332 @@ func TestBatchServiceUpdateBlockNumber(t *testing.T) { } } +// The eq* types and helpers below back the #5343 chain-state buffering +// regression tests (TestChainStateBufferingEquivalence and its randomized +// sibling). #5343 stopped calling storer.PutChainState (which runs cleanup + +// radius recompute) on every UpdatePrice/UpdateBlockNumber and instead buffers +// the chain state in pendingChainState, flushing once per page at +// TransactionEnd. The concern is whether deferring that flush, while the price +// fluctuates up and down within a window, can leave a different set of evicted +// batches or a different radius than the old per-event behaviour. +// +// It cannot: the cumulative outpayment (TotalAmount) is monotonically +// non-decreasing, so a single cleanup at the end of a window evicts the exact +// same superset, and computeRadius is a pure function of the final batch set +// and depths (Create/TopUp/UpdateDepth recompute it immediately, never +// deferred). The helpers prove this by replaying one identical event stream +// three ways against three independent real batchstores and asserting their +// final batch set, radius and chain state are identical: +// +// - per-event: no transaction, so every UpdateBlockNumber/UpdatePrice +// persists immediately (pre-#5343 behaviour). +// - single-tx: the whole stream buffered, a single flush at the end. +// - per-page: one flush per page at TransactionEnd (#5343 behaviour). + +type eqBatch struct { + id, owner []byte +} + +type eqKind int + +const ( + eqCreate eqKind = iota + eqTopUp + eqDepth + eqPrice +) + +type eqEvent struct { + block uint64 + kind eqKind + batch *eqBatch // create/topup/depth target + value int64 // create/topup/depth normalised balance + depth uint8 // create/depth depth + price int64 // price +} + +type eqPage struct { + events []eqEvent + end uint64 +} + +func applyEqEvent(t *testing.T, svc postage.EventUpdater, mode string, e eqEvent) { + t.Helper() + if err := svc.UpdateBlockNumber(e.block); err != nil { + t.Fatalf("%s: update block %d: %v", mode, e.block, err) + } + switch e.kind { + case eqCreate: + // A near-zero-value batch is rejected identically in all modes (the + // chain state at a Create is the same regardless of flush timing), so + // mirror the listener and treat it as a skip rather than a failure. + err := svc.Create(e.batch.id, e.batch.owner, big.NewInt(0), big.NewInt(e.value), e.depth, 16, false, testTxHash) + if err != nil && !errors.Is(err, batchservice.ErrZeroValueBatch) { + t.Fatalf("%s: create: %v", mode, err) + } + case eqTopUp: + if err := svc.TopUp(e.batch.id, big.NewInt(0), big.NewInt(e.value), testTxHash); err != nil { + t.Fatalf("%s: topup: %v", mode, err) + } + case eqDepth: + if err := svc.UpdateDepth(e.batch.id, e.depth, big.NewInt(e.value), testTxHash); err != nil { + t.Fatalf("%s: depth: %v", mode, err) + } + case eqPrice: + if err := svc.UpdatePrice(big.NewInt(e.price), testTxHash); err != nil { + t.Fatalf("%s: price: %v", mode, err) + } + } +} + +// replayEq drives the script through svc in one of three flushing modes. The +// per-event mode omits transactions entirely, reproducing the pre-#5343 +// per-event PutChainState behaviour, while single-tx and per-page wrap the +// stream the way the listener does for a single large page and for many pages. +func replayEq(t *testing.T, svc postage.EventUpdater, mode string, script []eqPage) { + t.Helper() + + pageEnd := func(p eqPage) { + if err := svc.UpdateBlockNumber(p.end); err != nil { + t.Fatalf("%s: page end block %d: %v", mode, p.end, err) + } + } + begin := func() { + if err := svc.TransactionStart(); err != nil { + t.Fatalf("%s: tx start: %v", mode, err) + } + } + end := func() { + if err := svc.TransactionEnd(); err != nil { + t.Fatalf("%s: tx end: %v", mode, err) + } + } + + switch mode { + case "per-event": + for _, p := range script { + for _, e := range p.events { + applyEqEvent(t, svc, mode, e) + } + pageEnd(p) + } + case "single-tx": + begin() + for _, p := range script { + for _, e := range p.events { + applyEqEvent(t, svc, mode, e) + } + pageEnd(p) + } + end() + case "per-page": + for _, p := range script { + begin() + for _, e := range p.events { + applyEqEvent(t, svc, mode, e) + } + pageEnd(p) + end() + } + } +} + +type eqSnapshot struct { + block uint64 + total string + price string + radius uint8 + batches map[string]bool +} + +func takeEqSnapshot(t *testing.T, store postage.Storer) eqSnapshot { + t.Helper() + cs := store.GetChainState() + s := eqSnapshot{ + block: cs.Block, + total: cs.TotalAmount.String(), + price: cs.CurrentPrice.String(), + radius: store.Radius(), + batches: map[string]bool{}, + } + if err := store.Iterate(func(b *postage.Batch) (bool, error) { + s.batches[hex.EncodeToString(b.ID)] = true + return false, nil + }); err != nil { + t.Fatal(err) + } + return s +} + +// assertBufferingEquivalence replays the script against three independent real +// batchstores (per-event reference, one buffered transaction, one transaction +// per page) and fails if any buffered mode's final state differs from the +// per-event reference. It returns the reference snapshot. +func assertBufferingEquivalence(t *testing.T, capacity int, script []eqPage) eqSnapshot { + t.Helper() + run := func(mode string) eqSnapshot { + svc, store := newRealStoreService(t, capacity) + replayEq(t, svc, mode, script) + return takeEqSnapshot(t, store) + } + ref := run("per-event") + for _, mode := range []string{"single-tx", "per-page"} { + if got := run(mode); !reflect.DeepEqual(ref, got) { + t.Fatalf("mode %q diverged from per-event reference:\n per-event: %+v\n %-9s: %+v", mode, ref, mode, got) + } + } + return ref +} + +func TestChainStateBufferingEquivalence(t *testing.T) { + t.Parallel() + + // Small capacity so total commitment exceeds it and radius is non-zero. + const capacity = 32 + + mk := func() *eqBatch { + return &eqBatch{id: testutil.RandBytes(t, 32), owner: testutil.RandBytes(t, 32)} + } + a, b, c := mk(), mk(), mk() + + // Models the postage contract event stream the listener replays, covering + // all four EventUpdater mutations: + // - A, B, C created at depth 8 in the first blocks, + // - C topped up (8000) and then diluted to depth 10, so it stays alive and + // the final radius is driven by its new depth, + // - per-block price rises 1 -> 10 then falls 10 -> 2. + // Final TotalAmount is 153, which evicts A (50) and B (150); C survives. + script := []eqPage{ + { + events: []eqEvent{ + {block: 1, kind: eqCreate, batch: a, value: 50, depth: 8}, + {block: 2, kind: eqCreate, batch: b, value: 150, depth: 8}, + {block: 3, kind: eqCreate, batch: c, value: 5000, depth: 8}, + {block: 13, kind: eqPrice, price: 10}, + }, + end: 13, + }, + { + events: []eqEvent{ + {block: 18, kind: eqTopUp, batch: c, value: 8000}, + {block: 23, kind: eqDepth, batch: c, value: 8000, depth: 10}, + {block: 23, kind: eqPrice, price: 2}, + }, + end: 23, + }, + { + events: []eqEvent{}, + end: 43, + }, + } + + ref := assertBufferingEquivalence(t, capacity, script) + + // Guard against future edits making the script vacuous: it must actually + // evict A and B, keep C, and move the radius off zero. + if len(ref.batches) != 1 || !ref.batches[hex.EncodeToString(c.id)] { + t.Fatalf("script no longer exercises eviction: surviving batches %v", ref.batches) + } + if ref.radius == 0 { + t.Fatal("script no longer exercises radius change: radius is 0") + } +} + +// TestChainStateBufferingEquivalenceRandomized fuzzes the equivalence property: +// for many randomly generated event streams (random prices up and down, random +// top-ups and depth changes, random page boundaries), the buffered modes must +// reach the same final state as the per-event reference. TopUp/UpdateDepth only +// ever target "permanent" batches whose value exceeds any reachable TotalAmount, +// so the targeted batch is guaranteed to exist in every mode regardless of how +// flush timing shifts eviction of the ephemeral batches. +func TestChainStateBufferingEquivalenceRandomized(t *testing.T) { + t.Parallel() + + const ( + iterations = 40 + capacity = 16 + ) + + const seed = int64(20260217) + t.Logf("randomized equivalence seed=%d (override by editing the test)", seed) + rng := rand.New(rand.NewSource(seed)) + + for i := 0; i < iterations; i++ { + script := randomEqScript(rng) + t.Run(fmt.Sprintf("iter_%02d", i), func(t *testing.T) { + assertBufferingEquivalence(t, capacity, script) + }) + } +} + +// randomEqScript builds a reproducible random event stream. All batches are +// created in the first page (so later top-ups/depth changes always reference an +// existing batch); subsequent pages carry random price moves, top-ups and depth +// changes split across random page boundaries. +func randomEqScript(rng *rand.Rand) []eqPage { + // permanentValue is far above any TotalAmount the script can reach + // (bounded by ~maxBlocks * maxPrice), so permanent batches never expire. + const permanentValue = int64(1) << 40 + + randBytes := func(n int) []byte { + b := make([]byte, n) + _, _ = rng.Read(b) + return b + } + randDepth := func() uint8 { return uint8(2 + rng.Intn(9)) } // 2..10 + + var block uint64 + nextBlock := func(maxGap int) uint64 { + block += uint64(1 + rng.Intn(maxGap)) + return block + } + + numPermanent := 1 + rng.Intn(2) // 1..2 + numEphemeral := 2 + rng.Intn(4) // 2..5 + + var ( + creates []eqEvent + permanents []*eqBatch + ) + for p := 0; p < numPermanent; p++ { + batch := &eqBatch{id: randBytes(32), owner: randBytes(32)} + permanents = append(permanents, batch) + creates = append(creates, eqEvent{block: nextBlock(3), kind: eqCreate, batch: batch, value: permanentValue, depth: randDepth()}) + } + for e := 0; e < numEphemeral; e++ { + batch := &eqBatch{id: randBytes(32), owner: randBytes(32)} + // Values that, given the price/block accrual below, leave a random mix + // of survivors and evictions. + value := int64(50 + rng.Intn(4951)) // 50..5000 + creates = append(creates, eqEvent{block: nextBlock(3), kind: eqCreate, batch: batch, value: value, depth: randDepth()}) + } + + script := []eqPage{{events: creates, end: block}} + + numPages := 1 + rng.Intn(6) + for p := 0; p < numPages; p++ { + var events []eqEvent + for n := rng.Intn(5); n > 0; n-- { // 0..4 events per page + blk := nextBlock(10) + switch rng.Intn(3) { + case 0: + events = append(events, eqEvent{block: blk, kind: eqPrice, price: int64(1 + rng.Intn(40))}) + case 1: + events = append(events, eqEvent{block: blk, kind: eqTopUp, batch: permanents[rng.Intn(len(permanents))], value: permanentValue}) + case 2: + events = append(events, eqEvent{block: blk, kind: eqDepth, batch: permanents[rng.Intn(len(permanents))], value: permanentValue, depth: randDepth()}) + } + } + end := block + if len(events) == 0 { + end = nextBlock(10) // ensure blocks advance so eviction can progress + } + script = append(script, eqPage{events: events, end: end}) + } + + return script +} + func TestTransactionOk(t *testing.T) { t.Parallel() @@ -654,6 +986,44 @@ func newTestStoreAndService(t *testing.T, opts ...mock.Option) (postage.EventUpd return newTestStoreAndServiceWithListener(t, nil, nil, opts...) } +// newRealStoreService wires a batchservice to a real batchstore backed by a +// leveldb state store, so that PutChainState exercises the actual cleanup and +// computeRadius logic. The in-memory mock state store cannot be used here: it +// iterates its backing map in random order, which breaks the sorted early-stop +// in batchstore cleanup and yields non-deterministic evictions. +func newRealStoreService(t *testing.T, capacity int) (postage.EventUpdater, postage.Storer) { + t.Helper() + + // In-memory leveldb: sorted iteration (unlike the map-backed mock state + // store) is required for batchstore cleanup's value-ordered early-stop. + stateStore, err := leveldb.NewInMemoryStateStore(log.Noop) + if err != nil { + t.Fatal(err) + } + testutil.CleanupCloser(t, stateStore) + + store, err := batchstore.New(stateStore, func([]byte) error { return nil }, capacity, log.Noop) + if err != nil { + t.Fatal(err) + } + + // Initialise chain state as the node bootstrap does. + if err := store.PutChainState(&postage.ChainState{ + Block: 0, + TotalAmount: big.NewInt(0), + CurrentPrice: big.NewInt(1), + }); err != nil { + t.Fatal(err) + } + + svc, err := batchservice.New(mocks.NewStateStore(), store, testLog, newMockListener(), nil, nil, nil, false) + if err != nil { + t.Fatal(err) + } + + return svc, store +} + func createBatch(t *testing.T, store postage.Storer, b *postage.Batch) { t.Helper()