From 366770f9f708be5fc03ad3a7fca03b21e871a134 Mon Sep 17 00:00:00 2001 From: Marios Isaakidis Date: Wed, 15 Apr 2026 14:52:38 +0300 Subject: [PATCH 1/2] feat(check): add ci-radius-decrease liveness check for puller manage() recovery --- config/local.yaml | 22 ++- pkg/check/radiusdecrease/check.go | 241 ++++++++++++++++++++++++++++++ pkg/config/check.go | 26 ++++ 3 files changed, 287 insertions(+), 2 deletions(-) create mode 100644 pkg/check/radiusdecrease/check.go diff --git a/config/local.yaml b/config/local.yaml index 738978be..79f2e1d8 100644 --- a/config/local.yaml +++ b/config/local.yaml @@ -106,7 +106,7 @@ clusters: node-groups: local: _inherit: "" - image: k3d-registry.localhost:5000/ethersphere/bee:latest + image: ethersphere/bee:latest image-pull-policy: Always ingress-annotations: nginx.ingress.kubernetes.io/affinity: "cookie" @@ -142,6 +142,10 @@ node-groups: p2p-wss-node-port: 31635 local-gc: _inherit: "local" + local-test: + _inherit: "local" + image: ethersphere/bee:master-scenario-b + image-pull-policy: IfNotPresent local-light: _inherit: "local" @@ -155,7 +159,7 @@ bee-configs: autotls-domain: "local.test" autotls-registration-endpoint: http://p2p-forge.local.svc.cluster.local:8080 block-time: 1 - blockchain-rpc-endpoint: "ws://geth-swap:8546" + blockchain-rpc-endpoint: "ws://10.43.138.185:8546" bootnode-mode: false bootnodes: "" cache-capacity: 20000 @@ -223,6 +227,9 @@ bee-configs: _inherit: "bee-local" bootnode: /dnsaddr/localhost full-node: false + bee-local-test: + _inherit: "bee-local" + cache-capacity: 200 bee-local-gc: _inherit: "bee-local" cache-capacity: 10 @@ -465,3 +472,14 @@ checks: forge-dns-address: "127.0.0.1:30053" # When running inside cluster, use p2p-forge.local.svc.cluster.local:53 forge-tls-host-address: "" # When running locally, use 127.0.0.1:31635 pebble-mgmt-url: "https://127.0.0.1:31500/roots/0" # When running inside cluster, use https://pebble.local.svc.cluster.local:15000/roots/0 + ci-radius-decrease: + options: + upload-size-mb: 4 + overflow-timeout: 5m + cascade-timeout: 2m + recovery-timeout: 20m + postage-label: radius-decrease-check + postage-amount: 1 + postage-depth: 17 + timeout: 28m + type: radius-decrease diff --git a/pkg/check/radiusdecrease/check.go b/pkg/check/radiusdecrease/check.go new file mode 100644 index 00000000..6bc94251 --- /dev/null +++ b/pkg/check/radiusdecrease/check.go @@ -0,0 +1,241 @@ +// Package radiusdecrease checks that puller workers recover promptly after a +// storage-radius decrease. +// +// When the reserve worker decreases the storage radius it calls manage(), +// which calls disconnectPeer() for every current peer. If disconnectPeer() +// blocks while holding syncPeersMtx, manage() freezes for as long as the +// sync goroutines take to finish — indefinitely if the peers are still alive. +// This check verifies that manage() completes and workers restart within +// RecoveryTimeout after the radius decrease. +// +// # Required test binary +// +// This check MUST run against a bee binary built with the three +// radius-decrease CI patches (see bee/.github/patches/radius_decrease_*.patch): +// +// - DefaultReserveCapacity = 200 (pkg/storer/storer.go) +// - ReserveWakeUpDuration = 10s (pkg/storer/storer.go) +// - threshold(capacity) = capacity (pkg/storer/reserve.go, 100 % not 50 %) +// +// Without these patches the reserve is 4.2 M chunks and a radius decrease +// cannot be triggered in CI time. +package radiusdecrease + +import ( + "context" + "fmt" + "time" + + "github.com/ethersphere/beekeeper/pkg/bee/api" + "github.com/ethersphere/beekeeper/pkg/beekeeper" + "github.com/ethersphere/beekeeper/pkg/logging" + "github.com/ethersphere/beekeeper/pkg/orchestration" + "github.com/ethersphere/beekeeper/pkg/random" +) + +// Options holds tunable parameters for the check. +type Options struct { + // UploadSizeMB is the total upload volume. With 3 nodes and capacity=200 + // chunks, ≥4 MB guarantees at least one node overflows via pushsync routing. + UploadSizeMB int + // OverflowTimeout is the maximum time to wait for StorageRadius to reach 1 + // (reserve overflow confirmed) on any node. + OverflowTimeout time.Duration + // CascadeTimeout is the maximum time to wait for StorageRadius to fall back + // to 0 (radius-decrease cascade triggered) after overflow. + CascadeTimeout time.Duration + // RecoveryTimeout is the maximum time to wait for PullsyncRate > 0 after + // the radius decrease. A timeout here indicates that manage() is blocked + // and workers cannot restart. + RecoveryTimeout time.Duration + PostageLabel string + PostageAmount int64 + PostageDepth uint64 + Seed int64 +} + +// NewDefaultOptions returns sensible defaults for CI. +func NewDefaultOptions() Options { + return Options{ + UploadSizeMB: 4, + OverflowTimeout: 5 * time.Minute, + CascadeTimeout: 2 * time.Minute, + RecoveryTimeout: 20 * time.Minute, + PostageLabel: "radius-decrease-check", + PostageAmount: 1, + PostageDepth: 17, + Seed: 0, + } +} + +// compile-time interface check +var _ beekeeper.Action = (*Check)(nil) + +// Check is the beekeeper action that tests puller recovery after radius decrease. +type Check struct { + logger logging.Logger +} + +// NewCheck returns a new Check instance. +func NewCheck(logger logging.Logger) beekeeper.Action { + return &Check{logger: logger} +} + +// Run executes the radius-decrease liveness check. +func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts any) error { + o, ok := opts.(Options) + if !ok { + return fmt.Errorf("invalid options type") + } + + rnd := random.PseudoGenerator(o.Seed) + + uploadNode, err := cluster.RandomNode(ctx, rnd) + if err != nil { + return fmt.Errorf("random node: %w", err) + } + c.logger.Infof("upload node: %s", uploadNode.Name()) + + // Flat list of all nodes for monitoring. + allNodes := flatNodes(cluster) + c.logger.Infof("monitoring %d nodes", len(allNodes)) + + // Buy stamp and wait for usable (built-in poll inside CreatePostageBatch). + batchID, err := uploadNode.Client().CreatePostageBatch(ctx, o.PostageAmount, o.PostageDepth, o.PostageLabel, false) + if err != nil { + return fmt.Errorf("create postage batch: %w", err) + } + c.logger.Infof("postage batch ready: %s", batchID) + + // Pre-condition: all nodes must be at StorageRadius 0. + for name, n := range allNodes { + s, err := n.Client().Status(ctx) + if err != nil { + return fmt.Errorf("pre-check status node %s: %w", name, err) + } + if s.StorageRadius != 0 { + return fmt.Errorf("pre-condition failed: node %s has StorageRadius %d, want 0", name, s.StorageRadius) + } + } + + // Seed the reserve with random bytes. We upload in 512 KB blocks so we + // don't allocate a single large buffer. Content is simple XOR so each + // block has a unique address. + c.logger.Infof("uploading %d MB to seed reserve …", o.UploadSizeMB) + totalBytes := o.UploadSizeMB * 1024 * 1024 + blockSize := 512 * 1024 + uploadOpts := api.UploadOptions{BatchID: batchID} + for uploaded := 0; uploaded < totalBytes; uploaded += blockSize { + size := blockSize + if uploaded+blockSize > totalBytes { + size = totalBytes - uploaded + } + data := make([]byte, size) + for i := range data { + data[i] = byte(uploaded>>8) ^ byte(i) + } + if _, err := uploadNode.Client().UploadBytes(ctx, data, uploadOpts); err != nil { + return fmt.Errorf("upload at offset %d: %w", uploaded, err) + } + } + c.logger.Info("upload complete") + + // Phase 1: wait for StorageRadius to reach 1 on any node (overflow). + c.logger.Info("waiting for reserve overflow (StorageRadius = 1) …") + overflowNodeName, err := waitForRadius(ctx, allNodes, 1, o.OverflowTimeout) + if err != nil { + return fmt.Errorf("overflow phase: %w", err) + } + overflowNode := allNodes[overflowNodeName] + c.logger.Infof("overflow on node %s", overflowNodeName) + + // Phase 2: wait for StorageRadius to fall back to 0 (radius decrease). + // With ReserveWakeUpDuration=10s and threshold=100%, this fires within ~20s + // once the pullsync rate drops to 0 after the initial sync burst. + c.logger.Info("waiting for radius decrease (StorageRadius = 0) …") + if _, err := waitForRadius(ctx, map[string]orchestration.Node{overflowNodeName: overflowNode}, 0, o.CascadeTimeout); err != nil { + return fmt.Errorf("cascade phase: %w", err) + } + c.logger.Infof("radius decreased on node %s — disconnectPeer() called for all live peers", overflowNodeName) + + // Phase 3: measure worker recovery. After the radius decrease, manage() + // should reconnect peers and restart sync workers. PullsyncRate > 0 + // confirms workers are running. A timeout here means manage() is stuck. + c.logger.Infof("waiting up to %s for PullsyncRate > 0 on node %s …", o.RecoveryTimeout, overflowNodeName) + if err := waitForRecovery(ctx, overflowNodeName, overflowNode, o.RecoveryTimeout, c.logger); err != nil { + return err + } + + c.logger.Info("puller workers recovered after radius decrease — liveness confirmed") + return nil +} + +// flatNodes returns a name→Node map combining every node group in the cluster. +func flatNodes(cluster orchestration.Cluster) map[string]orchestration.Node { + return cluster.Nodes() +} + +// waitForRadius polls every node in the supplied map every 2 s until any +// node's StorageRadius equals target. Returns the name of the first matching +// node, or an error if the timeout elapses. +func waitForRadius(ctx context.Context, nodes map[string]orchestration.Node, target uint8, timeout time.Duration) (string, error) { + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + for name, n := range nodes { + s, err := n.Client().Status(ctx) + if err != nil { + continue + } + if s.StorageRadius == target { + return name, nil + } + } + select { + case <-ctx.Done(): + return "", ctx.Err() + case <-time.After(2 * time.Second): + } + } + return "", fmt.Errorf("timeout after %s waiting for StorageRadius = %d", timeout, target) +} + +// waitForRecovery polls the named node every second until PullsyncRate > 0 +// or the timeout elapses. It logs progress every 10 s so the CI log shows +// the freeze duration rather than an apparent hang. +func waitForRecovery(ctx context.Context, name string, node orchestration.Node, timeout time.Duration, logger logging.Logger) error { + deadline := time.Now().Add(timeout) + start := time.Now() + lastLog := time.Now() + + for time.Now().Before(deadline) { + s, err := node.Client().Status(ctx) + if err == nil && s.PullsyncRate > 0 { + logger.Infof("node %s: PullsyncRate = %.4f after %s — workers recovered", + name, s.PullsyncRate, time.Since(start).Round(time.Millisecond)) + return nil + } + + if time.Since(lastLog) >= 10*time.Second { + rate := 0.0 + if err == nil { + rate = s.PullsyncRate + } + logger.Infof("node %s: waiting for recovery — PullsyncRate = %.4f, elapsed = %s", + name, rate, time.Since(start).Round(time.Second)) + lastLog = time.Now() + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(time.Second): + } + } + + return fmt.Errorf( + "node %s: workers did not recover within %s after radius decrease "+ + "(PullsyncRate stayed at 0) — manage() goroutine may be blocked in disconnectPeer(); "+ + "see pkg/puller/puller.go", + name, timeout, + ) +} diff --git a/pkg/config/check.go b/pkg/config/check.go index 06332fb1..6f2204e7 100644 --- a/pkg/config/check.go +++ b/pkg/config/check.go @@ -29,6 +29,7 @@ import ( "github.com/ethersphere/beekeeper/pkg/check/pss" "github.com/ethersphere/beekeeper/pkg/check/pullsync" "github.com/ethersphere/beekeeper/pkg/check/pushsync" + "github.com/ethersphere/beekeeper/pkg/check/radiusdecrease" "github.com/ethersphere/beekeeper/pkg/check/redundancy" "github.com/ethersphere/beekeeper/pkg/check/retrieval" "github.com/ethersphere/beekeeper/pkg/check/settlements" @@ -644,6 +645,31 @@ var Checks = map[string]CheckType{ return opts, nil }, }, + "radius-decrease": { + NewAction: radiusdecrease.NewCheck, + NewOptions: func(checkGlobalConfig CheckGlobalConfig, check Check) (any, error) { + checkOpts := new(struct { + UploadSizeMB *int `yaml:"upload-size-mb"` + OverflowTimeout *time.Duration `yaml:"overflow-timeout"` + CascadeTimeout *time.Duration `yaml:"cascade-timeout"` + RecoveryTimeout *time.Duration `yaml:"recovery-timeout"` + PostageLabel *string `yaml:"postage-label"` + PostageAmount *int64 `yaml:"postage-amount"` + PostageDepth *uint64 `yaml:"postage-depth"` + Seed *int64 `yaml:"seed"` + }) + if err := check.Options.Decode(checkOpts); err != nil { + return nil, fmt.Errorf("decoding check %s options: %w", check.Type, err) + } + opts := radiusdecrease.NewDefaultOptions() + + if err := applyCheckConfig(checkGlobalConfig, checkOpts, &opts); err != nil { + return nil, fmt.Errorf("applying options: %w", err) + } + + return opts, nil + }, + }, "withdraw": { NewAction: withdraw.NewCheck, NewOptions: func(checkGlobalConfig CheckGlobalConfig, check Check) (any, error) { From 42dd2549ea9e3e0f9740b628c920ac457b6a8a2e Mon Sep 17 00:00:00 2001 From: Marios Isaakidis Date: Wed, 15 Apr 2026 19:50:11 +0300 Subject: [PATCH 2/2] fix(k8s): guard ProcMount nil pointer and skip Traefik IngressRoute CRD Always passing a non-nil ProcMount pointer (even when the value is "") causes k3s to reject the security context, resulting in container termination shortly after start. Guard the field so it is only set when a non-empty ProcMount type is configured. Also bypass the traefik.containo.us IngressRoute code path; clusters that ship Traefik without the legacy CRDs would fail at object creation. --- pkg/k8s/containers/security.go | 11 +++++++---- pkg/orchestration/k8s/orchestrator.go | 5 ++++- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/pkg/k8s/containers/security.go b/pkg/k8s/containers/security.go index ed853510..8db1736a 100644 --- a/pkg/k8s/containers/security.go +++ b/pkg/k8s/containers/security.go @@ -18,14 +18,17 @@ type SecurityContext struct { // toK8S converts SecurityContext to Kubernetes client object func (sc *SecurityContext) toK8S() *v1.SecurityContext { + var procMount *v1.ProcMountType + if sc.ProcMount != "" { + p := v1.ProcMountType(sc.ProcMount) + procMount = &p + } + return &v1.SecurityContext{ AllowPrivilegeEscalation: &sc.AllowPrivilegeEscalation, Capabilities: sc.Capabilities.toK8S(), Privileged: &sc.Privileged, - ProcMount: func() *v1.ProcMountType { - p := v1.ProcMountType(sc.ProcMount) - return &p - }(), + ProcMount: procMount, ReadOnlyRootFilesystem: &sc.ReadOnlyRootFilesystem, RunAsGroup: &sc.RunAsGroup, RunAsNonRoot: &sc.RunAsNonRoot, diff --git a/pkg/orchestration/k8s/orchestrator.go b/pkg/orchestration/k8s/orchestrator.go index 3e11d9b0..f4ba2bbe 100644 --- a/pkg/orchestration/k8s/orchestrator.go +++ b/pkg/orchestration/k8s/orchestrator.go @@ -132,7 +132,10 @@ func (n *nodeOrchestrator) Create(ctx context.Context, o orchestration.CreateOpt } n.log.Infof("service %s is set in namespace %s", o.Name, o.Namespace) - if o.IngressClass == "traefik" { + // Use standard Kubernetes Ingress for all classes. Some clusters ship Traefik + // without the legacy traefik.containo.us CRDs, so the custom IngressRoute + // object path is not portable. + if false && o.IngressClass == "traefik" { // api service's ingressroute if _, err := n.k8s.IngressRoute.Set(ctx, o.Name, o.Namespace, ingressroute.Options{ Annotations: mergeMaps(o.Annotations, o.IngressAnnotations),