Merged
26 commits
- b377c70  Codeq settings (mateeullahmalik, Sep 23, 2025)
- 6a225d3  Fix concurrent write panic in metrics (mateeullahmalik, Sep 23, 2025)
- 306d295  Enable Pprof (j-rafique, Sep 24, 2025)
- bca91a1  Remove config (j-rafique, Sep 24, 2025)
- e8e5e7c  Merge pull request #187 from LumeraProtocol/EnablePprof (j-rafique, Sep 24, 2025)
- efbe176  Codec callback (#188) (mateeullahmalik, Sep 24, 2025)
- 74fb97e  Change test port (j-rafique, Sep 24, 2025)
- cf7f877  Async events (#190) (mateeullahmalik, Sep 24, 2025)
- 1f524bf  Add profile script (j-rafique, Sep 24, 2025)
- 1310df6  Merge pull request #191 from LumeraProtocol/ProfileScript (j-rafique, Sep 24, 2025)
- b123dce  Change test port (mateeullahmalik, Sep 24, 2025)
- 3c66950  Disable Metrics (j-rafique, Sep 25, 2025)
- 4dd2bca  Merge pull request #192 from LumeraProtocol/DisableMetric (j-rafique, Sep 25, 2025)
- c53d7c3  Silence all logs (mateeullahmalik, Sep 29, 2025)
- 6bd2b9c  high-signal logs (mateeullahmalik, Sep 30, 2025)
- 8346283  Add datadog logs (mateeullahmalik, Sep 30, 2025)
- 0e35cf6  TxResponse (mateeullahmalik, Sep 30, 2025)
- 3d8386a  skip check for public field (mateeullahmalik, Sep 30, 2025)
- d32a5c7  peers+balance (#196) (mateeullahmalik, Sep 30, 2025)
- 7b85f4d  Tx mode block (mateeullahmalik, Oct 1, 2025)
- 34dfc95  fix: tx Broadcast method cleanup (mateeullahmalik, Oct 2, 2025)
- 7bd1a03  Logs (mateeullahmalik, Oct 2, 2025)
- ee927a0  High Signal Logs (mateeullahmalik, Oct 2, 2025)
- 18c2a70  enhance tracing (mateeullahmalik, Oct 2, 2025)
- cae661d  Metrics Cleanup (mateeullahmalik, Oct 3, 2025)
- 57c92f5  Increase raptorq memory (mateeullahmalik, Oct 3, 2025)
7 changes: 6 additions & 1 deletion .github/workflows/build&release.yml
@@ -82,6 +82,9 @@ jobs:
echo "binary_name=supernode-linux-amd64" >> $GITHUB_OUTPUT

- name: Build Release Version
env:
DD_API_KEY: ${{ secrets.DD_API_KEY }}
DD_SITE: ${{ secrets.DD_SITE }}
run: |
mkdir -p release

@@ -94,7 +97,9 @@
-ldflags="-s -w \
-X github.com/LumeraProtocol/supernode/v2/supernode/cmd.Version=${{ steps.vars.outputs.version }} \
-X github.com/LumeraProtocol/supernode/v2/supernode/cmd.GitCommit=${{ steps.vars.outputs.git_commit }} \
-X github.com/LumeraProtocol/supernode/v2/supernode/cmd.BuildTime=${{ steps.vars.outputs.build_time }}" \
-X github.com/LumeraProtocol/supernode/v2/supernode/cmd.BuildTime=${{ steps.vars.outputs.build_time }} \
-X github.com/LumeraProtocol/supernode/v2/pkg/logtrace.DDAPIKey=${DD_API_KEY} \
-X github.com/LumeraProtocol/supernode/v2/pkg/logtrace.DDSite=${DD_SITE}" \
-o release/supernode \
./supernode
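The `-X` ldflags above inject the Datadog credentials at link time. As a minimal sketch of how such link-time variables are typically declared and consumed on the Go side (the variable and function names here mirror the ldflags paths but are assumptions about `pkg/logtrace`, not its actual code):

```go
// Sketch: package-level string vars that the linker can overwrite via
//   go build -ldflags "-X main.DDAPIKey=... -X main.DDSite=..."
// (the real build targets github.com/LumeraProtocol/supernode/v2/pkg/logtrace).
package main

import "fmt"

var (
	DDAPIKey string // empty unless set at build time
	DDSite   string // empty unless set at build time
)

// datadogEnabled is a hypothetical guard: only ship logs to Datadog
// when both values were injected by the build.
func datadogEnabled() bool {
	return DDAPIKey != "" && DDSite != ""
}

func main() {
	fmt.Println("datadog enabled:", datadogEnabled())
}
```

Note that `-X` can only set uninitialized (or constant-initialized) package-level `string` variables, which is why the keys are plain `var` declarations.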

12 changes: 7 additions & 5 deletions Makefile
@@ -1,5 +1,5 @@
.PHONY: build build-release build-sncli build-sn-manager
.PHONY: install-lumera setup-supernodes system-test-setup
.PHONY: install-lumera setup-supernodes system-test-setup install-deps
.PHONY: gen-cascade gen-supernode
.PHONY: test-e2e test-unit test-integration test-system

@@ -11,7 +11,9 @@ BUILD_TIME ?= $(shell date -u '+%Y-%m-%d_%H:%M:%S')
# Linker flags for version information
LDFLAGS = -X github.com/LumeraProtocol/supernode/v2/supernode/cmd.Version=$(VERSION) \
-X github.com/LumeraProtocol/supernode/v2/supernode/cmd.GitCommit=$(GIT_COMMIT) \
-X github.com/LumeraProtocol/supernode/v2/supernode/cmd.BuildTime=$(BUILD_TIME)
-X github.com/LumeraProtocol/supernode/v2/supernode/cmd.BuildTime=$(BUILD_TIME) \
-X github.com/LumeraProtocol/supernode/v2/pkg/logtrace.DDAPIKey=$(DD_API_KEY) \
-X github.com/LumeraProtocol/supernode/v2/pkg/logtrace.DDSite=$(DD_SITE)

# Linker flags for sn-manager
SN_MANAGER_LDFLAGS = -X main.Version=$(VERSION) \
@@ -96,7 +98,7 @@ gen-supernode:
--grpc-gateway_out=gen \
--grpc-gateway_opt=paths=source_relative \
--openapiv2_out=gen \
proto/supernode/supernode.proto
proto/supernode/service.proto proto/supernode/status.proto

# Define the paths
SUPERNODE_SRC=supernode/main.go
@@ -140,9 +142,9 @@ test-e2e:
# Run cascade e2e tests only
test-cascade:
@echo "Running cascade e2e tests..."
@cd tests/system && go test -tags=system_test -v -run TestCascadeE2E .
@cd tests/system && go mod tidy && go test -tags=system_test -v -run TestCascadeE2E .

# Run sn-manager e2e tests only
test-sn-manager:
@echo "Running sn-manager e2e tests..."
@cd tests/system && go test -tags=system_test -v -run '^TestSNManager' .
@cd tests/system && go test -tags=system_test -v -run '^TestSNManager' .
28 changes: 8 additions & 20 deletions docs/cascade-store-artifacts.md
@@ -1,6 +1,6 @@
# Cascade Artefacts Storage Flow

This document explains, in depth, how Cascade artefacts (ID files + RaptorQ symbols) are persisted to the P2P network, the control flow from the API to the P2P layer, what metrics are collected, and which background workers continue the process after the API call returns.
This document explains how Cascade artefacts (ID files + RaptorQ symbols) are persisted to the P2P network, the control flow from the API to the P2P layer, and which background workers continue the process after the API call returns.

## Scope & Terminology

@@ -50,27 +50,24 @@ Function: `supernode/services/cascade/helper.go::storeArtefacts`
- `SymbolsDir string`: filesystem directory where symbols were written.
- `TaskID string` and `ActionID string`: identifiers for logging and DB association.

Returns `StoreArtefactsMetrics` with separate metrics for metadata and symbols plus an aggregated view.
Does not return metrics; logs provide visibility.

## P2P Adaptor: StoreArtefacts

Implementation: `supernode/services/cascade/adaptors/p2p.go`

1) Store metadata (ID files) using `p2p.Client.StoreBatch(...)`:
- Returns `metaRatePct` and `metaRequests` (count of per‑node RPCs attempted during this batch store).
1) Store metadata (ID files) using `p2p.Client.StoreBatch(...)`.

2) Store symbols using `storeCascadeSymbols(...)`:
- Records the symbol directory in a small SQLite store: `rqStore.StoreSymbolDirectory(taskID, symbolsDir)`.
- Walks `symbolsDir` to list symbol files. If there are more than 2,500 symbols, downsamples to 10% for this first pass (random sample, sorted deterministically afterward).
- Streams symbols in fixed‑size batches of 2,500 files:
- Each batch loads files, calls `p2p.Client.StoreBatch(...)` with a 5‑minute timeout, and deletes successfully uploaded files.
- Marks “first batch stored” for this action: `rqStore.UpdateIsFirstBatchStored(actionID)`.
- Returns `(symRatePct, symCount, symRequests)`.
- Logs counts and timings; no metrics are returned.

3) Aggregation and return:
- Computes item‑weighted aggregate success rate across metadata and symbols: `aggRate = (metaRate*metaCount + symRate*symCount) / (metaCount + symCount)`.
- Total requests = `metaRequests + symRequests`.
- Returns `StoreArtefactsMetrics` with all fields populated.
3) Return:
- No metrics aggregation; return indicates success/failure only.

Notes:
- This adaptor only performs a first pass of symbol storage. For large directories it may downsample; the background worker completes the remaining symbols later (see Background Worker section).
@@ -83,23 +80,15 @@ Notes:
- Network store: `DHT.IterateBatchStore(ctx, values, typ, taskID)`:
- For each value, compute its Blake3 hash; compute the top‑K closest nodes from the routing table.
- Build a node→items map and invoke `batchStoreNetwork(...)` with bounded concurrency (a goroutine per node, limited via a semaphore; all joined before returning).
- Tally per‑node RPC attempts (requests) and successes to compute `successRatePct`.
- If the measured rate is below `minimumDataStoreSuccessRate` (75%), return an error along with `(ratePct, requests)`.
- Otherwise, return `(ratePct, requests, nil)`.
- If the measured success rate is below an internal threshold, DHT returns an error.

Important distinctions:
- `requests` is the number of per‑node RPCs attempted; it is not the number of items in the batch.
- Success rate is based on successful node acknowledgements divided by `requests`.
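The bounded fan-out described above (one goroutine per node, limited by a semaphore, all joined before returning, with per-node attempts and successes tallied) can be sketched like this; the `storeFn` callback and return shape are assumptions, not the real `batchStoreNetwork` signature:

```go
package main

import (
	"fmt"
	"sync"
)

// batchStoreNetwork fans out one goroutine per node, bounded by a
// semaphore of size limit, and tallies successes vs. attempts.
func batchStoreNetwork(nodeItems map[string][][]byte, limit int,
	storeFn func(node string, items [][]byte) error) (succ, reqs int) {

	var mu sync.Mutex
	var wg sync.WaitGroup
	sem := make(chan struct{}, limit) // bounds concurrent node RPCs

	for node, items := range nodeItems {
		wg.Add(1)
		go func(node string, items [][]byte) {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot
			defer func() { <-sem }() // release it
			err := storeFn(node, items)
			mu.Lock()
			reqs++
			if err == nil {
				succ++
			}
			mu.Unlock()
		}(node, items)
	}
	wg.Wait() // join all node goroutines before returning
	return succ, reqs
}

func main() {
	nodes := map[string][][]byte{"a": nil, "b": nil, "c": nil}
	ok, total := batchStoreNetwork(nodes, 2,
		func(string, [][]byte) error { return nil })
	fmt.Println(ok, total)
}
```

The caller can then compare `succ * 100 / reqs` against the internal threshold and return an error when it falls short.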

## Metrics & Events

Returned metrics (from `StoreArtefacts`):

- Metadata: `MetaRate` (%), `MetaRequests`, `MetaCount`.
- Symbols: `SymRate` (%), `SymRequests`, `SymCount`.
- Aggregate: `AggregatedRate` (item‑weighted), `TotalRequests`.

`Register` logs and emits a single event line summarizing these metrics via `emitArtefactsStored(...)`, then proceeds to finalize the action on chain.
`Register` logs and emits an informational event (Artefacts stored), then proceeds to finalize the action on chain.

## Background Worker (Symbols Continuation)

@@ -161,4 +150,3 @@ These values can be tuned in:
- First pass deletes uploaded symbol files per batch (`utils.DeleteSymbols`) after a successful store batch.
- Background worker also deletes files after each batch store.
- The uploaded raw input file is removed by `Register` in a `defer` block regardless of outcome.

197 changes: 17 additions & 180 deletions docs/p2p-metrics-capture.md
@@ -1,186 +1,23 @@
# P2P Metrics Capture — What Each Field Means and Where It’s Collected
# P2P Metrics — Current Behavior

This guide explains every field we emit in Cascade events, how it is measured, and exactly where it is captured in the code.
We removed the custom per‑RPC metrics capture and the `pkg/p2pmetrics` package. Logs are the source of truth for store/retrieve visibility, and the Status API provides a rolling DHT snapshot for high‑level metrics.

The design is minimal by intent:
- Metrics are collected only for the first pass of Register (store) and for the active Download operation.
- P2P APIs return errors only; per‑RPC details are captured via a small metrics package (`pkg/p2pmetrics`).
- No aggregation; we only group raw RPC attempts by IP.
What remains
- Status API metrics: DHT rolling windows (store success, batch retrieve), network handle counters, ban list, DB/disk stats, and connection pool metrics.
- Logs: detailed send/ok/fail lines for RPCs at both client and server.

---
What was removed
- Per‑RPC metrics capture and grouping by IP for events.
- Metrics collectors and context tagging helpers.
- Recent per‑request lists from the Status API.

## Store (Register) Event
Events
- The supernode emits minimal events (e.g., artefacts stored, downloaded). These events no longer include metrics payloads. Use logs for detailed troubleshooting.

Event payload shape
Status API
- To include P2P metrics and peer info, clients set `include_p2p_metrics=true` on `StatusRequest`.
- The SDK adapter already includes this flag by default to populate peer count for eligibility checks.
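A minimal client-side sketch of gating peer/metrics data on the `include_p2p_metrics` flag; the request/response types and the stubbed peer count are stand-ins for the generated code from `proto/supernode/status.proto`, not its actual API:

```go
package main

import "fmt"

// StatusRequest mirrors the include_p2p_metrics proto field in spirit only.
type StatusRequest struct {
	IncludeP2PMetrics bool
}

// StatusResponse carries the peer count used for eligibility checks.
type StatusResponse struct {
	PeerCount int
}

// getStatus is a local stub; a real client would issue a gRPC call here.
// The server only populates P2P metrics and peer info when the flag is set.
func getStatus(req StatusRequest) StatusResponse {
	if req.IncludeP2PMetrics {
		return StatusResponse{PeerCount: 12} // placeholder value
	}
	return StatusResponse{}
}

func main() {
	resp := getStatus(StatusRequest{IncludeP2PMetrics: true})
	fmt.Println("peers:", resp.PeerCount)
}
```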

```json
{
"store": {
"duration_ms": 9876,
"symbols_first_pass": 220,
"symbols_total": 1200,
"id_files_count": 14,
"success_rate_pct": 82.5,
"calls_by_ip": {
"10.0.0.5": [
{"ip": "10.0.0.5", "address": "A:4445", "keys": 100, "success": true, "duration_ms": 120},
{"ip": "10.0.0.5", "address": "A:4445", "keys": 120, "success": false, "error": "timeout", "duration_ms": 300}
]
}
}
}
```

### Fields

- `store.duration_ms`
- Meaning: End‑to‑end elapsed time of the first‑pass store phase (Register’s storage section only).
- Where captured: `supernode/services/cascade/adaptors/p2p.go`
- A `time.Now()` timestamp is taken just before the first‑pass store function and measured on return.

- `store.symbols_first_pass`
- Meaning: Number of symbols sent during the Register first pass (across the combined first batch and any immediate first‑pass symbol batches).
- Where captured: `supernode/services/cascade/adaptors/p2p.go` via `p2pmetrics.SetStoreSummary(...)` using the value returned by `storeCascadeSymbolsAndData`.

- `store.symbols_total`
- Meaning: Total symbols available in the symbol directory (before sampling). Used to contextualize the first‑pass coverage.
- Where captured: Computed in `storeCascadeSymbolsAndData` and included in `SetStoreSummary`.

- `store.id_files_count`
- Meaning: Number of redundant metadata files (ID files) sent in the first combined batch.
- Where captured: `len(req.IDFiles)` in `StoreArtefacts`, passed to `SetStoreSummary`.

- `store.calls_by_ip`
- Meaning: All raw network store RPC attempts grouped by the node IP.
- Each array entry is a single RPC attempt with:
- `ip` — Node IP (fallback to `address` if missing).
- `address` — Node string `IP:port`.
- `keys` — Number of items in that RPC attempt (metadata + first symbols for the first combined batch, symbols for subsequent batches within the first pass).
- `success` — True if there was no transport error and no error message returned by the node response. Note: this flag does not explicitly check the `ResultOk` status; in rare cases, a non‑OK response with an empty error message may appear as `success` in metrics. (Internal success‑rate enforcement still uses explicit response status.)
- `error` — Any error string captured; omitted when success.
- `duration_ms` — RPC duration in milliseconds.
- `noop` — Present and `true` when no store payload was sent to the node (empty batch for that node). Such entries are recorded as `success=true`, `keys=0`, with no `error`.
- Where captured:
- Emission point (P2P): `p2p/kademlia/dht.go::IterateBatchStore(...)`
- After each node RPC returns, we call `p2pmetrics.RecordStore(taskID, Call{...})`. For nodes with no payload, a `noop: true` entry is emitted without sending a wire RPC.
- `taskID` is read from the context via `p2pmetrics.TaskIDFromContext(ctx)`.
- Grouping: `pkg/p2pmetrics/metrics.go`
- `StartStoreCapture(taskID)` enables capture; `StopStoreCapture(taskID)` disables it.
- Calls are grouped by `ip` (fallback to `address`) without further aggregation.

- `store.success_rate_pct`
- Meaning: First‑pass store success rate computed from captured per‑RPC outcomes: successful responses divided by total recorded store RPC attempts, expressed as a percentage.
- Where captured: Computed in `pkg/p2pmetrics/metrics.go::BuildStoreEventPayloadFromCollector` from `calls_by_ip` data.

### First‑Pass Success Threshold

- Internal enforcement only: if DHT first‑pass success rate is below 75%, `IterateBatchStore` returns an error.
- We also emit `store.success_rate_pct` for analytics; the threshold only affects control flow (errors), not the emitted metric.
- Code: `p2p/kademlia/dht.go::IterateBatchStore`.

### Scope Limits

- Background worker (which continues storing remaining symbols) is NOT captured — we don’t set a metrics task ID on those paths.

---

## Download Event

Event payload shape

```json
{
"retrieve": {
"found_local": 42,
"retrieve_ms": 2000,
"decode_ms": 8000,
"calls_by_ip": {
"10.0.0.7": [
{"ip": "10.0.0.7", "address": "B:4445", "keys": 13, "success": true, "duration_ms": 90}
]
}
}
}
```

### Fields

- `retrieve.found_local`
- Meaning: Number of items retrieved from local storage before any network calls.
- Where captured: `p2p/kademlia/dht.go::BatchRetrieve(...)`
- After `fetchAndAddLocalKeys`, we call `p2pmetrics.ReportFoundLocal(taskID, int(foundLocalCount))`.
- `taskID` is read from context with `p2pmetrics.TaskIDFromContext(ctx)`.

- `retrieve.retrieve_ms`
- Meaning: Time spent in network batch‑retrieve.
- Where captured: `supernode/services/cascade/download.go`
- Timestamp before `BatchRetrieve`, measured after it returns.

- `retrieve.decode_ms`
- Meaning: Time spent decoding symbols and reconstructing the file.
- Where captured: `supernode/services/cascade/download.go`
- Timestamp before decode, measured after it returns.

- `retrieve.calls_by_ip`
- Meaning: All raw per‑RPC retrieve attempts grouped by node IP.
- Each array entry is a single RPC attempt with:
- `ip`, `address` — Identifiers as available.
- `keys` — Number of symbols returned by that node in that call.
- `success` — True if the RPC completed without error (even if `keys == 0`). Transport/status errors remain `success=false` with an `error` message.
- `error` — Error string when the RPC failed; omitted otherwise.
- `duration_ms` — RPC duration in milliseconds.
- `noop` — Present and `true` when no network request was actually sent to the node (e.g., all requested keys were already satisfied or deduped before issuing the call). Such entries are recorded as `success=true`, `keys=0`, with no `error`.
- Where captured:
- Emission point (P2P): `p2p/kademlia/dht.go::iterateBatchGetValues(...)`
- Each node attempt records a `p2pmetrics.RecordRetrieve(taskID, Call{...})`. For attempts where no network RPC is sent, a `noop: true` entry is emitted.
- `taskID` is extracted from context using `p2pmetrics.TaskIDFromContext(ctx)`.
- Grouping: `pkg/p2pmetrics/metrics.go` (same grouping/fallback as store).

### Scope Limits

- Metrics are captured only for the active Download call (context is tagged in `download.go`).

---

## Context Tagging (Task ID)

- We use an explicit, metrics‑only context key defined in `pkg/p2pmetrics` to tag P2P calls with a task ID.
- Setters: `p2pmetrics.WithTaskID(ctx, id)`.
- Getters: `p2pmetrics.TaskIDFromContext(ctx)`.
- Where it is set:
- Store (first pass): `supernode/services/cascade/adaptors/p2p.go` wraps `StoreBatch` calls.
- Download: `supernode/services/cascade/download.go` wraps `BatchRetrieve` call.

---

## Building and Emitting Events

- Store
- `supernode/services/cascade/helper.go::emitArtefactsStored(...)`
- Builds `store` payload via `p2pmetrics.BuildStoreEventPayloadFromCollector(taskID)`.
- Includes `success_rate_pct` (first‑pass store success rate computed from captured per‑RPC outcomes) in addition to the minimal fields.
- Emits the event.

- Download
- `supernode/services/cascade/download.go`
- Builds `retrieve` payload via `p2pmetrics.BuildDownloadEventPayloadFromCollector(actionID)`.
- Emits the event.

---

## Quick File Map

- Capture + grouping: `supernode/pkg/p2pmetrics/metrics.go`
- Store adaptor: `supernode/supernode/services/cascade/adaptors/p2p.go`
- Store event: `supernode/supernode/services/cascade/helper.go`
- Download flow: `supernode/supernode/services/cascade/download.go`
- DHT store calls: `supernode/p2p/kademlia/dht.go::IterateBatchStore`
- DHT retrieve calls: `supernode/p2p/kademlia/dht.go::BatchRetrieve` and `iterateBatchGetValues`

---

## Notes

- No P2P stats/snapshots are used to build events.
- No aggregation is performed; we only group raw RPC attempts by IP.
- First‑pass success rate is enforced internally (75% threshold) but not emitted as a metric.
References
- Status proto: `proto/supernode/status.proto`
- Service proto: `proto/supernode/service.proto`