From 637d14bd05fa0a7a47244ebf19333de0a57e4ae8 Mon Sep 17 00:00:00 2001 From: Matee Ullah Malik Date: Sat, 20 Dec 2025 15:24:28 +0500 Subject: [PATCH] temp --- REACHABILITY_ACTIVE_PROBING_SPEC.md | 142 ++++ README.md | 20 +- docs/reachability_active_probing.md | 234 +++++++ go.mod | 21 +- go.sum | 76 ++- p2p/kademlia/conn_pool.go | 16 +- p2p/kademlia/network.go | 9 +- pkg/net/grpc/server/server.go | 25 + pkg/reachability/grpc.go | 25 + pkg/reachability/store.go | 118 ++++ pkg/reachability/store_test.go | 118 ++++ pkg/reachability/types.go | 12 + supernode/cmd/start.go | 4 + supernode/supernode_metrics/active_probing.go | 604 ++++++++++++++++++ supernode/supernode_metrics/constants.go | 45 ++ supernode/supernode_metrics/health_checks.go | 226 ------- .../supernode_metrics/metrics_collection.go | 74 ++- .../supernode_metrics/monitor_service.go | 62 +- .../reachability_active_probing_test.go | 473 ++++++++++++++ .../supernode_metrics/reachability_quorum.go | 105 +++ supernode/transport/gateway/server.go | 47 +- 21 files changed, 2123 insertions(+), 333 deletions(-) create mode 100644 REACHABILITY_ACTIVE_PROBING_SPEC.md create mode 100644 docs/reachability_active_probing.md create mode 100644 pkg/reachability/grpc.go create mode 100644 pkg/reachability/store.go create mode 100644 pkg/reachability/store_test.go create mode 100644 pkg/reachability/types.go create mode 100644 supernode/supernode_metrics/active_probing.go create mode 100644 supernode/supernode_metrics/constants.go delete mode 100644 supernode/supernode_metrics/health_checks.go create mode 100644 supernode/supernode_metrics/reachability_active_probing_test.go create mode 100644 supernode/supernode_metrics/reachability_quorum.go diff --git a/REACHABILITY_ACTIVE_PROBING_SPEC.md b/REACHABILITY_ACTIVE_PROBING_SPEC.md new file mode 100644 index 00000000..18df9502 --- /dev/null +++ b/REACHABILITY_ACTIVE_PROBING_SPEC.md @@ -0,0 +1,142 @@ +# Reachability + Active Probing Spec (Supernode) + +## Scope + +- 
Services/ports in scope: + - gRPC (default `4444`) + - P2P (default `4445`) + - Gateway HTTP status (default `8002`, endpoint `/api/v1/status`) +- Address family: **IPv4 only** (IPv6 is ignored everywhere in reachability and probing). +- No self-connect / hairpin fallback. +- Output is `open_ports` as a per-port state: `OPEN`, `CLOSED`, or `UNKNOWN`. + +## Goals + +- Make port reachability evidence-based (“reachable from outside” means we observed real inbound traffic). +- Add **active probing** so low-traffic but healthy nodes still get inbound traffic and can be marked `OPEN`. +- Make the probing layer robust enough that **silence can be treated as `CLOSED`** (with explicit quorum rules). +- Keep Lumera as “self-reported metrics”; do not introduce peer-submitted on-chain reachability reports. + +## Non-goals + +- IPv6 reachability. +- Inferring `OPEN` from local checks (self-dial) or from outbound connectivity. + +## Definitions + +- **Inbound evidence**: a timestamp recorded when the supernode receives a real inbound IPv4 connection/request on a service. +- **Evidence window**: how long inbound evidence is considered “fresh” for reporting. +- **Epoch**: a chain-derived time window used to schedule probing deterministically across nodes. +- **Eligible set**: the set of supernodes allowed to participate in probing for an epoch. + +## 1) Passive evidence (truth source) + +### Evidence store + +Maintain `last_inbound_time[service]` in-memory, keyed by `{grpc, p2p, gateway}`. + +Evidence acceptance rules: + +- A remote address must be present. +- Remote address must be **IPv4**. +- Ignore loopback addresses. +- If an authenticated remote identity is available and equals our own identity, ignore it (prevents “self” evidence). + +### Evidence capture points + +Evidence must be recorded only from real inbound traffic: + +- **gRPC**: record on every inbound RPC (unary + streaming) via a server interceptor. 
+- **Gateway**: record on successful (2xx) `/api/v1/status` responses. +- **P2P**: record only after the secure handshake succeeds. + +Rationale: each capture point aligns evidence with the “port is reachable from outside” claim. + +## 2) Active probing (traffic generator) + +Active probing exists solely to create reliable inbound evidence across the network. + +### Participation rules (who probes) + +- Probing runs on every **eligible ACTIVE** supernode (mandatory for coverage). +- Eligibility requires a routable IPv4 registration address (private/loopback only in integration tests). +- **POSTPONED** nodes are excluded from the probing budget by default (they may be offline and would dilute coverage for ACTIVE nodes). + +### Target rules (who gets probed) + +- Targets are **ACTIVE** supernodes only. +- Targets must have a routable IPv4 address from chain registration. +- Private/loopback targets are only allowed in integration tests. + +### Epoch and deterministic scheduling + +To make “silence means closed” defensible, probe scheduling must be deterministic and consistent: + +1. Compute an **epoch id** from chain height: + - `epoch_id = floor(current_height / epoch_blocks)` +2. Build the eligible set for that epoch: + - Query `ListSuperNodes()` at (or near) `epoch_id` start. + - Filter to ACTIVE with a routable IPv4 registration address. + - Sort by `supernode_account` (stable ordering) to form an indexable list `L` of size `N`. +3. Derive `K` distinct probe offsets from a hash of `(epoch_id, j)`: + - For `j in [0..K-1]`: `offset_j ∈ [1..N-1]` +4. For each node at index `i` in `L`, its outbound targets are: + - `target_index = (i + offset_j) mod N` + +Properties: + +- For a fixed `offset_j`, the mapping is a permutation: each node is targeted exactly once for that `j`. +- Using `K` distinct offsets yields **K distinct probers per target per epoch**. + +### Probes executed per target + +When probing a target, attempt all three services: + +1. 
**gRPC probe**: connect with ALTS + call gRPC health `Check` (requires `SERVING`). +2. **Gateway probe**: `GET http://:8002/api/v1/status` (requires 2xx). +3. **P2P probe**: raw TCP dial + Lumera secure handshake. + +Why P2P uses “raw TCP dial + handshake”: + +- P2P is not gRPC; it is a custom protocol on top of TCP. +- The secure handshake is required to make the connection meaningful and to match when the target records inbound P2P evidence (after handshake success). + +### Timeouts and load control + +- Each probe has a strict timeout (`ProbeTimeoutSeconds`). +- Probes should be time-distributed within the epoch to avoid synchronized spikes. +- Concurrency must be bounded (implementation detail) to avoid resource exhaustion. + +## 3) Port state derivation (OPEN / CLOSED / UNKNOWN) + +For each service port, compute: + +1. `OPEN` if inbound evidence is fresh within the evidence window. +2. `CLOSED` if inbound evidence is not fresh **and** the probing quorum rules below are satisfied. +3. `UNKNOWN` only if quorum cannot be established (bootstrap conditions), e.g.: + - not enough eligible peers, + - chain unreachable / no recent peer list, + - epoch alignment unavailable. + +## 4) Quorum rules (when silence implies CLOSED) + +To treat “no inbound evidence” as “closed”, we require proof that the network *should* have produced inbound attempts. + +For a node `X` in epoch `E`: + +- `AssignedProbers(E, X)` is the set of `K` distinct peers that should probe `X` under the deterministic schedule. +- A prober `P` is considered **alive in epoch E** if chain data shows it has reported metrics recently (e.g., `SuperNode.Metrics.Height` is within the epoch / freshness bounds). + +Rule: + +- If `alive_assigned_probers(E, X) >= quorum` and `X` has **no inbound evidence** for a service within the evidence window, then the service is `CLOSED`. + +This makes “silence means closed” an explicit, checkable condition rather than an assumption. 
+ +## Parameters + +- `EvidenceWindowSeconds` (default 600s) +- `ProbeTimeoutSeconds` (default 5s) +- `epoch_blocks` (chain-height derived epoch length) +- `K` (probe assignments per epoch, must be ≥ quorum) +- `quorum` (minimum alive assigned probers required to treat silence as `CLOSED`) diff --git a/README.md b/README.md index 4b4da332..2cc75a49 100644 --- a/README.md +++ b/README.md @@ -200,9 +200,25 @@ enum SupernodeEventType { See docs/gateway.md for the full gateway guide (endpoints, examples, Swagger links). -### HTTP Gateway +## Reachability & Active Probing -See docs/gateway.md for the full gateway guide (endpoints, examples, Swagger links). +The supernode reports external reachability as `open_ports` for the three well-known services: + +- gRPC (`4444`) +- P2P (`4445`) +- Gateway status (`8002`, `GET /api/v1/status`) + +Each port is reported as a tri-state value: + +- `OPEN`: we observed real inbound IPv4 traffic recently. +- `CLOSED`: we have no recent inbound evidence and the deterministic probing quorum rules indicate silence is meaningful. +- `UNKNOWN`: bootstrap / chain unavailable / not enough eligible peers to establish quorum. + +Active probing exists only to generate a small amount of inbound traffic so that quiet-but-healthy nodes can still be marked `OPEN`. +For full implementation details and the rationale/spec, see: + +- `docs/reachability_active_probing.md` +- `REACHABILITY_ACTIVE_PROBING_SPEC.md` ## CLI Commands diff --git a/docs/reachability_active_probing.md b/docs/reachability_active_probing.md new file mode 100644 index 00000000..e8b703c4 --- /dev/null +++ b/docs/reachability_active_probing.md @@ -0,0 +1,234 @@ +# Reachability + Active Probing (Supernode) + +This document describes how the supernode derives `open_ports` and how the +network-wide active probing layer works. + +For the design rationale and spec, see `REACHABILITY_ACTIVE_PROBING_SPEC.md`. 
+ +## Overview + +The supernode reports reachability as `open_ports`, where each port is one of: + +- `OPEN`: we observed **real inbound IPv4 traffic** recently. +- `CLOSED`: we have **no recent inbound evidence** and the probing quorum rules + say that “silence is meaningful”. +- `UNKNOWN`: we cannot establish quorum (bootstrap / chain unavailable / not + enough eligible peers). + +Key properties: + +- **IPv4 only**: IPv6 is ignored everywhere. +- **Evidence-based**: no self-dials and no local-only checks are used to mark + a port `OPEN`. +- **Active probing** exists only to generate inbound traffic so quiet-but-healthy + nodes still receive real connections. + +## Passive inbound evidence (truth source) + +Inbound evidence is stored in-memory as `last_inbound_time[service]`: + +- gRPC (`ServiceGRPC`) +- P2P (`ServiceP2P`) +- Gateway status (`ServiceGateway`) + +Notes: + +- The evidence store is **in-memory only** and is reset on process restart. + Expect ports to start at `UNKNOWN` after restart until new inbound evidence is observed. +- Evidence is tracked per *service*, then applied to the configured port for that service when + building `open_ports`. + +Evidence is accepted only when: + +- A remote network address exists. +- The remote address is **IPv4**. +- The remote address is not loopback. +- If an authenticated remote identity is available and equals our own identity, + it is ignored (prevents “self” evidence). + +### Capture points + +- **gRPC**: recorded for every inbound RPC via gRPC interceptors. + - Code: `pkg/net/grpc/server/server.go` +- **Gateway**: recorded on successful (2xx) `/api/v1/status` responses. + - Code: `supernode/transport/gateway/server.go` +- **P2P**: recorded only after the secure handshake succeeds. + - Code: `p2p/kademlia/network.go`, `p2p/kademlia/conn_pool.go` + +The store implementation lives in `pkg/reachability/`, and is initialized at +process start in `supernode/cmd/start.go`. 
+ +## Active probing (traffic generator) + +Active probing generates a small amount of inbound traffic so that evidence can +be refreshed even on low-traffic nodes. + +### Participation / targets + +- Probing runs on **ACTIVE** supernodes. +- Targets are **ACTIVE** supernodes with a **routable IPv4** address from chain + registration. +- Private/loopback targets are only allowed when `INTEGRATION_TEST=true`. + +### Epoch scheduling (deterministic) + +To make “silence means closed” defensible, probe assignments are deterministic. + +Inputs: + +- `epoch_blocks`: derived from the chain param `metrics_update_interval_blocks` +- `epoch_id = floor(current_height / epoch_blocks)` +- Eligible set `L`: ACTIVE supernodes with routable IPv4, sorted by + `supernode_account` +- `N = len(L)` +- `K = ProbeAssignmentsPerEpoch` (default 3) + +Offsets: + +- `K` distinct offsets are derived from a stable hash of `(epoch_id, j)` and + mapped into `[1..N-1]`. + +Assignments: + +- For a node at index `i` in `L`, its outbound targets are: + - `target_index = (i + offset_j) mod N` for `j in [0..K-1]` + +This yields **K distinct probers per target per epoch**. + +Implementation: + +- Scheduling and probing loop: `supernode/supernode_metrics/active_probing.go` + +### Probe execution per target + +Each target is probed on all three services (with strict timeouts): + +1. **gRPC**: ALTS connection + gRPC health `Check` must be `SERVING`. +2. **Gateway**: `GET http://:8002/api/v1/status` must be 2xx. +3. **P2P**: TCP dial + Lumera secure handshake must succeed. + +The probing loop uses jitter to avoid synchronized spikes. +It also spaces probes across the epoch by waiting roughly `reportInterval / K` +between outbound probes (with jitter). + +## Port state derivation (`open_ports`) + +For each local service port: + +1. `OPEN` if inbound evidence is fresh within the evidence window. +2. `CLOSED` if evidence is not fresh **and** the quorum rule is satisfied. +3. `UNKNOWN` otherwise. 
+ +### Evidence window + +The freshness window is: + +- `max(EvidenceWindowSeconds, 2 * reportInterval)` + +This prevents evidence from expiring between expected metrics reports. + +## Quorum rule (when silence implies CLOSED) + +For node `X` in epoch `E`: + +- `AssignedProbers(E, X)` is the set of `K` peers that should probe `X` under + the deterministic schedule. +- A prober is “alive” if its on-chain `SuperNode.Metrics.Height` is: + - within the current epoch, and + - within `metrics_freshness_max_blocks` of the current height. + +If `alive_assigned_probers(E, X) >= ProbeQuorum` (default 2) and there is no +fresh inbound evidence for a service, that service is reported as `CLOSED`. + +Implementation: + +- Quorum calculation: `supernode/supernode_metrics/reachability_quorum.go` + +## Operational behavior + +### When ports become OPEN + +A port becomes `OPEN` only after the node observes inbound IPv4 traffic for the +corresponding service: + +- gRPC: any inbound RPC (including health checks) +- Gateway: a successful (2xx) request to `/api/v1/status` +- P2P: a successful secure handshake on the P2P socket + +In low-traffic environments, `OPEN` typically comes from the active probing +layer, which generates a small amount of deterministic inbound traffic. + +### When ports become CLOSED + +A port becomes `CLOSED` only when: + +- there is no fresh inbound evidence in the evidence window, and +- `alive_assigned_probers >= ProbeQuorum` for the current epoch + +If quorum cannot be established (e.g. not enough eligible peers, chain is +unreachable, or peers are not reporting metrics), the port remains `UNKNOWN` +instead of being inferred closed. + +## Debugging + +- Active probing logs are emitted at `DEBUG` level with prefix `Active probing: ...`. +- Metrics reporting logs include `open_ports` in `supernode/supernode_metrics/monitor_service.go`. 
+ +## Configuration knobs + +Defaults are compile-time constants in `supernode/supernode_metrics/constants.go`: + +### Core constants + +- `ProbeTimeoutSeconds` (default 5s) + - Upper bound for a single outbound probe to a peer (gRPC dial + health check, HTTP status call, P2P dial + handshake). + - If your network has high latency or slow handshakes, increase this; keep it small to avoid hanging goroutines. +- `ProbeAssignmentsPerEpoch` (default 3) + - “How many distinct peers should probe each node per epoch” (K in the spec). + - Increasing this increases inbound traffic and improves confidence, especially in flaky networks. +- `ProbeQuorum` (default 2) + - “How many of the assigned probers must be alive” before we allow inferring `CLOSED` from silence. + - Must satisfy `ProbeAssignmentsPerEpoch >= ProbeQuorum`. +- `MinProbeAttemptsPerReportInterval` (default 3) + - Minimum number of outbound *peer probes* attempted per metrics reporting interval. + - If the deterministic schedule yields fewer outbound targets than this (e.g. very small networks), probing will wrap and probe targets multiple times per interval to ensure at least this many attempts. +- `EvidenceWindowSeconds` (default 600s) + - Minimum freshness window for inbound evidence. + - The effective window used for `OPEN` is: `max(EvidenceWindowSeconds, 2 * reportInterval)`. + +### Ports + +- `APIPort` (default 4444), `P2PPort` (default 4445), `StatusPort` (default 8002) + - Defaults used in `open_ports` reporting; should stay aligned with the chain `required_open_ports` param. + +Chain-derived inputs: + +- `metrics_update_interval_blocks` (used for `epoch_blocks` and `reportInterval`) +- `metrics_freshness_max_blocks` (used for prober “alive” checks) + +## Tuning examples + +### Example A: report interval = 30s + +- `MinProbeAttemptsPerReportInterval=3` means each node attempts at least 3 peer probes per 30s interval. 
+ - If `ProbeAssignmentsPerEpoch=3`, the normal behavior is ~1 probe every 10s (plus jitter). + - If the eligible set is tiny and yields only 1 outbound target, the node still probes ~every 10s, wrapping to the same target. +- Evidence freshness window becomes: `max(600s, 2*30s) = 600s`. + +### Example B: report interval = 5m + +- With `ProbeAssignmentsPerEpoch=3`, a node probes ~once every `5m/3 ≈ 100s` (plus jitter). +- Evidence freshness window becomes: `max(600s, 2*5m) = 10m`. + +### Example C: “honest-but-flaky” network + +Goal: reduce false `CLOSED` caused by missed probe attempts. + +- Consider increasing to `ProbeAssignmentsPerEpoch=5` and `ProbeQuorum=3`. +- Consider increasing `EvidenceWindowSeconds` if brief outages are common, so nodes don’t lose `OPEN` too quickly. + +## Testing + +- Unit tests: `go test ./supernode/supernode_metrics` +- Integration environments can allow probing private IPv4 targets by setting: + - `INTEGRATION_TEST=true` diff --git a/go.mod b/go.mod index 6d1cd2c7..adff0836 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( cosmossdk.io/math v1.5.3 github.com/AlecAivazis/survey/v2 v2.3.7 github.com/DataDog/zstd v1.5.7 - github.com/LumeraProtocol/lumera v1.8.6-rc2 + github.com/LumeraProtocol/lumera v1.9.0 github.com/LumeraProtocol/rq-go v0.2.1 github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce github.com/cenkalti/backoff/v4 v4.3.0 @@ -35,11 +35,11 @@ require ( go.uber.org/mock v0.6.0 go.uber.org/ratelimit v0.3.1 go.uber.org/zap v1.27.0 - golang.org/x/crypto v0.42.0 + golang.org/x/crypto v0.43.0 golang.org/x/sync v0.17.0 - golang.org/x/sys v0.36.0 - google.golang.org/genproto/googleapis/api v0.0.0-20250929231259-57b25ae835d4 - google.golang.org/grpc v1.76.0 + golang.org/x/sys v0.37.0 + google.golang.org/genproto/googleapis/api v0.0.0-20251022142026-3a174f9686a8 + google.golang.org/grpc v1.77.0 google.golang.org/protobuf v1.36.10 gopkg.in/yaml.v3 v3.0.1 lukechampine.com/blake3 v1.4.1 @@ -155,7 +155,7 @@ require ( 
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/prometheus/client_golang v1.22.0 // indirect - github.com/prometheus/client_model v0.6.1 // indirect + github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/common v0.63.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect @@ -188,11 +188,12 @@ require ( golang.org/x/arch v0.15.0 // indirect golang.org/x/exp v0.0.0-20250819193227-8b4c13bb791b // indirect golang.org/x/image v0.0.0-20210628002857-a66eb6448b8d // indirect - golang.org/x/net v0.44.0 // indirect - golang.org/x/term v0.35.0 // indirect - golang.org/x/text v0.29.0 // indirect + golang.org/x/net v0.46.1-0.20251013234738-63d1a5100f82 // indirect + golang.org/x/term v0.36.0 // indirect + golang.org/x/text v0.30.0 // indirect google.golang.org/genproto v0.0.0-20250505200425-f936aa4a68b2 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20250929231259-57b25ae835d4 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20251022142026-3a174f9686a8 // indirect + gopkg.in/yaml.v2 v2.4.0 // indirect gotest.tools/v3 v3.5.2 // indirect lukechampine.com/uint128 v1.3.0 // indirect nhooyr.io/websocket v1.8.17 // indirect diff --git a/go.sum b/go.sum index 3b16ff80..c32758bd 100644 --- a/go.sum +++ b/go.sum @@ -9,8 +9,8 @@ cloud.google.com/go/auth v0.16.0/go.mod h1:1howDHJ5IETh/LwYs3ZxvlkXF48aSqqJUM+5o cloud.google.com/go/auth/oauth2adapt v0.2.8 h1:keo8NaayQZ6wimpNSmW5OPc283g65QNIiLpZnkHRbnc= cloud.google.com/go/auth/oauth2adapt v0.2.8/go.mod h1:XQ9y31RkqZCcwJWNSx2Xvric3RrU88hAYYbjDWYDL+c= cloud.google.com/go/compute v1.37.0 h1:XxtZlXYkZXub3LNaLu90TTemcFqIU1yZ4E4q9VlR39A= -cloud.google.com/go/compute/metadata v0.7.0 h1:PBWF+iiAerVNe8UCHxdOt6eHLVc3ydFeOCw78U8ytSU= -cloud.google.com/go/compute/metadata v0.7.0/go.mod 
h1:j5MvL9PprKL39t166CoB1uVHfQMs4tFQZZcKwksXUjo= +cloud.google.com/go/compute/metadata v0.9.0 h1:pDUj4QMoPejqq20dK0Pg2N4yG9zIkYGdBtwLoEkH9Zs= +cloud.google.com/go/compute/metadata v0.9.0/go.mod h1:E0bWwX5wTnLPedCKqk3pJmVgCBSM6qQI1yTBdEb3C10= cloud.google.com/go/iam v1.5.2 h1:qgFRAGEmd8z6dJ/qyEchAuL9jpswyODjA2lS+w234g8= cloud.google.com/go/iam v1.5.2/go.mod h1:SE1vg0N81zQqLzQEwxL2WI6yhetBdbNQuTvIKCSkUHE= cloud.google.com/go/monitoring v1.24.2 h1:5OTsoJ1dXYIiMiuL+sYscLc9BumrL3CarVLL7dd7lHM= @@ -69,15 +69,15 @@ github.com/DataDog/datadog-go v4.8.3+incompatible h1:fNGaYSuObuQb5nzeTQqowRAd9bp github.com/DataDog/datadog-go v4.8.3+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/DataDog/zstd v1.5.7 h1:ybO8RBeh29qrxIhCA9E8gKY6xfONU9T6G6aP9DTKfLE= github.com/DataDog/zstd v1.5.7/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= -github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.29.0 h1:UQUsRi8WTzhZntp5313l+CHIAT95ojUI2lpP/ExlZa4= -github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.29.0/go.mod h1:Cz6ft6Dkn3Et6l2v2a9/RpN7epQ1GtDlO6lj8bEcOvw= +github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.30.0 h1:sBEjpZlNHzK1voKq9695PJSX2o5NEXl7/OL3coiIY0c= +github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.30.0/go.mod h1:P4WPRUkOhJC13W//jWpyfJNDAIpvRbAUIYLX/4jtlE0= github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.50.0 h1:5IT7xOdq17MtcdtL/vtl6mGfzhaq4m4vpollPRmlsBQ= github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.50.0/go.mod h1:ZV4VOm0/eHR06JLrXWe09068dHpr3TRpY9Uo7T+anuA= github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.50.0 h1:ig/FpDD2JofP/NExKQUbn7uOSZzJAQqogfqluZK4ed4= github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.50.0/go.mod h1:otE2jQekW/PqXk1Awf5lmfokJx4uwuqcj1ab5SpGeW0= github.com/Knetic/govaluate 
v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= -github.com/LumeraProtocol/lumera v1.8.6-rc2 h1:o4f3HOpmpk6VU+PiFOExx6F6doffLCKJUcDQzQ59TbE= -github.com/LumeraProtocol/lumera v1.8.6-rc2/go.mod h1:DcG+PermGhl5uA51VaSA0EC+FXpDVm2XgifmYL9jJvE= +github.com/LumeraProtocol/lumera v1.9.0 h1:84IdR+8U8R8JeT/wGhNsg60JuzhZkfYkOO4OvfVISAo= +github.com/LumeraProtocol/lumera v1.9.0/go.mod h1:sq4A8DTNOTVxukLi2ydYg9z8tfv3Uq0Pn1tBCgtAy1U= github.com/LumeraProtocol/rq-go v0.2.1 h1:8B3UzRChLsGMmvZ+UVbJsJj6JZzL9P9iYxbdUwGsQI4= github.com/LumeraProtocol/rq-go v0.2.1/go.mod h1:APnKCZRh1Es2Vtrd2w4kCLgAyaL5Bqrkz/BURoRJ+O8= github.com/Masterminds/semver/v3 v3.3.1 h1:QtNSWtVZ3nBfk8mAOu/B6v7FMJ+NHTIgUPi7rj+4nv4= @@ -179,8 +179,8 @@ github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWH github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= -github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443 h1:aQ3y1lwWyqYPiWZThqv1aFbZMiM9vblcSArJRf2Irls= -github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8= +github.com/cncf/xds/go v0.0.0-20251022180443-0feb69152e9f h1:Y8xYupdHxryycyPlc9Y+bSQAYZnetRJ70VMVKm5CKI0= +github.com/cncf/xds/go v0.0.0-20251022180443-0feb69152e9f/go.mod h1:HlzOvOjVBOfTGSRXRyY0OiCS/3J1akRGQQpRO/7zyF4= github.com/cockroachdb/apd/v2 v2.0.2 h1:weh8u7Cneje73dDh+2tEVLUvyBc89iwepWCD8b8034E= github.com/cockroachdb/apd/v2 v2.0.2/go.mod h1:DDxRlzC2lo3/vSlmSoS7JkqbbrARPuFOGr0B9pvN3Gw= github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8= @@ -291,9 +291,9 @@ github.com/envoyproxy/go-control-plane 
v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0= github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE= -github.com/envoyproxy/go-control-plane v0.13.4 h1:zEqyPVyku6IvWCFwux4x9RxkLOMUL+1vC9xUFv5l2/M= -github.com/envoyproxy/go-control-plane/envoy v1.32.4 h1:jb83lalDRZSpPWW2Z7Mck/8kXZ5CQAFYVjQcdVIr83A= -github.com/envoyproxy/go-control-plane/envoy v1.32.4/go.mod h1:Gzjc5k8JcJswLjAx1Zm+wSYE20UrLtt7JZMWiWQXQEw= +github.com/envoyproxy/go-control-plane v0.13.5-0.20251024222203-75eaa193e329 h1:K+fnvUM0VZ7ZFJf0n4L/BRlnsb9pL/GuDG6FqaH+PwM= +github.com/envoyproxy/go-control-plane/envoy v1.35.0 h1:ixjkELDE+ru6idPxcHLj8LBVc2bFP7iBytj353BoHUo= +github.com/envoyproxy/go-control-plane/envoy v1.35.0/go.mod h1:09qwbGVuSWWAyN5t/b3iyVfz5+z8QWGrzkoqm/8SbEs= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/envoyproxy/protoc-gen-validate v1.2.1 h1:DEo3O99U8j4hBFwbJfrz9VtgcDfUKS7KJ7spH3d86P8= github.com/envoyproxy/protoc-gen-validate v1.2.1/go.mod h1:d/C80l/jxXLdfEIhX1W2TmLfsJ31lvEjwamM4DxlWXU= @@ -324,8 +324,8 @@ github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwv github.com/go-errors/errors v1.5.1 h1:ZwEMSLRCapFLflTpT7NKaAc7ukJ8ZPEjzlxt8rPN8bk= github.com/go-errors/errors v1.5.1/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= -github.com/go-jose/go-jose/v4 v4.1.2 h1:TK/7NqRQZfgAh+Td8AlsrvtPoUyiHh0LqVvokh+1vHI= -github.com/go-jose/go-jose/v4 v4.1.2/go.mod h1:22cg9HWM1pOlnRiY+9cQYJ9XHmya1bYW8OeDM6Ku6Oo= +github.com/go-jose/go-jose/v4 v4.1.3 
h1:CVLmWDhDVRa6Mi/IgCgaopNosCaHz7zrMeF9MlZRkrs= +github.com/go-jose/go-jose/v4 v4.1.3/go.mod h1:x4oUasVrzR7071A4TnHLGSPpNOm2a21K9Kf04k1rs08= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/kit v0.10.0/go.mod h1:xUsJbQ/Fp4kEt7AFgCuvyX4a71u8h9jB8tj/ORgOZ7o= @@ -745,8 +745,8 @@ github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1: github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.1.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= -github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= -github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= +github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= +github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.7.0/go.mod h1:DjGbpBbp5NYNiECxcL/VnbXCCaQpKd3tt26CguLLsqA= @@ -824,8 +824,8 @@ github.com/spf13/pflag v1.0.10 h1:4EBh2KAYBwaONj6b2Ye1GiHfwjqyROoF4RwYO+vPwFk= github.com/spf13/pflag v1.0.10/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/viper v1.21.0 h1:x5S+0EU27Lbphp4UKm1C+1oQO+rKx36vfCoaVebLFSU= github.com/spf13/viper v1.21.0/go.mod h1:P0lhsswPGWD/1lZJ9ny3fYnVqxiegrlNrEmgLjbTCAY= -github.com/spiffe/go-spiffe/v2 v2.5.0 h1:N2I01KCUkv1FAjZXJMwh95KK1ZIQLYbPfhaxw8WS0hE= -github.com/spiffe/go-spiffe/v2 v2.5.0/go.mod h1:P+NxobPc6wXhVtINNtFjNWGBTreew1GBUCwT2wPmb7g= 
+github.com/spiffe/go-spiffe/v2 v2.6.0 h1:l+DolpxNWYgruGQVV0xsfeya3CsC7m8iBzDnMpsbLuo= +github.com/spiffe/go-spiffe/v2 v2.6.0/go.mod h1:gm2SeUoMZEtpnzPNs2Csc0D/gX33k1xIx7lEzqblHEs= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI= @@ -876,8 +876,6 @@ github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1 github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= -github.com/zeebo/errs v1.4.0 h1:XNdoD/RRMKP7HD0UhJnIzUy74ISdGGxURlYG8HSWSfM= -github.com/zeebo/errs v1.4.0/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4= github.com/zondax/hid v0.9.2 h1:WCJFnEDMiqGF64nlZz28E9qLVZ0KSJ7xpc5DLEyma2U= github.com/zondax/hid v0.9.2/go.mod h1:l5wttcP0jwtdLjqjMMWFVEE7d1zO0jvSPA9OPZxWpEM= github.com/zondax/ledger-go v0.14.3 h1:wEpJt2CEcBJ428md/5MgSLsXLBos98sBOyxNmCjfUCw= @@ -893,8 +891,8 @@ go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= -go.opentelemetry.io/contrib/detectors/gcp v1.36.0 h1:F7q2tNlCaHY9nMKHR6XH9/qkp8FktLnIcy6jJNyOCQw= -go.opentelemetry.io/contrib/detectors/gcp v1.36.0/go.mod h1:IbBN8uAIIx734PTonTPxAxnjc2pQTxWNkwfstZ+6H2k= +go.opentelemetry.io/contrib/detectors/gcp v1.38.0 h1:ZoYbqX7OaA/TAikspPl3ozPI6iY6LiIY9I8cUfm+pJs= +go.opentelemetry.io/contrib/detectors/gcp 
v1.38.0/go.mod h1:SU+iU7nu5ud4oCb3LQOhIZ3nRLj6FNVrKgtflbaf2ts= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.60.0 h1:x7wzEgXfnzJcHDwStJT+mxOz4etr2EcexjqhBvmoakw= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.60.0/go.mod h1:rg+RlpR5dKwaS95IyyZqj5Wd4E13lk/msnTS0Xl9lJM= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.63.0 h1:RbKq8BG0FI8OiXhBfcRtqqHcZcka+gU3cskNuf05R18= @@ -949,8 +947,8 @@ golang.org/x/crypto v0.0.0-20200115085410-6d4e4cb37c7d/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.42.0 h1:chiH31gIWm57EkTXpwnqf8qeuMUi0yekh6mT2AvFlqI= -golang.org/x/crypto v0.42.0/go.mod h1:4+rDnOTJhQCx2q7/j6rAN5XDw8kPjeaXEUR2eL94ix8= +golang.org/x/crypto v0.43.0 h1:dduJYIi3A3KOfdGOHX8AVZ/jGiyPa3IbBozJ5kNuE04= +golang.org/x/crypto v0.43.0/go.mod h1:BFbav4mRNlXJL4wNeejLpWxB7wMbc79PdRGhWKncxR0= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20200331195152-e8c3332aa8e5/go.mod h1:4M0jN8W1tt0AVLNr8HDosyJCDCDuyL9N9+3m7wDWgKw= @@ -1004,13 +1002,13 @@ golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220607020251-c690dde0001d/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= -golang.org/x/net v0.44.0 
h1:evd8IRDyfNBMBTTY5XRF1vaZlD+EmWx6x8PkhR04H/I= -golang.org/x/net v0.44.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY= +golang.org/x/net v0.46.1-0.20251013234738-63d1a5100f82 h1:6/3JGEh1C88g7m+qzzTbl3A0FtsLguXieqofVLU/JAo= +golang.org/x/net v0.46.1-0.20251013234738-63d1a5100f82/go.mod h1:Q9BGdFy1y4nkUwiLvT5qtyhAnEHgnQ/zd8PfU6nc210= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= -golang.org/x/oauth2 v0.30.0 h1:dnDm7JmhM45NNpd8FDDeLhK6FwqbOf4MLCM9zb1BOHI= -golang.org/x/oauth2 v0.30.0/go.mod h1:B++QgG3ZKulg6sRPGD/mqlHQs5rB3Ml9erfeDY7xKlU= +golang.org/x/oauth2 v0.32.0 h1:jsCblLleRMDrxMN29H3z/k1KliIvpLgCkE6R8FXXNgY= +golang.org/x/oauth2 v0.32.0/go.mod h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -1083,12 +1081,12 @@ golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k= -golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ= +golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/term 
v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.35.0 h1:bZBVKBudEyhRcajGcNc3jIfWPqV4y/Kt2XcoigOWtDQ= -golang.org/x/term v0.35.0/go.mod h1:TPGtkTLesOwf2DE8CgVYiZinHAOuy5AYUYT1lENIZnA= +golang.org/x/term v0.36.0 h1:zMPR+aF8gfksFprF/Nc/rd1wRS1EI6nDBGyWAvDzx2Q= +golang.org/x/term v0.36.0/go.mod h1:Qu394IJq6V6dCBRgwqshf3mPF85AqzYEzofzRdZkWss= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= @@ -1096,8 +1094,8 @@ golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk= -golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4= +golang.org/x/text v0.30.0 h1:yznKA/E9zq54KzlzBEAWn1NXSQ8DIp/NYMy88xJjl4k= +golang.org/x/text v0.30.0/go.mod h1:yDdHFIX9t+tORqspjENWgzaCVXgk0yYnYuSZ8UzzBVM= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.11.0 h1:/bpjEDfN9tkoN/ryeYHnv5hcMlc8ncjMcM4XBk5NWV0= @@ -1150,10 +1148,10 @@ google.golang.org/genproto v0.0.0-20210126160654-44e461bb6506/go.mod h1:FWY/as6D google.golang.org/genproto v0.0.0-20220314164441-57ef72a4c106/go.mod h1:hAL49I2IFola2sVEjAn7MEwsja0xp51I0tlGAf9hz4E= google.golang.org/genproto v0.0.0-20250505200425-f936aa4a68b2 
h1:1tXaIXCracvtsRxSBsYDiSBN0cuJvM7QYW+MrpIRY78= google.golang.org/genproto v0.0.0-20250505200425-f936aa4a68b2/go.mod h1:49MsLSx0oWMOZqcpB3uL8ZOkAh1+TndpJ8ONoCBWiZk= -google.golang.org/genproto/googleapis/api v0.0.0-20250929231259-57b25ae835d4 h1:8XJ4pajGwOlasW+L13MnEGA8W4115jJySQtVfS2/IBU= -google.golang.org/genproto/googleapis/api v0.0.0-20250929231259-57b25ae835d4/go.mod h1:NnuHhy+bxcg30o7FnVAZbXsPHUDQ9qKWAQKCD7VxFtk= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250929231259-57b25ae835d4 h1:i8QOKZfYg6AbGVZzUAY3LrNWCKF8O6zFisU9Wl9RER4= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250929231259-57b25ae835d4/go.mod h1:HSkG/KdJWusxU1F6CNrwNDjBMgisKxGnc5dAZfT0mjQ= +google.golang.org/genproto/googleapis/api v0.0.0-20251022142026-3a174f9686a8 h1:mepRgnBZa07I4TRuomDE4sTIYieg/osKmzIf4USdWS4= +google.golang.org/genproto/googleapis/api v0.0.0-20251022142026-3a174f9686a8/go.mod h1:fDMmzKV90WSg1NbozdqrE64fkuTv6mlq2zxo9ad+3yo= +google.golang.org/genproto/googleapis/rpc v0.0.0-20251022142026-3a174f9686a8 h1:M1rk8KBnUsBDg1oPGHNCxG4vc1f49epmTO7xscSajMk= +google.golang.org/genproto/googleapis/rpc v0.0.0-20251022142026-3a174f9686a8/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk= google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.0/go.mod h1:chYK+tFQF0nDUGJgXMSgLCQk3phJEuONr2DCgLDdAQM= @@ -1172,8 +1170,8 @@ google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.45.0/go.mod h1:lN7owxKUQEqMfSyQikvvk5tf/6zMPsrK+ONuO11+0rQ= google.golang.org/grpc v1.49.0/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI= -google.golang.org/grpc v1.76.0 h1:UnVkv1+uMLYXoIz6o7chp59WfQUYA2ex/BXQ9rHZu7A= -google.golang.org/grpc v1.76.0/go.mod h1:Ju12QI8M6iQJtbcsV+awF5a4hfJMLi4X0JLo94ULZ6c= 
+google.golang.org/grpc v1.77.0 h1:wVVY6/8cGA6vvffn+wWK5ToddbgdU3d8MNENr4evgXM= +google.golang.org/grpc v1.77.0/go.mod h1:z0BY1iVj0q8E1uSQCjL9cppRj+gnZjzDnzV0dHhrNig= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/p2p/kademlia/conn_pool.go b/p2p/kademlia/conn_pool.go index 66b33e28..2139d535 100644 --- a/p2p/kademlia/conn_pool.go +++ b/p2p/kademlia/conn_pool.go @@ -11,6 +11,7 @@ import ( "github.com/LumeraProtocol/supernode/v2/pkg/errors" ltc "github.com/LumeraProtocol/supernode/v2/pkg/net/credentials" + althandshake "github.com/LumeraProtocol/supernode/v2/pkg/net/credentials/alts/handshake" "google.golang.org/grpc/credentials" ) @@ -100,8 +101,8 @@ func NewSecureClientConn(ctx context.Context, tc credentials.TransportCredential }, nil } -// NewSecureServerConn do server handshake and create a secure connection -func NewSecureServerConn(_ context.Context, tc credentials.TransportCredentials, rawConn net.Conn) (net.Conn, error) { +// NewSecureServerConn does server handshake and returns a secure connection along with the authenticated remote identity (when available). 
+func NewSecureServerConn(_ context.Context, tc credentials.TransportCredentials, rawConn net.Conn) (net.Conn, string, error) { if tcp, ok := rawConn.(*net.TCPConn); ok { _ = tcp.SetKeepAlive(true) _ = tcp.SetKeepAlivePeriod(2 * time.Minute) // tune: 2–5 min @@ -111,15 +112,20 @@ func NewSecureServerConn(_ context.Context, tc credentials.TransportCredentials, _ = tcp.SetNoDelay(true) } - conn, _, err := tc.ServerHandshake(rawConn) + conn, authInfo, err := tc.ServerHandshake(rawConn) if err != nil { - return nil, errors.Errorf("server secure establish failed: %w", err) + return nil, "", errors.Errorf("server secure establish failed: %w", err) + } + + remoteIdentity := "" + if ai, ok := authInfo.(*althandshake.AuthInfo); ok && ai != nil { + remoteIdentity = ai.RemoteIdentity } return &connWrapper{ secureConn: conn, rawConn: rawConn, - }, nil + }, remoteIdentity, nil } // Read implements net.Conn's Read interface diff --git a/p2p/kademlia/network.go b/p2p/kademlia/network.go index a5ae39ee..48f8ffc9 100644 --- a/p2p/kademlia/network.go +++ b/p2p/kademlia/network.go @@ -15,6 +15,7 @@ import ( "github.com/btcsuite/btcutil/base58" json "github.com/json-iterator/go" + "github.com/LumeraProtocol/supernode/v2/pkg/reachability" "github.com/LumeraProtocol/supernode/v2/pkg/utils" "github.com/google/uuid" @@ -360,7 +361,8 @@ func (s *Network) handleConn(ctx context.Context, rawConn net.Conn) { }) // secure handshake if s.serverTC != nil { - conn, err = NewSecureServerConn(ctx, s.serverTC, rawConn) + var remoteIdentity string + conn, remoteIdentity, err = NewSecureServerConn(ctx, s.serverTC, rawConn) if err != nil { _ = rawConn.Close() logtrace.Warn(ctx, "Server secure handshake failed", logtrace.Fields{ @@ -369,6 +371,11 @@ func (s *Network) handleConn(ctx context.Context, rawConn net.Conn) { }) return } + + // Record inbound evidence for P2P reachability only after the secure handshake succeeds. 
+ if store := reachability.DefaultStore(); store != nil { + store.RecordInbound(reachability.ServiceP2P, remoteIdentity, rawConn.RemoteAddr(), time.Now()) + } } else { conn = rawConn } diff --git a/pkg/net/grpc/server/server.go b/pkg/net/grpc/server/server.go index ae1a3524..08f85262 100644 --- a/pkg/net/grpc/server/server.go +++ b/pkg/net/grpc/server/server.go @@ -18,6 +18,7 @@ import ( "github.com/LumeraProtocol/supernode/v2/pkg/errors" "github.com/LumeraProtocol/supernode/v2/pkg/logtrace" + "github.com/LumeraProtocol/supernode/v2/pkg/reachability" ) const ( @@ -112,6 +113,15 @@ func DefaultServerOptions() *ServerOptions { } } +func recordInboundReachability(ctx context.Context) { + store := reachability.DefaultStore() + if store == nil { + return + } + identity, addr := reachability.GrpcRemoteIdentityAndAddr(ctx) + store.RecordInbound(reachability.ServiceGRPC, identity, addr, time.Now()) +} + // defaultServerOptionBuilder is the default implementation of ServerOptionBuilder type defaultServerOptionBuilder struct{} @@ -160,6 +170,19 @@ func NewServerWithBuilder(name string, creds credentials.TransportCredentials, b // buildServerOptions creates all server options including credentials func (s *Server) buildServerOptions(opts *ServerOptions) []grpc.ServerOption { + unaryInterceptors := []grpc.UnaryServerInterceptor{ + func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { + recordInboundReachability(ctx) + return handler(ctx, req) + }, + } + streamInterceptors := []grpc.StreamServerInterceptor{ + func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + recordInboundReachability(ss.Context()) + return handler(srv, ss) + }, + } + serverOpts := []grpc.ServerOption{ grpc.MaxRecvMsgSize(opts.MaxRecvMsgSize), grpc.MaxSendMsgSize(opts.MaxSendMsgSize), @@ -170,6 +193,8 @@ func (s *Server) buildServerOptions(opts *ServerOptions) []grpc.ServerOption { 
grpc.KeepaliveEnforcementPolicy(s.builder.buildKeepAlivePolicy(opts)), grpc.WriteBufferSize(opts.WriteBufferSize), grpc.ReadBufferSize(opts.ReadBufferSize), + grpc.ChainUnaryInterceptor(unaryInterceptors...), + grpc.ChainStreamInterceptor(streamInterceptors...), } if opts.NumServerWorkers > 0 { diff --git a/pkg/reachability/grpc.go b/pkg/reachability/grpc.go new file mode 100644 index 00000000..d097ad1c --- /dev/null +++ b/pkg/reachability/grpc.go @@ -0,0 +1,25 @@ +package reachability + +import ( + "context" + "net" + + "google.golang.org/grpc/peer" + + althandshake "github.com/LumeraProtocol/supernode/v2/pkg/net/credentials/alts/handshake" +) + +// GrpcRemoteIdentityAndAddr extracts the remote identity (if using Lumera ALTS) +// and the remote network address from an inbound gRPC context. +func GrpcRemoteIdentityAndAddr(ctx context.Context) (identity string, addr net.Addr) { + p, ok := peer.FromContext(ctx) + if !ok || p == nil { + return "", nil + } + if p.AuthInfo != nil { + if ai, ok := p.AuthInfo.(*althandshake.AuthInfo); ok { + identity = ai.RemoteIdentity + } + } + return identity, p.Addr +} diff --git a/pkg/reachability/store.go b/pkg/reachability/store.go new file mode 100644 index 00000000..01c35717 --- /dev/null +++ b/pkg/reachability/store.go @@ -0,0 +1,118 @@ +package reachability + +import ( + "net" + "sync" + "sync/atomic" + "time" +) + +var defaultStore atomic.Value // *Store + +// SetDefaultStore sets the process-wide default reachability store. +// Components that don't have explicit dependency injection can use DefaultStore(). +func SetDefaultStore(store *Store) { + defaultStore.Store(store) +} + +// DefaultStore returns the process-wide default reachability store, if set. +func DefaultStore() *Store { + if v := defaultStore.Load(); v != nil { + if s, ok := v.(*Store); ok { + return s + } + } + return nil +} + +// Store records last-seen inbound evidence per Service. +// This is used to infer external reachability based on real inbound traffic. 
+type Store struct { + mu sync.RWMutex + + selfIdentity string + + lastInbound map[Service]time.Time +} + +func NewStore(selfIdentity string) *Store { + return &Store{ + selfIdentity: selfIdentity, + lastInbound: make(map[Service]time.Time, 3), + } +} + +// RecordInbound stores an inbound evidence timestamp for a service. +// +// The caller should pass the remote identity when available (e.g. via ALTS auth info), +// and the remote address (for loopback filtering). +func (s *Store) RecordInbound(service Service, remoteIdentity string, remoteAddr net.Addr, at time.Time) { + if service == "" { + return + } + // We only treat this as "external inbound evidence" if the caller can + // provide an actual remote network address. + if remoteAddr == nil { + return + } + // The system is IPv4-only; ignore IPv6 / non-IP addresses. + if !isIPv4Addr(remoteAddr) { + return + } + if isLoopbackAddr(remoteAddr) { + return + } + if remoteIdentity != "" && s.selfIdentity != "" && remoteIdentity == s.selfIdentity { + return + } + + s.mu.Lock() + s.lastInbound[service] = at + s.mu.Unlock() +} + +func (s *Store) LastInbound(service Service) time.Time { + s.mu.RLock() + defer s.mu.RUnlock() + return s.lastInbound[service] +} + +func (s *Store) IsInboundFresh(service Service, window time.Duration, now time.Time) bool { + if window <= 0 { + return false + } + last := s.LastInbound(service) + if last.IsZero() { + return false + } + return now.Sub(last) <= window +} + +func isLoopbackAddr(addr net.Addr) bool { + if addr == nil { + return false + } + switch a := addr.(type) { + case *net.TCPAddr: + return a.IP != nil && a.IP.IsLoopback() + case *net.UDPAddr: + return a.IP != nil && a.IP.IsLoopback() + default: + // Best-effort fallback; if we can't reliably parse, don't exclude. 
+ return false + } +} + +func isIPv4Addr(addr net.Addr) bool { + if addr == nil { + return false + } + switch a := addr.(type) { + case *net.TCPAddr: + return a.IP != nil && a.IP.To4() != nil + case *net.UDPAddr: + return a.IP != nil && a.IP.To4() != nil + default: + return false + } +} diff --git a/pkg/reachability/store_test.go b/pkg/reachability/store_test.go new file mode 100644 index 00000000..d88c2d2e --- /dev/null +++ b/pkg/reachability/store_test.go @@ -0,0 +1,118 @@ +package reachability + +import ( + "net" + "testing" + "time" +) + +func TestStoreRecordInboundFilters(t *testing.T) { + now := time.Unix(1234, 0) + ipv4 := net.ParseIP("203.0.113.10") + ipv6 := net.ParseIP("2001:db8::1") + + cases := []struct { + name string + service Service + selfIdentity string + remoteID string + addr net.Addr + wantRecorded bool + }{ + { + name: "empty service", + service: "", + selfIdentity: "me", + remoteID: "peer", + addr: &net.TCPAddr{IP: ipv4, Port: 123}, + wantRecorded: false, + }, + { + name: "nil remote addr", + service: ServiceGRPC, + selfIdentity: "me", + remoteID: "peer", + addr: nil, + wantRecorded: false, + }, + { + name: "ipv6 ignored", + service: ServiceGRPC, + selfIdentity: "me", + remoteID: "peer", + addr: &net.TCPAddr{IP: ipv6, Port: 123}, + wantRecorded: false, + }, + { + name: "loopback ignored", + service: ServiceGRPC, + selfIdentity: "me", + remoteID: "peer", + addr: &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 123}, + wantRecorded: false, + }, + { + name: "self identity ignored", + service: ServiceGRPC, + selfIdentity: "me", + remoteID: "me", + addr: &net.TCPAddr{IP: ipv4, Port: 123}, + wantRecorded: false, + }, + { + name: "ipv4 tcp accepted", + service: ServiceGRPC, + selfIdentity: "me", + remoteID: "peer", + addr: &net.TCPAddr{IP: ipv4, Port: 123}, + wantRecorded: true, + }, + { + name: "ipv4 udp accepted", + service: ServiceGateway, + selfIdentity: "me", + remoteID: "", + addr: &net.UDPAddr{IP: ipv4, Port: 123}, + wantRecorded: true, + }, + 
} + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + store := NewStore(tc.selfIdentity) + store.RecordInbound(tc.service, tc.remoteID, tc.addr, now) + + got := store.LastInbound(tc.service) + if tc.wantRecorded { + if got != now { + t.Fatalf("expected last inbound=%v, got %v", now, got) + } + return + } + if !got.IsZero() { + t.Fatalf("expected no record, got %v", got) + } + }) + } +} + +func TestStoreIsInboundFresh(t *testing.T) { + store := NewStore("me") + now := time.Unix(2000, 0) + + if store.IsInboundFresh(ServiceGRPC, time.Second, now) { + t.Fatalf("expected not fresh with no evidence") + } + if store.IsInboundFresh(ServiceGRPC, 0, now) { + t.Fatalf("expected not fresh with non-positive window") + } + + store.RecordInbound(ServiceGRPC, "peer", &net.TCPAddr{IP: net.ParseIP("203.0.113.10"), Port: 123}, now) + + if !store.IsInboundFresh(ServiceGRPC, time.Second, now.Add(500*time.Millisecond)) { + t.Fatalf("expected fresh within window") + } + if store.IsInboundFresh(ServiceGRPC, time.Second, now.Add(2*time.Second)) { + t.Fatalf("expected not fresh outside window") + } +} diff --git a/pkg/reachability/types.go b/pkg/reachability/types.go new file mode 100644 index 00000000..e85fd170 --- /dev/null +++ b/pkg/reachability/types.go @@ -0,0 +1,12 @@ +package reachability + +// Service identifies which externally-reachable service/port we are tracking. +// It is intentionally coarse-grained (per service), because `open_ports` is +// derived from the configured ports for each service. 
type Service string

