Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ const (
minPaymentThreshold = 2 * refreshRate // minimal accepted payment threshold of full nodes
maxPaymentThreshold = 24 * refreshRate // maximal accepted payment threshold of full nodes
mainnetNetworkID = uint64(1) //
reserveWakeUpDuration = 15 * time.Minute // time to wait before waking up reserveWorker
reserveWakeUpDuration = 5 * time.Minute // time to wait before waking up reserveWorker
reserveMinEvictCount = 1_000
cacheMinEvictCount = 10_000
maxAllowedDoubling = 1
Expand Down
37 changes: 30 additions & 7 deletions pkg/storer/reserve.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,20 +167,43 @@ func (db *DB) reserveWorker(ctx context.Context, ready chan<- struct{}) {
case <-thresholdTicker.C:

radius := db.reserve.Radius()
if radius <= db.reserveOptions.minimumRadius {
continue
}
count, err := db.countWithinRadius(ctx)
if err != nil {
db.logger.Warning("reserve worker count within radius", "error", err)
continue
}

if count < threshold(db.reserve.Capacity()) && db.syncer.SyncRate() == 0 && radius > db.reserveOptions.minimumRadius {
radius--
if err := db.reserve.SetRadius(radius); err != nil {
db.logger.Error(err, "reserve set radius")
}
db.metrics.StorageRadius.Set(float64(radius))
db.logger.Info("reserve radius decrease", "radius", radius)
t := threshold(db.reserve.Capacity())
if count >= t {
continue
}

// Decrement the storage radius. The decrement is gated only on
// the reserve fill state (count < threshold) and the operator's
// minimum-radius floor. There is no sync-rate gate here, which
// mirrors the unreserve path that raises radius without
// consulting sync activity. A previous SyncRate() == 0 gate
// proved structurally unreachable on live networks: peer churn
// kept historical sync above zero, and the resetIntervals call
// in puller.onChange (triggered by this very radius change)
// retriggered historical sync, locking the gate closed (issues
// #5396, #5428). When count is well below threshold, jump by
// two steps to keep adjustments bounded but recover faster from
// large gaps; under uniform CAC bin distribution each step
// roughly doubles count-within-radius.
steps := uint8(1)
if count*4 <= t && radius-1 > db.reserveOptions.minimumRadius {
steps = 2
}
radius -= steps
if err := db.reserve.SetRadius(radius); err != nil {
db.logger.Error(err, "reserve set radius")
}
db.metrics.StorageRadius.Set(float64(radius))
db.logger.Info("reserve radius decrease", "radius", radius, "steps", steps, "count_within_radius", count, "threshold", t)
}
}
}
Expand Down
38 changes: 36 additions & 2 deletions pkg/storer/reserve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,20 +516,54 @@ func TestRadiusManager(t *testing.T) {
waitForRadius(t, storer.Reserve(), 0)
})

t.Run("radius doesn't change due to non-zero pull rate", func(t *testing.T) {
t.Run("radius decreases even with non-zero pull rate", func(t *testing.T) {
t.Parallel()
storer, err := diskStorer(t, dbTestOps(baseAddr, 10, nil, nil, time.Millisecond*500))()
if err != nil {
t.Fatal(err)
}
readyC := make(chan struct{})
// Reserve is empty, so countWithinRadius == 0 < threshold; the worker
// should decrement radius regardless of the syncer's reported rate.
// The old behavior (gated on SyncRate() == 0) made this scenario
// permanently stuck on live networks where peer churn keeps the rate
// above zero — see issues #5396 and #5428.
storer.StartReserveWorker(context.Background(), pullerMock.NewMockRateReporter(1), networkRadiusFunc(3), readyC)
select {
case <-readyC:
case <-t.Context().Done():
t.Fatal("start reserve worker timeout")
}
waitForRadius(t, storer.Reserve(), 3)
waitForRadius(t, storer.Reserve(), 0)
})

// Subtest: starts the reserve worker on an empty reserve with
// MinimumStorageRadius set to 2 and a network radius of 3, then checks that
// the storage radius settles at 2 and is still 2 after a further wait.
t.Run("radius stops at minimum_radius", func(t *testing.T) {
t.Parallel()
opts := dbTestOps(baseAddr, 10, nil, nil, time.Millisecond*500)
opts.MinimumStorageRadius = 2
storer, err := diskStorer(t, opts)()
if err != nil {
t.Fatal(err)
}
readyC := make(chan struct{})
// An empty reserve at network radius 3 would normally let the worker
// decrement all the way to 0. With MinimumStorageRadius=2 the floor
// must hold: the two-step branch has to downgrade to a single step
// (because radius-1 == minimumRadius), and once radius reaches the
// floor the top-level guard must block any further decrement.
storer.StartReserveWorker(context.Background(), pullerMock.NewMockRateReporter(0), networkRadiusFunc(3), readyC)
// Block until the worker signals readiness on readyC.
// NOTE(review): t.Context() is canceled just before the test's cleanup
// functions run, not after a deadline, so the second case is not a
// time-based timeout despite the failure message — confirm that is intended.
select {
case <-readyC:
case <-t.Context().Done():
t.Fatal("start reserve worker timeout")
}
waitForRadius(t, storer.Reserve(), 2)

// Hold across several wake-up cycles to confirm the floor sticks.
// The wait is three times 500ms; presumably the 500ms passed to dbTestOps
// above is the worker's wake-up interval — verify against dbTestOps.
time.Sleep(time.Millisecond * 500 * 3)
if got := storer.Reserve().Radius(); got != 2 {
t.Fatalf("radius dropped below minimum: want %d, got %d", 2, got)
}
})
}

Expand Down
Loading