// The three services whose inbound reachability is tracked.
const (
	// ServiceGRPC is the supernode gRPC API service.
	ServiceGRPC Service = "grpc"
	// ServiceP2P is the Kademlia P2P service.
	ServiceP2P Service = "p2p"
	// ServiceGateway is the HTTP gateway status service.
	ServiceGateway Service = "gateway"
)
// probeTarget describes one peer supernode selected for active probing:
// where to reach each of its externally-advertised services.
type probeTarget struct {
	// identity is the peer's supernode account address (used to address
	// the authenticated gRPC/P2P handshakes).
	identity string
	// hostIPv4 is the peer's IPv4 address; IPv6 peers are filtered out
	// before a probeTarget is built.
	hostIPv4 string
	// Ports for the three probed services.
	grpcPort    int
	p2pPort     int
	gatewayPort int

	// metricsHeight is the height read from the peer's on-chain metrics
	// record (0 when the peer has no metrics).
	metricsHeight int64
}
&probeCache{} + for { + hm.refreshProbePlanIfNeeded(ctx, cache, selfIdentity) + hm.runProbeRound(ctx, clients, cache, probeTimeout) + + if !hm.waitForNextProbe(ctx, rng, cache) { + return + } + } +} + +type probeCache struct { + plan *probePlan + cursor int +} + +type probePlan struct { + epochID uint64 + epochBlocks uint64 + eligiblePeers []probeTarget + outboundTargets []probeTarget + builtAt time.Time +} + +type probeClients struct { + grpcClient *grpcclient.Client + grpcOpts *grpcclient.ClientOptions + p2pCreds *ltc.LumeraTC + httpClient *http.Client +} + +func (hm *Collector) probingIdentity() (string, bool) { + selfIdentity := strings.TrimSpace(hm.identity) + if selfIdentity == "" || hm.keyring == nil || hm.lumeraClient == nil { + return "", false + } + return selfIdentity, true +} + +func (hm *Collector) waitForProbeStart(ctx context.Context, rng *rand.Rand) bool { + startupDelay := time.Duration(DefaultStartupDelaySeconds) * time.Second + // Add jitter so that not all nodes start probing at the same wall-clock time. 
+ startupDelay = withJitter(startupDelay, defaultProbeJitterFraction, rng) + return waitOrStop(ctx, hm.stopChan, startupDelay) +} + +func (hm *Collector) waitForNextProbe(ctx context.Context, rng *rand.Rand, cache *probeCache) bool { + base := defaultProbeInterval + if hm != nil && hm.reportInterval > 0 && cache != nil && cache.plan != nil && len(cache.plan.outboundTargets) > 0 { + base = probeRoundInterval(hm.reportInterval, len(cache.plan.outboundTargets)) + } + if base < time.Second { + base = time.Second + } + next := withJitter(base, defaultProbeJitterFraction, rng) + return waitOrStop(ctx, hm.stopChan, next) +} + +func probeRoundInterval(reportInterval time.Duration, outboundTargets int) time.Duration { + if reportInterval <= 0 || outboundTargets <= 0 { + return 0 + } + + attempts := outboundTargets + if attempts < MinProbeAttemptsPerReportInterval { + attempts = MinProbeAttemptsPerReportInterval + } + return reportInterval / time.Duration(attempts) +} + +func (hm *Collector) newProbeClients(selfIdentity string, timeout time.Duration) (*probeClients, error) { + validator := lumera.NewSecureKeyExchangeValidator(hm.lumeraClient) + + grpcCreds, err := ltc.NewClientCreds(<c.ClientOptions{ + CommonOptions: ltc.CommonOptions{ + Keyring: hm.keyring, + LocalIdentity: selfIdentity, + PeerType: securekeyx.Supernode, + Validator: validator, + }, + }) + if err != nil { + return nil, fmt.Errorf("create gRPC client creds: %w", err) + } + + grpcProbeClient := grpcclient.NewClient(grpcCreds) + grpcProbeOpts := grpcclient.DefaultClientOptions() + grpcProbeOpts.EnableRetries = false + grpcProbeOpts.ConnWaitTime = timeout + grpcProbeOpts.MinConnectTimeout = timeout + + p2pCreds, err := ltc.NewClientCreds(<c.ClientOptions{ + CommonOptions: ltc.CommonOptions{ + Keyring: hm.keyring, + LocalIdentity: selfIdentity, + PeerType: securekeyx.Supernode, + Validator: validator, + }, + }) + if err != nil { + return nil, fmt.Errorf("create P2P client creds: %w", err) + } + + lumeraTC, ok := 
// refreshProbePlanIfNeeded rebuilds the epoch-scoped probing plan when the
// epoch (derived from chain height) has advanced or the epoch length
// changed. When the chain is unreachable, an existing plan is reused for a
// grace period so peers keep receiving inbound probe traffic.
func (hm *Collector) refreshProbePlanIfNeeded(ctx context.Context, cache *probeCache, selfIdentity string) {
	if cache == nil {
		return
	}

	selfIdentity = strings.TrimSpace(selfIdentity)
	if selfIdentity == "" || hm.lumeraClient == nil {
		cache.plan = nil
		return
	}

	height, heightOK := hm.latestBlockHeight(ctx)
	now := time.Now()
	if !heightOK {
		// When the chain is temporarily unreachable, keep using the existing plan
		// (if any) so we continue generating inbound traffic for peers.
		if cache.plan != nil && now.Sub(cache.plan.builtAt) < defaultPeerRefreshInterval {
			return
		}
		cache.plan = nil
		return
	}

	epochBlocks := hm.probingEpochBlocks()
	if epochBlocks == 0 {
		cache.plan = nil
		return
	}
	// Deterministic epoch id shared by all nodes at the same height range.
	epochID := uint64(height) / epochBlocks
	if cache.plan != nil && cache.plan.epochID == epochID && cache.plan.epochBlocks == epochBlocks {
		// Plan is already current for this epoch; nothing to do.
		return
	}

	resp, err := hm.lumeraClient.SuperNode().ListSuperNodes(ctx)
	if err != nil || resp == nil {
		logtrace.Warn(ctx, "Active probing: failed to refresh peer list", logtrace.Fields{logtrace.FieldError: fmt.Sprintf("%v", err)})
		return
	}

	isTest := strings.EqualFold(strings.TrimSpace(os.Getenv("INTEGRATION_TEST")), "true")

	peers := buildEligiblePeers(resp.Supernodes, isTest)
	if len(peers) < 2 {
		// Probing needs at least one peer besides ourselves.
		cache.plan = nil
		return
	}
	// Sort so every node derives the same eligible-set ordering.
	sort.Slice(peers, func(i, j int) bool {
		return peers[i].identity < peers[j].identity
	})

	selfIndex := -1
	for i := range peers {
		if peers[i].identity == selfIdentity {
			selfIndex = i
			break
		}
	}
	if selfIndex < 0 {
		// Only ACTIVE nodes participate in probing.
		cache.plan = nil
		return
	}

	// Epoch-derived offsets relative to our own position in the sorted set.
	offsets := deriveProbeOffsets(epochID, ProbeAssignmentsPerEpoch, len(peers))
	outbound := make([]probeTarget, 0, len(offsets))
	for _, off := range offsets {
		idx := (selfIndex + off) % len(peers)
		outbound = append(outbound, peers[idx])
	}

	cache.plan = &probePlan{
		epochID:         epochID,
		epochBlocks:     epochBlocks,
		eligiblePeers:   peers,
		outboundTargets: outbound,
		builtAt:         now,
	}
	hm.setProbePlan(cache.plan)
	cache.cursor = 0
	logtrace.Debug(ctx, "Active probing: refreshed epoch plan", logtrace.Fields{
		"epoch":    epochID,
		"peers":    len(peers),
		"targets":  len(outbound),
		"identity": selfIdentity,
	})
}
// probePeerOnce fires one gRPC, one gateway HTTP, and one P2P probe at the
// target, skipping any probe whose client is nil. Failures are logged at
// debug level only: probing is best-effort inbound-traffic generation, and
// the probed peer records its own passive evidence.
func (hm *Collector) probePeerOnce(
	ctx context.Context,
	grpcProbeClient *grpcclient.Client,
	grpcProbeOpts *grpcclient.ClientOptions,
	p2pCreds *ltc.LumeraTC,
	httpClient *http.Client,
	target probeTarget,
	timeout time.Duration,
) {
	if grpcProbeClient != nil {
		if err := probeGRPC(ctx, grpcProbeClient, grpcProbeOpts, target, timeout); err != nil {
			logtrace.Debug(ctx, "Active probing: gRPC probe failed", logtrace.Fields{
				"peer":              target.identity,
				"grpc_host":         target.hostIPv4,
				"grpc_port":         target.grpcPort,
				logtrace.FieldError: err.Error(),
			})
		}
	}

	if httpClient != nil {
		if err := probeGateway(ctx, httpClient, target, timeout); err != nil {
			logtrace.Debug(ctx, "Active probing: gateway probe failed", logtrace.Fields{
				"peer":              target.identity,
				"gateway_host":      target.hostIPv4,
				"gateway_port":      target.gatewayPort,
				logtrace.FieldError: err.Error(),
			})
		}
	}

	if p2pCreds != nil {
		if err := probeP2P(ctx, p2pCreds, target, timeout); err != nil {
			logtrace.Debug(ctx, "Active probing: P2P probe failed", logtrace.Fields{
				"peer":              target.identity,
				"p2p_host":          target.hostIPv4,
				"p2p_port":          target.p2pPort,
				logtrace.FieldError: err.Error(),
			})
		}
	}
}
time.Duration) error { + if client == nil { + return fmt.Errorf("gRPC client is nil") + } + + hostPort := net.JoinHostPort(target.hostIPv4, strconv.Itoa(target.grpcPort)) + addr := ltc.FormatAddressWithIdentity(target.identity, hostPort) + + probeCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + conn, err := client.Connect(probeCtx, addr, opts) + if err != nil { + return err + } + defer conn.Close() + + healthClient := grpc_health_v1.NewHealthClient(conn) + resp, err := healthClient.Check(probeCtx, &grpc_health_v1.HealthCheckRequest{}) + if err != nil { + return err + } + if resp.GetStatus() != grpc_health_v1.HealthCheckResponse_SERVING { + return fmt.Errorf("health status=%v", resp.GetStatus()) + } + return nil +} + +func probeGateway(ctx context.Context, client *http.Client, target probeTarget, timeout time.Duration) error { + if client == nil { + return fmt.Errorf("http client is nil") + } + + urlStr := fmt.Sprintf("http://%s:%d/api/v1/status", target.hostIPv4, target.gatewayPort) + probeCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + req, err := http.NewRequestWithContext(probeCtx, http.MethodGet, urlStr, nil) + if err != nil { + return err + } + + resp, err := client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return fmt.Errorf("status=%d", resp.StatusCode) + } + return nil +} + +func probeP2P(ctx context.Context, creds *ltc.LumeraTC, target probeTarget, timeout time.Duration) error { + if creds == nil { + return fmt.Errorf("p2p creds is nil") + } + creds.SetRemoteIdentity(target.identity) + + probeCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + var d net.Dialer + rawConn, err := d.DialContext(probeCtx, "tcp", net.JoinHostPort(target.hostIPv4, strconv.Itoa(target.p2pPort))) + if err != nil { + return err + } + defer rawConn.Close() + + _ = rawConn.SetDeadline(time.Now().Add(timeout)) + + secureConn, _, err := 
creds.ClientHandshake(probeCtx, "", rawConn) + if err != nil { + return err + } + _ = secureConn.Close() + + return nil +} + +func deriveProbeOffsets(epochID uint64, k int, n int) []int { + if k <= 0 || n <= 1 { + return nil + } + maxOffset := n - 1 + if k > maxOffset { + k = maxOffset + } + + out := make([]int, 0, k) + used := make(map[int]struct{}, k) + for j := 0; len(out) < k; j++ { + for attempt := 0; ; attempt++ { + h := sha256.Sum256([]byte(fmt.Sprintf("%d:%d:%d", epochID, j, attempt))) + v := binary.LittleEndian.Uint64(h[:8]) + offset := int(v%uint64(maxOffset)) + 1 // [1..N-1] + if _, ok := used[offset]; ok { + continue + } + used[offset] = struct{}{} + out = append(out, offset) + break + } + } + return out +} + +func withJitter(base time.Duration, fraction float64, rng *rand.Rand) time.Duration { + if base <= 0 || fraction <= 0 { + return base + } + if rng == nil { + rng = rand.New(rand.NewSource(time.Now().UnixNano())) + } + maxJitter := int64(float64(base) * fraction) + if maxJitter <= 0 { + return base + } + // Uniform in [-maxJitter, +maxJitter]. 
+ delta := rng.Int63n(2*maxJitter+1) - maxJitter + out := base + time.Duration(delta) + if out < time.Second { + out = time.Second + } + return out +} + +func latestState(sn *sntypes.SuperNode) sntypes.SuperNodeState { + if sn == nil || len(sn.States) == 0 { + return sntypes.SuperNodeStateUnspecified + } + var ( + best *sntypes.SuperNodeStateRecord + bestHeight int64 = -1 + ) + for _, st := range sn.States { + if st == nil { + continue + } + if st.Height > bestHeight { + bestHeight = st.Height + best = st + } + } + if best == nil { + return sntypes.SuperNodeStateUnspecified + } + return best.State +} + +func latestIPAddress(sn *sntypes.SuperNode) string { + if sn == nil || len(sn.PrevIpAddresses) == 0 { + return "" + } + var ( + best *sntypes.IPAddressHistory + bestHeight int64 = -1 + ) + for _, rec := range sn.PrevIpAddresses { + if rec == nil { + continue + } + if rec.Height > bestHeight { + bestHeight = rec.Height + best = rec + } + } + if best == nil { + return "" + } + return strings.TrimSpace(best.Address) +} + +func parseHostAndPort(address string, defaultPort int) (host string, port int, ok bool) { + address = strings.TrimSpace(address) + if address == "" { + return "", 0, false + } + + // If it looks like a URL, parse and use the host[:port] portion. + if u, err := url.Parse(address); err == nil && u.Host != "" { + address = u.Host + } + + if h, p, err := net.SplitHostPort(address); err == nil { + h = strings.TrimSpace(h) + if h == "" { + return "", 0, false + } + if n, err := strconv.Atoi(p); err == nil && n > 0 && n <= 65535 { + return h, n, true + } + return h, defaultPort, true + } + + // No port present; return default. 
+ return address, defaultPort, true +} + +func isAllowedProbeIPv4(ip net.IP, integrationTest bool) bool { + if ip == nil || ip.To4() == nil { + return false + } + if ip.IsUnspecified() || ip.IsLinkLocalUnicast() || ip.IsLinkLocalMulticast() { + return false + } + if ip.IsLoopback() || ip.IsPrivate() { + return integrationTest + } + return true +} + +func waitOrStop(ctx context.Context, stopCh <-chan struct{}, d time.Duration) bool { + select { + case <-time.After(d): + return true + case <-stopCh: + return false + case <-ctx.Done(): + return false + } +} diff --git a/supernode/supernode_metrics/constants.go b/supernode/supernode_metrics/constants.go new file mode 100644 index 00000000..7119f392 --- /dev/null +++ b/supernode/supernode_metrics/constants.go @@ -0,0 +1,45 @@ +package supernode_metrics + +import "time" + +const ( + // DefaultStartupDelaySeconds is a safety delay after process start before + // we begin reporting metrics / probing, giving the node time to fully initialize. + DefaultStartupDelaySeconds = 30 + + // ProbeTimeoutSeconds bounds how long we wait when actively probing peer + // reachability, so a single slow dial cannot stall the entire loop. + ProbeTimeoutSeconds = 5 + + // MinProbeAttemptsPerReportInterval ensures we attempt at least this many outbound + // probe rounds per metrics reporting interval, even when the deterministic schedule + // would otherwise result in fewer targets (e.g. very small networks). + // + // A "probe attempt" here means probing one peer once (gRPC health, gateway status, + // and P2P handshake are attempted as part of that single peer probe). + MinProbeAttemptsPerReportInterval = 3 + + // EvidenceWindowSeconds controls how long inbound traffic evidence is treated as fresh. + // Active probing should ensure evidence is refreshed before it expires. + EvidenceWindowSeconds = 600 + + // ProbeAssignmentsPerEpoch is the number of distinct peers that should probe + // each target per epoch (K in the spec). 
+ ProbeAssignmentsPerEpoch = 3 + // ProbeQuorum is the minimum number of alive assigned probers required to + // infer CLOSED from lack of inbound evidence. + ProbeQuorum = 2 + + // Well-known local ports used when reporting `open_ports` metrics. + // These are defaults; individual nodes may override them via config. + // They should stay aligned with the chain's `required_open_ports` parameter. + APIPort = 4444 // Supernode gRPC port + P2PPort = 4445 // Kademlia / P2P port + StatusPort = 8002 // HTTP gateway port (grpc-gateway: /api/v1/status) +) + +const ( + defaultProbeInterval = 2 * time.Minute + defaultPeerRefreshInterval = 10 * time.Minute // only used when chain height is unavailable + defaultProbeJitterFraction = 0.20 +) diff --git a/supernode/supernode_metrics/health_checks.go b/supernode/supernode_metrics/health_checks.go deleted file mode 100644 index ae202cfa..00000000 --- a/supernode/supernode_metrics/health_checks.go +++ /dev/null @@ -1,226 +0,0 @@ -package supernode_metrics - -import ( - "context" - "fmt" - "net" - "net/http" - "strings" - "time" - - "github.com/LumeraProtocol/lumera/x/lumeraid/securekeyx" - "github.com/LumeraProtocol/supernode/v2/pkg/logtrace" - lumeravalid "github.com/LumeraProtocol/supernode/v2/pkg/lumera" - ltc "github.com/LumeraProtocol/supernode/v2/pkg/net/credentials" - grpcclient "github.com/LumeraProtocol/supernode/v2/pkg/net/grpc/client" - "google.golang.org/grpc/health/grpc_health_v1" -) - -// checkP2PService performs a lightweight health check against the local P2P -// service using a self-directed DHT ping. -// -// The return value is a normalized "health score" in the range [0, 1]: -// - 1.0 indicates healthy / reachable -// - 0.0 indicates unhealthy / unreachable -// -// For now this is stubbed to always return 1.0 but is shaped for richer -// checks (latency, error rates, etc.) in the future. 
-func (hm *Collector) checkP2PService(ctx context.Context) float64 { - identity := strings.TrimSpace(hm.identity) - if identity == "" || hm.keyring == nil || hm.lumeraClient == nil { - logtrace.Warn(ctx, "P2P health check skipped: missing identity/keyring/client", nil) - return 0.0 - } - - host := hm.getPublicIP(ctx) - if host == "" { - host = "127.0.0.1" - } - target := net.JoinHostPort(host, fmt.Sprintf("%d", P2PPort)) - if hm.p2pPort != 0 { - target = net.JoinHostPort(host, fmt.Sprintf("%d", hm.p2pPort)) - } - - // Build client credentials using the same ALTS/securekeyx stack as external - // P2P clients. We treat the local node as a simplenode peer dialing the - // supernode (itself) by identity. - clientCreds, err := ltc.NewClientCreds(<c.ClientOptions{ - CommonOptions: ltc.CommonOptions{ - Keyring: hm.keyring, - LocalIdentity: identity, - PeerType: securekeyx.Simplenode, - Validator: lumeravalid.NewSecureKeyExchangeValidator(hm.lumeraClient), - }, - }) - if err != nil { - logtrace.Error(ctx, fmt.Sprintf("P2P health check: failed to create client credentials: %v", err), nil) - return 0.0 - } - - lumeraTC, ok := clientCreds.(*ltc.LumeraTC) - if !ok { - logtrace.Error(ctx, "P2P health check: invalid credentials type (expected *LumeraTC)", nil) - return 0.0 - } - // Remote identity is the supernode itself. - lumeraTC.SetRemoteIdentity(identity) - - checkCtx, cancel := context.WithTimeout(ctx, time.Duration(PortCheckTimeoutSeconds)*time.Second) - defer cancel() - - var d net.Dialer - rawConn, err := d.DialContext(checkCtx, "tcp", target) - if err != nil { - logtrace.Error(ctx, fmt.Sprintf("P2P health check: failed to dial %s: %v", target, err), nil) - return 0.0 - } - defer rawConn.Close() - - // Ensure the handshake cannot hang indefinitely. 
- _ = rawConn.SetDeadline(time.Now().UTC().Add(time.Duration(PortCheckTimeoutSeconds) * time.Second)) - - secureConn, _, err := lumeraTC.ClientHandshake(checkCtx, "", rawConn) - if err != nil { - logtrace.Error(ctx, fmt.Sprintf("P2P health check: handshake failed against %s: %v", target, err), nil) - return 0.0 - } - _ = secureConn.Close() - - // If we reach this point the node accepted a full secure handshake on the - // P2P port, which is sufficient to treat the service as healthy. - return 1.0 -} - -// checkStatusAPI performs an HTTP GET to the external /api/v1/status endpoint -// exposed by the gateway to validate that the REST API is reachable and -// functioning. -// -// Like other health checks, the result is a [0, 1] score. This function -// currently serves as a placeholder; once wired to a concrete HTTP client -// it will surface connectivity and status-code level failures. -func (hm *Collector) checkStatusAPI(ctx context.Context) float64 { - host := hm.getPublicIP(ctx) - if host == "" { - host = "127.0.0.1" - } - - port := StatusPort - if hm.gatewayPort != 0 { - port = int(hm.gatewayPort) - } - url := fmt.Sprintf("http://%s:%d/api/v1/status", host, port) - - reqCtx, cancel := context.WithTimeout(ctx, time.Duration(PortCheckTimeoutSeconds)*time.Second) - defer cancel() - - req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, url, nil) - if err != nil { - logtrace.Error(ctx, fmt.Sprintf("Status API health check: failed to build request: %v", err), nil) - return 0.0 - } - - client := &http.Client{ - Timeout: time.Duration(PortCheckTimeoutSeconds) * time.Second, - } - resp, err := client.Do(req) - if err != nil { - logtrace.Error(ctx, fmt.Sprintf("Status API health check: request failed to %s: %v", url, err), nil) - return 0.0 - } - defer resp.Body.Close() - - if resp.StatusCode >= 200 && resp.StatusCode < 300 { - return 1.0 - } - - logtrace.Error(ctx, fmt.Sprintf("Status API health check: non-success status %d from %s", resp.StatusCode, url), nil) 
- return 0.0 -} - -// checkGRPCService performs a gRPC self-health check against the public -// supernode gRPC endpoint using the same ALTS/TLS configuration that external -// clients use. -// -// The metric is expressed as a [0, 1] score to make it easy to aggregate or -// combine with additional health signals in the future. -func (hm *Collector) checkGRPCService(ctx context.Context) float64 { - identity := strings.TrimSpace(hm.identity) - if identity == "" || hm.keyring == nil || hm.lumeraClient == nil { - logtrace.Warn(ctx, "gRPC health check skipped: missing identity/keyring/client", nil) - return 0.0 - } - - host := hm.getPublicIP(ctx) - if host == "" { - host = "127.0.0.1" - } - port := APIPort - if hm.grpcPort != 0 { - port = int(hm.grpcPort) - } - grpcEndpoint := fmt.Sprintf("%s:%d", host, port) - - // Build client credentials mirroring external secure supernode clients. - clientCreds, err := ltc.NewClientCreds(<c.ClientOptions{ - CommonOptions: ltc.CommonOptions{ - Keyring: hm.keyring, - LocalIdentity: identity, - PeerType: securekeyx.Simplenode, - Validator: lumeravalid.NewSecureKeyExchangeValidator(hm.lumeraClient), - }, - }) - if err != nil { - logtrace.Error(ctx, fmt.Sprintf("gRPC health check: failed to create client credentials: %v", err), nil) - return 0.0 - } - - // Format address as "identity@host:port" so the ALTS layer knows which - // supernode identity we expect on the remote end (ourselves). 
- target := ltc.FormatAddressWithIdentity(identity, grpcEndpoint) - - checkCtx, cancel := context.WithTimeout(ctx, time.Duration(PortCheckTimeoutSeconds)*time.Second) - defer cancel() - - grpcClient := grpcclient.NewClient(clientCreds) - conn, err := grpcClient.Connect(checkCtx, target, grpcclient.DefaultClientOptions()) - if err != nil { - logtrace.Error(ctx, fmt.Sprintf("gRPC health check: connection failed to %s: %v", grpcEndpoint, err), nil) - return 0.0 - } - defer conn.Close() - - healthClient := grpc_health_v1.NewHealthClient(conn) - resp, err := healthClient.Check(checkCtx, &grpc_health_v1.HealthCheckRequest{}) - if err != nil { - logtrace.Error(ctx, fmt.Sprintf("gRPC health check: health RPC failed for %s: %v", grpcEndpoint, err), nil) - return 0.0 - } - if resp.Status != grpc_health_v1.HealthCheckResponse_SERVING { - logtrace.Error(ctx, fmt.Sprintf("gRPC health check: service not serving (status=%v) at %s", resp.Status, grpcEndpoint), nil) - return 0.0 - } - - return 1.0 -} - -// getPublicIP determines the node's public IP address from chain registration. -func (hm *Collector) getPublicIP(ctx context.Context) string { - // Get our registered IP from the blockchain - this is the source of truth. - // The SuperNode must be registered on chain to operate, so this should always work. - snInfo, err := hm.lumeraClient.SuperNode().GetSupernodeWithLatestAddress(ctx, hm.identity) - if err == nil && snInfo != nil && snInfo.LatestAddress != "" { - // Extract IP from "ip:port" format if present. - address := strings.TrimSpace(snInfo.LatestAddress) - if idx := strings.Index(address, ":"); idx > 0 { - return address[:idx] - } - return address - } - - // If we can't get IP from chain, log error and return empty. 
- if err != nil { - logtrace.Error(ctx, fmt.Sprintf("failed to get IP from chain registration: %v (identity=%s)", err, hm.identity), nil) - } - - return "" -} diff --git a/supernode/supernode_metrics/metrics_collection.go b/supernode/supernode_metrics/metrics_collection.go index 35bcfb90..d8a63365 100644 --- a/supernode/supernode_metrics/metrics_collection.go +++ b/supernode/supernode_metrics/metrics_collection.go @@ -6,8 +6,10 @@ import ( "os" "strconv" "strings" + "time" sntypes "github.com/LumeraProtocol/lumera/x/supernode/v1/types" + "github.com/LumeraProtocol/supernode/v2/pkg/reachability" ) // collectMetrics gathers a snapshot of local health and resource usage and @@ -104,42 +106,56 @@ func parseVersion(version string) [3]int { return result } -// openPorts returns the set of TCP ports this node advertises as open in its -// metrics report. For each well-known port we first perform the corresponding -// self-connect health check; only ports that successfully complete their -// external-style probe are included. -func (hm *Collector) openPorts(ctx context.Context) []uint32 { - seen := make(map[uint32]struct{}, 3) - out := make([]uint32, 0, 3) - - // gRPC port (supernode service) – include only if the ALTS + gRPC health - // check succeeds. - if hm.checkGRPCService(ctx) >= 1.0 { - val := uint32(hm.grpcPort) - if _, ok := seen[val]; !ok && val != 0 { - seen[val] = struct{}{} - out = append(out, val) +// openPorts returns tri-state status for the node's well-known service ports. +// +// Ports are only marked OPEN when we have recent evidence of real inbound +// traffic. A port is marked CLOSED only when we have no fresh inbound evidence +// and the deterministic active-probing quorum rules indicate that silence is +// meaningful (i.e. enough assigned probers are alive this epoch). 
+func (hm *Collector) openPorts(ctx context.Context) []sntypes.PortStatus { + out := make([]sntypes.PortStatus, 0, 3) + + now := time.Now() + window := time.Duration(EvidenceWindowSeconds) * time.Second + if hm.reportInterval > 0 { + // Ensure evidence doesn't expire between expected metrics reports. + minWindow := hm.reportInterval * 2 + if minWindow > window { + window = minWindow } } - // P2P port – include only if a full ALTS handshake on the P2P socket - // succeeds. - if hm.checkP2PService(ctx) >= 1.0 { - val := uint32(hm.p2pPort) - if _, ok := seen[val]; !ok && val != 0 { - seen[val] = struct{}{} - out = append(out, val) + store := reachability.DefaultStore() + canInferClosed := hm.silenceImpliesClosed(ctx) + + if hm.grpcPort != 0 { + state := sntypes.PortState_PORT_STATE_UNKNOWN + if store != nil && store.IsInboundFresh(reachability.ServiceGRPC, window, now) { + state = sntypes.PortState_PORT_STATE_OPEN + } else if store != nil && canInferClosed { + state = sntypes.PortState_PORT_STATE_CLOSED + } + out = append(out, sntypes.PortStatus{Port: uint32(hm.grpcPort), State: state}) + } + + if hm.p2pPort != 0 { + state := sntypes.PortState_PORT_STATE_UNKNOWN + if store != nil && store.IsInboundFresh(reachability.ServiceP2P, window, now) { + state = sntypes.PortState_PORT_STATE_OPEN + } else if store != nil && canInferClosed { + state = sntypes.PortState_PORT_STATE_CLOSED } + out = append(out, sntypes.PortStatus{Port: uint32(hm.p2pPort), State: state}) } - // HTTP gateway / status port – include only if /api/v1/status responds - // with a successful status code. 
- if hm.checkStatusAPI(ctx) >= 1.0 { - val := uint32(hm.gatewayPort) - if _, ok := seen[val]; !ok && val != 0 { - seen[val] = struct{}{} - out = append(out, val) + if hm.gatewayPort != 0 { + state := sntypes.PortState_PORT_STATE_UNKNOWN + if store != nil && store.IsInboundFresh(reachability.ServiceGateway, window, now) { + state = sntypes.PortState_PORT_STATE_OPEN + } else if store != nil && canInferClosed { + state = sntypes.PortState_PORT_STATE_CLOSED } + out = append(out, sntypes.PortStatus{Port: uint32(hm.gatewayPort), State: state}) } return out diff --git a/supernode/supernode_metrics/monitor_service.go b/supernode/supernode_metrics/monitor_service.go index da868f81..308b8800 100644 --- a/supernode/supernode_metrics/monitor_service.go +++ b/supernode/supernode_metrics/monitor_service.go @@ -17,22 +17,6 @@ import ( sntypes "github.com/LumeraProtocol/lumera/x/supernode/v1/types" ) -const ( - // DefaultStartupDelaySeconds is a safety delay after process start before - // we begin reporting metrics, giving the node time to fully initialize. - DefaultStartupDelaySeconds = 30 - // PortCheckTimeoutSeconds bounds how long we wait when probing port - // accessibility, so a single slow check cannot stall the entire loop. - PortCheckTimeoutSeconds = 5 - - // Well-known local ports used when reporting `open_ports` metrics. - // These are defaults; individual nodes may override them via config. - // They should stay aligned with the chain's `required_open_ports` parameter. - APIPort = 4444 // Supernode gRPC port - P2PPort = 4445 // Kademlia / P2P port - StatusPort = 8002 // HTTP gateway port (grpc-gateway: /api/v1/status) -) - // Collector manages the end-to-end supernode metrics flow: // 1) derive configuration from on-chain params, // 2) collect local health data from the status service and helpers, and @@ -57,18 +41,22 @@ type Collector struct { // stopChan is closed to signal the reporting loop to exit. 
stopChan chan struct{} // wg tracks the lifetime of background goroutines to enable clean shutdowns. - wg sync.WaitGroup + wg sync.WaitGroup + probePlanMu sync.RWMutex + probePlan *probePlan // Configuration (derived from on-chain params) // reportInterval is the wall-clock interval between metrics reports, // derived from the `metrics_update_interval_blocks` param and the observed block time. - reportInterval time.Duration + reportInterval time.Duration + metricsUpdateIntervalBlocks uint64 + metricsFreshnessMaxBlocks uint64 // version is the semantic version of this supernode binary, used to populate // the `version_*` fields in SupernodeMetrics. version string // Listener ports for this specific supernode instance. - // These are used for self-connect checks and for populating `open_ports`. + // These are used for populating `open_ports`. grpcPort uint16 p2pPort uint16 gatewayPort uint16 @@ -125,21 +113,55 @@ func (hm *Collector) Start(ctx context.Context) error { return fmt.Errorf("failed to fetch supernode params for health monitor: %w", err) } - params := paramsResp.GetParams() + params := paramsResp.GetParams().WithDefaults() intervalBlocks := params.GetMetricsUpdateIntervalBlocks() if intervalBlocks == 0 { return fmt.Errorf("supernode params metrics_update_interval_blocks is zero or unset") } + hm.metricsUpdateIntervalBlocks = intervalBlocks + hm.metricsFreshnessMaxBlocks = params.GetMetricsFreshnessMaxBlocks() hm.reportInterval = hm.resolveReportInterval(ctx, intervalBlocks) } hm.wg.Add(1) go hm.reportingLoop(ctx) + // Active probing generates a small amount of deterministic peer-to-peer traffic so + // that quiet-but-healthy nodes still receive inbound connections, enabling + // evidence-based `open_ports` to converge to OPEN instead of UNKNOWN. 
+ hm.wg.Add(1) + go hm.probingLoop(ctx) + return nil } +func (hm *Collector) probingEpochBlocks() uint64 { + if hm == nil { + return 0 + } + return hm.metricsUpdateIntervalBlocks +} + +func (hm *Collector) setProbePlan(plan *probePlan) { + if hm == nil { + return + } + hm.probePlanMu.Lock() + hm.probePlan = plan + hm.probePlanMu.Unlock() +} + +func (hm *Collector) getProbePlan() *probePlan { + if hm == nil { + return nil + } + hm.probePlanMu.RLock() + plan := hm.probePlan + hm.probePlanMu.RUnlock() + return plan +} + // resolveReportInterval converts a block-based interval into a wall-clock // duration using the current estimated block time. func (hm *Collector) resolveReportInterval(ctx context.Context, intervalBlocks uint64) time.Duration { diff --git a/supernode/supernode_metrics/reachability_active_probing_test.go b/supernode/supernode_metrics/reachability_active_probing_test.go new file mode 100644 index 00000000..6b14a15d --- /dev/null +++ b/supernode/supernode_metrics/reachability_active_probing_test.go @@ -0,0 +1,473 @@ +package supernode_metrics + +import ( + "context" + "fmt" + "net" + "testing" + "time" + + tmproto "github.com/cometbft/cometbft/proto/tendermint/types" + cmtservice "github.com/cosmos/cosmos-sdk/client/grpc/cmtservice" + + sntypes "github.com/LumeraProtocol/lumera/x/supernode/v1/types" + "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/action" + "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/action_msg" + "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/auth" + "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/bank" + "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/node" + "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/supernode" + "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/supernode_msg" + "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/tx" + "github.com/LumeraProtocol/supernode/v2/pkg/reachability" +) + +func TestDeriveProbeOffsets(t *testing.T) { 
+ offsets := deriveProbeOffsets(123, 3, 10) + if len(offsets) != 3 { + t.Fatalf("expected 3 offsets, got %d", len(offsets)) + } + seen := map[int]struct{}{} + for _, off := range offsets { + if off < 1 || off > 9 { + t.Fatalf("offset out of range: %d", off) + } + if _, ok := seen[off]; ok { + t.Fatalf("duplicate offset: %d", off) + } + seen[off] = struct{}{} + } + + offsets2 := deriveProbeOffsets(123, 3, 10) + for i := range offsets { + if offsets[i] != offsets2[i] { + t.Fatalf("offsets not deterministic: %v vs %v", offsets, offsets2) + } + } +} + +func TestDeriveProbeOffsetsTruncatesWhenKExceedsPeers(t *testing.T) { + offsets := deriveProbeOffsets(1, 100, 4) + if len(offsets) != 3 { + t.Fatalf("expected 3 offsets (N-1), got %d", len(offsets)) + } + seen := map[int]struct{}{} + for _, off := range offsets { + if off < 1 || off > 3 { + t.Fatalf("offset out of range: %d", off) + } + if _, ok := seen[off]; ok { + t.Fatalf("duplicate offset: %d", off) + } + seen[off] = struct{}{} + } +} + +func TestDeterministicScheduleDistinctProbers(t *testing.T) { + const ( + nPeers = 7 + epoch = uint64(42) + k = 3 + ) + offsets := deriveProbeOffsets(epoch, k, nPeers) + if len(offsets) != k { + t.Fatalf("expected %d offsets, got %d", k, len(offsets)) + } + + for target := 0; target < nPeers; target++ { + seen := map[int]struct{}{} + for _, off := range offsets { + prober := (target - off) % nPeers + if prober < 0 { + prober += nPeers + } + if _, ok := seen[prober]; ok { + t.Fatalf("target=%d: duplicate prober=%d for offsets=%v", target, prober, offsets) + } + seen[prober] = struct{}{} + } + if len(seen) != k { + t.Fatalf("target=%d: expected %d distinct probers, got %d", target, k, len(seen)) + } + } +} + +func TestBuildEligiblePeersFilters(t *testing.T) { + height := int64(100) + active := []*sntypes.SuperNode{ + { + SupernodeAccount: "a", + States: []*sntypes.SuperNodeStateRecord{ + {Height: height, State: sntypes.SuperNodeStateActive}, + }, + PrevIpAddresses: 
[]*sntypes.IPAddressHistory{ + {Height: height, Address: "203.0.113.1:4444"}, + }, + Metrics: &sntypes.MetricsAggregate{Height: height}, + P2PPort: "5555", + }, + { + SupernodeAccount: "b", + States: []*sntypes.SuperNodeStateRecord{ + {Height: height, State: sntypes.SuperNodeStateActive}, + }, + PrevIpAddresses: []*sntypes.IPAddressHistory{ + {Height: height, Address: "203.0.113.2"}, // no port => default APIPort + }, + Metrics: &sntypes.MetricsAggregate{Height: 99}, + }, + { + SupernodeAccount: "postponed", + States: []*sntypes.SuperNodeStateRecord{ + {Height: height, State: sntypes.SuperNodeStatePostponed}, + }, + PrevIpAddresses: []*sntypes.IPAddressHistory{ + {Height: height, Address: "203.0.113.3:4444"}, + }, + }, + { + SupernodeAccount: "ipv6", + States: []*sntypes.SuperNodeStateRecord{ + {Height: height, State: sntypes.SuperNodeStateActive}, + }, + PrevIpAddresses: []*sntypes.IPAddressHistory{ + {Height: height, Address: "[2001:db8::1]:4444"}, + }, + }, + { + SupernodeAccount: "private", + States: []*sntypes.SuperNodeStateRecord{ + {Height: height, State: sntypes.SuperNodeStateActive}, + }, + PrevIpAddresses: []*sntypes.IPAddressHistory{ + {Height: height, Address: "192.168.1.10:4444"}, + }, + }, + { + SupernodeAccount: "", + States: []*sntypes.SuperNodeStateRecord{ + {Height: height, State: sntypes.SuperNodeStateActive}, + }, + PrevIpAddresses: []*sntypes.IPAddressHistory{ + {Height: height, Address: "203.0.113.9:4444"}, + }, + }, + } + + got := buildEligiblePeers(active, false) + if len(got) != 2 { + t.Fatalf("expected 2 eligible peers, got %d", len(got)) + } + if got[0].identity != "a" || got[0].grpcPort != 4444 || got[0].p2pPort != 5555 || got[0].metricsHeight != height { + t.Fatalf("unexpected peer a: %+v", got[0]) + } + if got[1].identity != "b" || got[1].grpcPort != APIPort || got[1].metricsHeight != 99 { + t.Fatalf("unexpected peer b: %+v", got[1]) + } + + gotTest := buildEligiblePeers(active, true) + if len(gotTest) != 3 { + t.Fatalf("expected 3 
eligible peers in integration test mode, got %d", len(gotTest)) + } + foundPrivate := false + for _, p := range gotTest { + if p.identity == "private" { + foundPrivate = true + break + } + } + if !foundPrivate { + t.Fatalf("expected private peer to be eligible in integration test mode") + } +} + +func TestOpenPortsClosedWhenQuorumSatisfiedAndNoEvidence(t *testing.T) { + const ( + currentHeight = int64(250) + epochBlocks = uint64(100) + ) + + peers := []*sntypes.SuperNode{ + mkSN("a", "203.0.113.1", currentHeight, currentHeight), + mkSN("b", "203.0.113.2", currentHeight, currentHeight), + mkSN("c", "203.0.113.3", currentHeight, currentHeight), // self + mkSN("d", "203.0.113.4", currentHeight, currentHeight), + mkSN("e", "203.0.113.5", currentHeight, currentHeight), + } + + client := &fakeLumeraClient{ + nodeModule: &fakeNodeModule{height: currentHeight}, + snModule: &fakeSupernodeModule{supernodes: peers}, + } + + store := reachability.NewStore("c") + reachability.SetDefaultStore(store) + t.Cleanup(func() { reachability.SetDefaultStore(nil) }) + + hm := &Collector{ + lumeraClient: client, + identity: "c", + metricsUpdateIntervalBlocks: epochBlocks, + metricsFreshnessMaxBlocks: 5000, + grpcPort: APIPort, + p2pPort: P2PPort, + gatewayPort: StatusPort, + } + + ports := hm.openPorts(context.Background()) + if len(ports) != 3 { + t.Fatalf("expected 3 port statuses, got %d", len(ports)) + } + for _, ps := range ports { + if ps.State != sntypes.PortState_PORT_STATE_CLOSED { + t.Fatalf("expected CLOSED, got %v for port=%d", ps.State, ps.Port) + } + } +} + +func TestOpenPortsUnknownWithoutQuorum(t *testing.T) { + // Only two eligible peers => cannot meet ProbeQuorum=2 (needs at least 3 peers). 
+ peers := []*sntypes.SuperNode{ + mkSN("a", "203.0.113.1", 250, 250), + mkSN("b", "203.0.113.2", 250, 250), // self + } + + client := &fakeLumeraClient{ + nodeModule: &fakeNodeModule{height: 250}, + snModule: &fakeSupernodeModule{supernodes: peers}, + } + + store := reachability.NewStore("b") + reachability.SetDefaultStore(store) + t.Cleanup(func() { reachability.SetDefaultStore(nil) }) + + hm := &Collector{ + lumeraClient: client, + identity: "b", + metricsUpdateIntervalBlocks: 100, + metricsFreshnessMaxBlocks: 5000, + grpcPort: APIPort, + p2pPort: P2PPort, + gatewayPort: StatusPort, + } + + ports := hm.openPorts(context.Background()) + if len(ports) != 3 { + t.Fatalf("expected 3 port statuses, got %d", len(ports)) + } + for _, ps := range ports { + if ps.State != sntypes.PortState_PORT_STATE_UNKNOWN { + t.Fatalf("expected UNKNOWN, got %v for port=%d", ps.State, ps.Port) + } + } +} + +func TestOpenPortsUnknownWhenStoreNil(t *testing.T) { + const height = int64(250) + peers := []*sntypes.SuperNode{ + mkSN("a", "203.0.113.1", height, height), + mkSN("b", "203.0.113.2", height, height), + mkSN("c", "203.0.113.3", height, height), // self + mkSN("d", "203.0.113.4", height, height), + mkSN("e", "203.0.113.5", height, height), + } + + client := &fakeLumeraClient{ + nodeModule: &fakeNodeModule{height: height}, + snModule: &fakeSupernodeModule{supernodes: peers}, + } + + reachability.SetDefaultStore(nil) + + hm := &Collector{ + lumeraClient: client, + identity: "c", + metricsUpdateIntervalBlocks: 100, + metricsFreshnessMaxBlocks: 5000, + grpcPort: APIPort, + p2pPort: P2PPort, + gatewayPort: StatusPort, + } + + ports := hm.openPorts(context.Background()) + if len(ports) != 3 { + t.Fatalf("expected 3 port statuses, got %d", len(ports)) + } + for _, ps := range ports { + if ps.State != sntypes.PortState_PORT_STATE_UNKNOWN { + t.Fatalf("expected UNKNOWN, got %v for port=%d", ps.State, ps.Port) + } + } +} + +func TestOpenPortsOpenOverridesClosed(t *testing.T) { + const height 
= int64(250) + peers := []*sntypes.SuperNode{ + mkSN("a", "203.0.113.1", height, height), + mkSN("b", "203.0.113.2", height, height), + mkSN("c", "203.0.113.3", height, height), // self + mkSN("d", "203.0.113.4", height, height), + mkSN("e", "203.0.113.5", height, height), + } + + client := &fakeLumeraClient{ + nodeModule: &fakeNodeModule{height: height}, + snModule: &fakeSupernodeModule{supernodes: peers}, + } + + store := reachability.NewStore("c") + reachability.SetDefaultStore(store) + t.Cleanup(func() { reachability.SetDefaultStore(nil) }) + + store.RecordInbound(reachability.ServiceGRPC, "a", &net.TCPAddr{IP: net.ParseIP("203.0.113.50"), Port: 1234}, time.Now()) + + hm := &Collector{ + lumeraClient: client, + identity: "c", + metricsUpdateIntervalBlocks: 100, + metricsFreshnessMaxBlocks: 5000, + grpcPort: APIPort, + p2pPort: P2PPort, + gatewayPort: StatusPort, + } + + ports := hm.openPorts(context.Background()) + for _, ps := range ports { + switch ps.Port { + case APIPort: + if ps.State != sntypes.PortState_PORT_STATE_OPEN { + t.Fatalf("expected gRPC OPEN, got %v", ps.State) + } + case P2PPort, StatusPort: + if ps.State != sntypes.PortState_PORT_STATE_CLOSED { + t.Fatalf("expected port=%d CLOSED, got %v", ps.Port, ps.State) + } + default: + t.Fatalf("unexpected port: %d", ps.Port) + } + } +} + +func TestSilenceImpliesClosedUsesProbePlanSnapshot(t *testing.T) { + const ( + height = int64(250) + epochBlocks = uint64(100) + ) + + hm := &Collector{ + lumeraClient: &fakeLumeraClient{nodeModule: &fakeNodeModule{height: height}}, + identity: "c", + metricsUpdateIntervalBlocks: epochBlocks, + metricsFreshnessMaxBlocks: 5000, + } + + hm.setProbePlan(&probePlan{ + epochID: 2, // floor(250/100) + epochBlocks: epochBlocks, + eligiblePeers: []probeTarget{ + {identity: "a", metricsHeight: height}, + {identity: "b", metricsHeight: height}, + {identity: "c", metricsHeight: height}, + {identity: "d", metricsHeight: height}, + {identity: "e", metricsHeight: height}, + }, + }) 
+ + if !hm.silenceImpliesClosed(context.Background()) { + t.Fatalf("expected quorum to be satisfied using the cached probe plan") + } +} + +func TestProbeRoundIntervalEnforcesMinAttemptsPerReportInterval(t *testing.T) { + if got := probeRoundInterval(30*time.Second, 1); got != 10*time.Second { + t.Fatalf("expected 30s/3=10s for 1 target, got %v", got) + } + if got := probeRoundInterval(30*time.Second, 2); got != 10*time.Second { + t.Fatalf("expected 30s/3=10s for 2 targets, got %v", got) + } + if got := probeRoundInterval(30*time.Second, 3); got != 10*time.Second { + t.Fatalf("expected 30s/3=10s for 3 targets, got %v", got) + } + if got := probeRoundInterval(30*time.Second, 6); got != 5*time.Second { + t.Fatalf("expected 30s/6=5s for 6 targets, got %v", got) + } +} + +func mkSN(account string, ip string, recordHeight int64, metricsHeight int64) *sntypes.SuperNode { + return &sntypes.SuperNode{ + SupernodeAccount: account, + States: []*sntypes.SuperNodeStateRecord{ + {Height: recordHeight, State: sntypes.SuperNodeStateActive}, + }, + PrevIpAddresses: []*sntypes.IPAddressHistory{ + {Height: recordHeight, Address: fmt.Sprintf("%s:%d", ip, APIPort)}, + }, + Metrics: &sntypes.MetricsAggregate{Height: metricsHeight}, + P2PPort: fmt.Sprintf("%d", P2PPort), + } +} + +type fakeLumeraClient struct { + nodeModule node.Module + snModule supernode.Module +} + +func (c *fakeLumeraClient) Auth() auth.Module { return nil } +func (c *fakeLumeraClient) Action() action.Module { return nil } +func (c *fakeLumeraClient) ActionMsg() action_msg.Module { return nil } +func (c *fakeLumeraClient) SuperNode() supernode.Module { return c.snModule } +func (c *fakeLumeraClient) SuperNodeMsg() supernode_msg.Module { return nil } +func (c *fakeLumeraClient) Bank() bank.Module { return nil } +func (c *fakeLumeraClient) Tx() tx.Module { return nil } +func (c *fakeLumeraClient) Node() node.Module { return c.nodeModule } +func (c *fakeLumeraClient) Close() error { return nil } + +type fakeNodeModule 
struct { + height int64 +} + +func (m *fakeNodeModule) GetLatestBlock(ctx context.Context) (*cmtservice.GetLatestBlockResponse, error) { + _ = ctx + return &cmtservice.GetLatestBlockResponse{ + Block: &tmproto.Block{Header: tmproto.Header{Height: m.height}}, + }, nil +} +func (m *fakeNodeModule) GetBlockByHeight(context.Context, int64) (*cmtservice.GetBlockByHeightResponse, error) { + return nil, nil +} +func (m *fakeNodeModule) GetNodeInfo(context.Context) (*cmtservice.GetNodeInfoResponse, error) { + return nil, nil +} +func (m *fakeNodeModule) GetSyncing(context.Context) (*cmtservice.GetSyncingResponse, error) { + return nil, nil +} +func (m *fakeNodeModule) GetLatestValidatorSet(context.Context) (*cmtservice.GetLatestValidatorSetResponse, error) { + return nil, nil +} +func (m *fakeNodeModule) GetValidatorSetByHeight(context.Context, int64) (*cmtservice.GetValidatorSetByHeightResponse, error) { + return nil, nil +} +func (m *fakeNodeModule) Sign(string, []byte) ([]byte, error) { return nil, nil } + +type fakeSupernodeModule struct { + supernodes []*sntypes.SuperNode +} + +func (m *fakeSupernodeModule) GetTopSuperNodesForBlock(context.Context, *sntypes.QueryGetTopSuperNodesForBlockRequest) (*sntypes.QueryGetTopSuperNodesForBlockResponse, error) { + return nil, nil +} +func (m *fakeSupernodeModule) GetSuperNode(context.Context, string) (*sntypes.QueryGetSuperNodeResponse, error) { + return nil, nil +} +func (m *fakeSupernodeModule) GetSupernodeBySupernodeAddress(context.Context, string) (*sntypes.SuperNode, error) { + return nil, nil +} +func (m *fakeSupernodeModule) GetSupernodeWithLatestAddress(context.Context, string) (*supernode.SuperNodeInfo, error) { + return nil, nil +} +func (m *fakeSupernodeModule) GetParams(context.Context) (*sntypes.QueryParamsResponse, error) { + return nil, nil +} +func (m *fakeSupernodeModule) ListSuperNodes(context.Context) (*sntypes.QueryListSuperNodesResponse, error) { + return &sntypes.QueryListSuperNodesResponse{Supernodes: 
m.supernodes}, nil +} diff --git a/supernode/supernode_metrics/reachability_quorum.go b/supernode/supernode_metrics/reachability_quorum.go new file mode 100644 index 00000000..eaaea28a --- /dev/null +++ b/supernode/supernode_metrics/reachability_quorum.go @@ -0,0 +1,105 @@ +package supernode_metrics + +import ( + "context" + "os" + "sort" + "strings" +) + +func (hm *Collector) silenceImpliesClosed(ctx context.Context) bool { + if hm == nil || hm.lumeraClient == nil { + return false + } + if ProbeAssignmentsPerEpoch <= 0 || ProbeQuorum <= 0 || ProbeAssignmentsPerEpoch < ProbeQuorum { + return false + } + + height, ok := hm.latestBlockHeight(ctx) + if !ok { + return false + } + + epochBlocks := hm.probingEpochBlocks() + if epochBlocks == 0 { + return false + } + epochID := uint64(height) / epochBlocks + epochStartHeight := int64(epochID * epochBlocks) + + peers := []probeTarget(nil) + if plan := hm.getProbePlan(); plan != nil && plan.epochID == epochID && plan.epochBlocks == epochBlocks && len(plan.eligiblePeers) > 0 { + // Use the probing loop's epoch snapshot so quorum calculations match + // the deterministic schedule used by the traffic generator. 
+ peers = plan.eligiblePeers + } else { + snModule := hm.lumeraClient.SuperNode() + if snModule == nil { + return false + } + + resp, err := snModule.ListSuperNodes(ctx) + if err != nil || resp == nil { + return false + } + + isTest := strings.EqualFold(strings.TrimSpace(os.Getenv("INTEGRATION_TEST")), "true") + peers = buildEligiblePeers(resp.Supernodes, isTest) + sort.Slice(peers, func(i, j int) bool { + return peers[i].identity < peers[j].identity + }) + } + if len(peers) < ProbeQuorum+1 { + return false + } + + selfIdentity := strings.TrimSpace(hm.identity) + if selfIdentity == "" { + return false + } + + selfIndex := -1 + for i := range peers { + if peers[i].identity == selfIdentity { + selfIndex = i + break + } + } + if selfIndex < 0 { + return false + } + + offsets := deriveProbeOffsets(epochID, ProbeAssignmentsPerEpoch, len(peers)) + if len(offsets) < ProbeQuorum { + return false + } + + alive := 0 + for _, off := range offsets { + idx := (selfIndex - off) % len(peers) + if idx < 0 { + idx += len(peers) + } + if isAliveProber(peers[idx], epochStartHeight, height, hm.metricsFreshnessMaxBlocks) { + alive++ + } + } + + return alive >= ProbeQuorum +} + +func isAliveProber(peer probeTarget, epochStartHeight int64, currentHeight int64, freshnessMaxBlocks uint64) bool { + if peer.metricsHeight <= 0 { + return false + } + if peer.metricsHeight < epochStartHeight { + return false + } + if freshnessMaxBlocks == 0 { + return true + } + if currentHeight < peer.metricsHeight { + return true + } + return currentHeight-peer.metricsHeight <= int64(freshnessMaxBlocks) +} diff --git a/supernode/transport/gateway/server.go b/supernode/transport/gateway/server.go index 5c4df034..e820e7cc 100644 --- a/supernode/transport/gateway/server.go +++ b/supernode/transport/gateway/server.go @@ -16,6 +16,7 @@ import ( pb "github.com/LumeraProtocol/supernode/v2/gen/supernode" "github.com/LumeraProtocol/supernode/v2/pkg/logtrace" + "github.com/LumeraProtocol/supernode/v2/pkg/reachability" 
) // DefaultGatewayPort is an uncommon port for internal gateway use @@ -150,7 +151,7 @@ func (s *Server) Run(ctx context.Context) error { // Create HTTP server s.server = &http.Server{ Addr: net.JoinHostPort(s.ipAddress, strconv.Itoa(s.port)), - Handler: s.corsMiddleware(httpMux), + Handler: s.corsMiddleware(s.reachabilityMiddleware(httpMux)), ReadTimeout: 15 * time.Second, WriteTimeout: 15 * time.Second, IdleTimeout: 60 * time.Second, @@ -196,6 +197,50 @@ func (s *Server) corsMiddleware(h http.Handler) http.Handler { }) } +type statusCapturingWriter struct { + http.ResponseWriter + statusCode int +} + +func (w *statusCapturingWriter) WriteHeader(statusCode int) { + w.statusCode = statusCode + w.ResponseWriter.WriteHeader(statusCode) +} + +// reachabilityMiddleware records inbound evidence for the gateway port based on +// real external requests to `/api/v1/status`. +func (s *Server) reachabilityMiddleware(h http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + sw := &statusCapturingWriter{ResponseWriter: w, statusCode: 0} + h.ServeHTTP(sw, r) + + if r.URL != nil && r.URL.Path == "/api/v1/status" { + // Default status code is 200 if WriteHeader was not called explicitly. + code := sw.statusCode + if code == 0 { + code = http.StatusOK + } + if code >= 200 && code < 300 { + store := reachability.DefaultStore() + if store == nil { + return + } + + var addr net.Addr + if host, portStr, err := net.SplitHostPort(r.RemoteAddr); err == nil { + if ip := net.ParseIP(host); ip != nil { + port, _ := strconv.Atoi(portStr) + addr = &net.TCPAddr{IP: ip, Port: port} + } + } else if ip := net.ParseIP(r.RemoteAddr); ip != nil { + addr = &net.TCPAddr{IP: ip} + } + store.RecordInbound(reachability.ServiceGateway, "", addr, time.Now()) + } + } + }) +} + // pprofHandler proxies requests to the pprof handlers func (s *Server) pprofHandler(w http.ResponseWriter, r *http.Request) { // Check if pprof is enabled