From 19e4b4c0aa8a1d339641f1d497081b95fcc01517 Mon Sep 17 00:00:00 2001 From: bitromortac Date: Thu, 14 May 2026 09:08:56 +0200 Subject: [PATCH 1/5] chanevents: add bidirectional pair walk MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The per-pair uptime walk derives forwarding ability for both (A→B) and (B→A) directions in a single chronological pass over the merged event stream. Two independent state copies, one rooted at each peer, accumulate each direction's uptime against its own balance threshold. A liquidity check that would otherwise be O(channels-per-peer) per tick becomes O(1) via four running balance sums that shadow each side's online inbound and outbound capacity. The merge loop adjusts these sums as events mutate channel state, and the per-tick threshold check reads the mins inline. --- chanevents/analyzer.go | 368 +++++++++++++++++++ chanevents/analyzer_test.go | 706 ++++++++++++++++++++++++++++++++++++ 2 files changed, 1074 insertions(+) create mode 100644 chanevents/analyzer.go create mode 100644 chanevents/analyzer_test.go diff --git a/chanevents/analyzer.go b/chanevents/analyzer.go new file mode 100644 index 0000000..232dcbe --- /dev/null +++ b/chanevents/analyzer.go @@ -0,0 +1,368 @@ +package chanevents + +import ( + "context" + "errors" + "fmt" + "iter" + "log/slog" + "math" + "time" + + "github.com/btcsuite/btcd/btcutil" + "github.com/btcsuite/btclog/v2" +) + +var ( + // errUnknownEventType fires when the event-replay switch sees an + // EventType outside {Offline, Online, Update}. Indicates schema drift + // between the store and the analyzer. + errUnknownEventType = errors.New("unknown channel event type") +) + +// channelEventSeq is a chronologically ordered stream of channel events +// paired with a propagated error value. +type channelEventSeq = iter.Seq2[*ChannelEvent, error] + +// ForwardingAbility quantifies the historical routing performance of a peer +// pair. Inconsistent flags the pathological case where forwards were observed +// without the pair ever crossing the liquidity threshold; Velocity is zero in +// that case because the rate is undefined over zero qualifying uptime. +type ForwardingAbility struct { + // Velocity is the forwarding velocity in sat/s during effective uptime. + Velocity float64 + + // UptimeFraction is the ratio of effective uptime to the full window + // duration, in [0, 1]. + UptimeFraction float64 + + // Inconsistent is set when forwards landed but effective uptime was + // zero, indicating the input data and the threshold model disagree. + Inconsistent bool +} + +// pairInputs encapsulates the routing performance thresholds for a single +// direction. +type pairInputs struct { + threshold btcutil.Amount + totalSuccessfulAmount btcutil.Amount +} + +// channelState is the per-channel snapshot the uptime walk carries forward as +// it consumes events: liveness plus the two balances that determine forwarding +// liquidity. +type channelState struct { + online bool + localBalance btcutil.Amount + remoteBalance btcutil.Amount +} + +// determineThreshold establishes the required liquidity floor based on the +// user's manual threshold or the calculated percentile of successful forwards. +func determineThreshold(forwardPercentile float64, + thresholdAmount btcutil.Amount, + successAmts []btcutil.Amount) (btcutil.Amount, error) { + + if len(successAmts) == 0 { + return thresholdAmount, nil + } + + q := forwardPercentile / 100 + p, err := Quantile(successAmts, q) + if err != nil { + return 0, err + } + + return max(btcutil.Amount(math.RoundToEven(p)), thresholdAmount), nil +} + +// calculateBothDirectionsUptime computes the effective forwarding uptime for +// both directions of a peer pair in a single chronological walk of the merged +// event stream. Only the liquidity-direction roles and the per-direction +// thresholds differ between the two accumulators. For self-pair calls +// (statesA == statesB, inputsAB == inputsBA) both returned abilities are +// equal. +func calculateBothDirectionsUptime(ctx context.Context, startTime, + endTime time.Time, inputsAB, inputsBA pairInputs, statesA, + statesB map[int64]*channelState, mergedEvents channelEventSeq) ( + *ForwardingAbility, *ForwardingAbility, error) { + + traceOn := log.Level() <= btclog.LevelTrace + + if traceOn { + log.TraceS(ctx, "Calculating bidirectional effective uptime") + for chanID, state := range statesA { + log.TraceS( + ctx, "Initial state A", + slog.Int64("chanID", chanID), + slog.Bool("online", state.online), + slog.Int64( + "localBalance", int64( + state.localBalance, + ), + ), + slog.Int64( + "remoteBalance", int64( + state.remoteBalance, + ), + ), + ) + } + for chanID, state := range statesB { + log.TraceS( + ctx, "Initial state B", + slog.Int64("chanID", chanID), + slog.Bool("online", state.online), + slog.Int64( + "localBalance", int64( + state.localBalance, + ), + ), + slog.Int64( + "remoteBalance", int64( + state.remoteBalance, + ), + ), + ) + } + log.TraceS( + ctx, "Using final forwarding liquidity thresholds", + slog.Int64( + "thresholdAB", int64(inputsAB.threshold), + ), + slog.Int64( + "thresholdBA", int64(inputsBA.threshold), + ), + ) + } + + statesA = copyChannelStates(statesA) + statesB = copyChannelStates(statesB) + + // Seed running balance sums once. The per-event walk maintains them + // incrementally so the per-tick liquidity check is O(1) instead of + // O(channels-per-peer). The forwarding liquidity (A→B) is the + // bottleneck min(sum of A's online channels' remoteBalance, sum of B's + // online channels' localBalance). The running sums shadow those two + // quantities and the min is computed inline in accumulate. (B→A) is the + // same formula with the roles flipped. + var sumARemote, sumALocal, sumBRemote, sumBLocal btcutil.Amount + for _, s := range statesA { + if s.online { + sumARemote += s.remoteBalance + sumALocal += s.localBalance + } + } + for _, s := range statesB { + if s.online { + sumBRemote += s.remoteBalance + sumBLocal += s.localBalance + } + } + + var uptimeAB, uptimeBA time.Duration + lastTimestamp := startTime + + accumulate := func(intervalDuration time.Duration) { + if intervalDuration <= 0 { + return + } + // (A→B): A is incoming, B is outgoing. Liquidity bottleneck is + // min(A's online inbound, B's online outbound). + liqAB := min(sumARemote, sumBLocal) + // (B→A): roles flipped. + liqBA := min(sumBRemote, sumALocal) + if traceOn { + log.TraceS( + ctx, "Forwarding liquidity check", + slog.Duration("interval", intervalDuration), + slog.Int64( + "liqAB", int64(liqAB), + ), + slog.Int64( + "liqBA", int64(liqBA), + ), + ) + } + if liqAB > inputsAB.threshold { + uptimeAB += intervalDuration + } + if liqBA > inputsBA.threshold { + uptimeBA += intervalDuration + } + } + + for event, err := range mergedEvents { + if err != nil { + return nil, nil, err + } + if traceOn { + log.TraceS( + ctx, "Processing event", + slog.Int64("chanID", event.ChannelID), + btclog.Fmt("type", "%v", event.EventType), + slog.Time("time", event.Timestamp), + ) + } + + accumulate(event.Timestamp.Sub(lastTimestamp)) + + if state, ok := statesA[event.ChannelID]; ok { + if state.online { + sumARemote -= state.remoteBalance + sumALocal -= state.localBalance + } + if err := applyEvent(state, event); err != nil { + return nil, nil, err + } + if state.online { + sumARemote += state.remoteBalance + sumALocal += state.localBalance + } + } + if state, ok := statesB[event.ChannelID]; ok { + if state.online { + sumBRemote -= state.remoteBalance + sumBLocal -= state.localBalance + } + if err := applyEvent(state, event); err != nil { + return nil, nil, err + } + if state.online { + sumBRemote += state.remoteBalance + sumBLocal += state.localBalance + } + } + + lastTimestamp = event.Timestamp + } + + accumulate(endTime.Sub(lastTimestamp)) + + if traceOn { + log.TraceS( + ctx, "Total effective uptime", + slog.Duration("uptimeAB", uptimeAB), + slog.Duration("uptimeBA", uptimeBA), + slog.Duration( + "totalDuration", endTime.Sub(startTime), + ), + ) + } + + abilityAB := makeAbility( + startTime, endTime, uptimeAB, inputsAB.totalSuccessfulAmount, + ) + abilityBA := makeAbility( + startTime, endTime, uptimeBA, inputsBA.totalSuccessfulAmount, + ) + + return abilityAB, abilityBA, nil +} + +// mergeEventSlices interleaves two sorted event streams into a single +// chronological iter.Seq2. Equal-timestamp events from sliceA are yielded +// first. Self-pair calls (sliceA == sliceB) yield each event twice. Callers +// must keep their state updates idempotent under same-timestamp duplicates. +func mergeEventSlices(sliceA, sliceB []*ChannelEvent) channelEventSeq { + return func(yield func(*ChannelEvent, error) bool) { + i, j := 0, 0 + + // Interleave both slices until one is exhausted, ensuring + // strict chronological order across the combined stream. + for i < len(sliceA) && j < len(sliceB) { + if sliceA[i].Timestamp.After(sliceB[j].Timestamp) { + if !yield(sliceB[j], nil) { + return + } + j++ + } else { + if !yield(sliceA[i], nil) { + return + } + i++ + } + } + + // Drain any remaining events from sliceA. This loop only + // executes if sliceB was exhausted first. + for ; i < len(sliceA); i++ { + if !yield(sliceA[i], nil) { + return + } + } + + // Drain any remaining events from sliceB. This loop only + // executes if sliceA was exhausted first. + for ; j < len(sliceB); j++ { + if !yield(sliceB[j], nil) { + return + } + } + } +} + +// copyChannelStates returns a deep copy of the per-channel state map so the +// bidirectional walk cannot mutate the caller's snapshot. +func copyChannelStates(states map[int64]*channelState) map[int64]*channelState { + statesCopy := make(map[int64]*channelState, len(states)) + for chanID, state := range states { + statesCopy[chanID] = &channelState{ + online: state.online, + localBalance: state.localBalance, + remoteBalance: state.remoteBalance, + } + } + + return statesCopy +} + +// applyEvent advances a channel's snapshot by one event. Update events imply +// online and overwrite whichever balance the event carries. Unknown event +// types return errUnknownEventType to surface store↔analyzer schema drift. +func applyEvent(state *channelState, event *ChannelEvent) error { + switch event.EventType { + case EventTypeOffline: + state.online = false + + case EventTypeOnline: + state.online = true + + case EventTypeUpdate: + state.online = true + event.LocalBalance.WhenSome( + func(amt btcutil.Amount) { + state.localBalance = amt + }, + ) + event.RemoteBalance.WhenSome( + func(amt btcutil.Amount) { + state.remoteBalance = amt + }, + ) + + default: + return fmt.Errorf("%w: chanID=%d type=%v", errUnknownEventType, + event.ChannelID, event.EventType) + } + + return nil +} + +// makeAbility folds an accumulated uptime and successful-amount total into a +// ForwardingAbility. When uptime is zero and forwards landed, the result is +// flagged Inconsistent with zero Velocity. +func makeAbility(startTime, endTime time.Time, totalUptime time.Duration, + totalAmt btcutil.Amount) *ForwardingAbility { + + if totalUptime == 0 { + return &ForwardingAbility{Inconsistent: totalAmt > 0} + } + + totalDuration := endTime.Sub(startTime) + + return &ForwardingAbility{ + Velocity: float64(totalAmt) / totalUptime.Seconds(), + UptimeFraction: float64(totalUptime) / float64(totalDuration), + } +} diff --git a/chanevents/analyzer_test.go b/chanevents/analyzer_test.go new file mode 100644 index 0000000..a58a7fd --- /dev/null +++ b/chanevents/analyzer_test.go @@ -0,0 +1,706 @@ +package chanevents + +import ( + "context" + "testing" + "time" + + "github.com/btcsuite/btcd/btcutil" + "github.com/lightningnetwork/lnd/fn/v2" + "github.com/stretchr/testify/require" +) + +// nextEventID is incremented by newEvent so every synthetic event carries a +// unique ID, making assertion failures easier to trace. +var nextEventID int64 = 1 + +// newEvent returns an update-typed ChannelEvent with the given balances. The +// ID is auto-incremented so failing assertions can identify the offending row. +func newEvent(chanID int64, ts int64, eventType EventType, local, + remote btcutil.Amount) *ChannelEvent { + + id := nextEventID + nextEventID++ + + return &ChannelEvent{ + ID: id, + ChannelID: chanID, + Timestamp: time.Unix(ts, 0), + EventType: eventType, + LocalBalance: fn.Some(local), + RemoteBalance: fn.Some(remote), + } +} + +// newStatusEvent returns an Online or Offline event with no balance payload, +// matching the schema contract for non-Update event types. +func newStatusEvent(chanID int64, ts int64, eventType EventType) *ChannelEvent { + return &ChannelEvent{ + ChannelID: chanID, + Timestamp: time.Unix(ts, 0), + EventType: eventType, + LocalBalance: fn.None[btcutil.Amount](), + RemoteBalance: fn.None[btcutil.Amount](), + } +} + +// TestMergeEventSlices verifies that interleaving distinct or identical streams +// preserves strict chronological order and resolves timestamp collisions +// deterministically. +func TestMergeEventSlices(t *testing.T) { + t.Parallel() + + const ( + fromA int64 = 1 + fromB int64 = 2 + ) + + // selfPair is the same backing slice passed as both sliceA and sliceB + // in the self-pair row. The merge must yield each element twice. + selfPair := []*ChannelEvent{ + newStatusEvent(fromA, 100, EventTypeOnline), + newStatusEvent(fromA, 200, EventTypeOffline), + } + + testCases := []struct { + name string + sliceA []*ChannelEvent + sliceB []*ChannelEvent + expected []*ChannelEvent + }{ + { + name: "Both empty", + }, + { + name: "Only A", + sliceA: []*ChannelEvent{ + newStatusEvent(fromA, 100, EventTypeOnline), + newStatusEvent(fromA, 200, EventTypeOffline), + }, + expected: []*ChannelEvent{ + newStatusEvent(fromA, 100, EventTypeOnline), + newStatusEvent(fromA, 200, EventTypeOffline), + }, + }, + { + name: "Only B", + sliceB: []*ChannelEvent{ + newStatusEvent(fromB, 100, EventTypeOnline), + newStatusEvent(fromB, 200, EventTypeOffline), + }, + expected: []*ChannelEvent{ + newStatusEvent(fromB, 100, EventTypeOnline), + newStatusEvent(fromB, 200, EventTypeOffline), + }, + }, + { + name: "Disjoint A before B", + sliceA: []*ChannelEvent{ + newStatusEvent(fromA, 100, EventTypeOnline), + newStatusEvent(fromA, 150, EventTypeOffline), + }, + sliceB: []*ChannelEvent{ + newStatusEvent(fromB, 200, EventTypeOnline), + newStatusEvent(fromB, 250, EventTypeOffline), + }, + expected: []*ChannelEvent{ + newStatusEvent(fromA, 100, EventTypeOnline), + newStatusEvent(fromA, 150, EventTypeOffline), + newStatusEvent(fromB, 200, EventTypeOnline), + newStatusEvent(fromB, 250, EventTypeOffline), + }, + }, + { + name: "Interleaved", + sliceA: []*ChannelEvent{ + newStatusEvent(fromA, 100, EventTypeOnline), + newStatusEvent(fromA, 300, EventTypeOffline), + }, + sliceB: []*ChannelEvent{ + newStatusEvent(fromB, 200, EventTypeOnline), + newStatusEvent(fromB, 400, EventTypeOffline), + }, + expected: []*ChannelEvent{ + newStatusEvent(fromA, 100, EventTypeOnline), + newStatusEvent(fromB, 200, EventTypeOnline), + newStatusEvent(fromA, 300, EventTypeOffline), + newStatusEvent(fromB, 400, EventTypeOffline), + }, + }, + { + name: "Equal timestamps yield A first", + sliceA: []*ChannelEvent{ + newStatusEvent(fromA, 100, EventTypeOnline), + newStatusEvent(fromA, 200, EventTypeOffline), + }, + sliceB: []*ChannelEvent{ + newStatusEvent(fromB, 100, EventTypeOnline), + newStatusEvent(fromB, 200, EventTypeOffline), + }, + expected: []*ChannelEvent{ + newStatusEvent(fromA, 100, EventTypeOnline), + newStatusEvent(fromB, 100, EventTypeOnline), + newStatusEvent(fromA, 200, EventTypeOffline), + newStatusEvent(fromB, 200, EventTypeOffline), + }, + }, + { + name: "Self-pair duplicates each event", + sliceA: selfPair, + sliceB: selfPair, + expected: []*ChannelEvent{ + selfPair[0], selfPair[0], + selfPair[1], selfPair[1], + }, + }, + } + + for _, tc := range testCases { + t.Run( + tc.name, + func(t *testing.T) { + t.Parallel() + + var got []*ChannelEvent + for event, err := range mergeEventSlices( + tc.sliceA, tc.sliceB, + ) { + require.NoError(t, err) + got = append(got, event) + } + require.Equal(t, tc.expected, got) + }, + ) + } +} + +// TestMergeEventSlicesEarlyTermination verifies that the merge sequence safely +// halts mid-stream without exhausting inputs when the consumer aborts. +func TestMergeEventSlicesEarlyTermination(t *testing.T) { + t.Parallel() + + sliceA := []*ChannelEvent{ + newStatusEvent(1, 100, EventTypeOnline), + newStatusEvent(1, 300, EventTypeOffline), + } + sliceB := []*ChannelEvent{ + newStatusEvent(2, 200, EventTypeOnline), + newStatusEvent(2, 400, EventTypeOffline), + } + + var got []*ChannelEvent + for event, err := range mergeEventSlices(sliceA, sliceB) { + require.NoError(t, err) + got = append(got, event) + if len(got) == 2 { + break + } + } + require.Len(t, got, 2) +} + +// TestCalculateBothDirectionsUptime verifies that the bidirectional uptime +// walk correctly attributes effective uptime given varying liveness, balance +// changes, and forwarding amounts. Each table row pins one boundary condition +// of the (A→B) direction. Full bidirectional invariants are covered by +// dedicated tests. +func TestCalculateBothDirectionsUptime(t *testing.T) { + t.Parallel() + + var ( + chanInID int64 = 1 + chanOutID int64 = 2 + startTime = time.Unix(100, 0) + endTime = time.Unix(200, 0) + ) + + testCases := []struct { + name string + + inStates map[int64]*channelState + outStates map[int64]*channelState + inEvents []*ChannelEvent + outEvents []*ChannelEvent + + successAmts []btcutil.Amount + thresholdAmount btcutil.Amount + forwardPercentile float64 + + expected *ForwardingAbility + expectedErr string + }{ + { + name: "Basic case always online", + inStates: map[int64]*channelState{ + chanInID: { + online: true, + remoteBalance: 1000, + }, + }, + outStates: map[int64]*channelState{ + chanOutID: { + online: true, + localBalance: 800, + }, + }, + successAmts: []btcutil.Amount{ + 100, + }, + expected: &ForwardingAbility{ + Velocity: 1, // 100 sats / 100s + UptimeFraction: 1.0, + }, + }, + { + // The forward in successAmts updates both channels: + // the in-channel's remoteBalance drops by the amount + // (peer A spent it) and the out-channel's localBalance + // drops by the same (forwarded out to B). Both Update + // events fire at the forward's timestamp. The threshold + // straddles pre and post liquidity so the boundary at + // the forward time is what carves the uptime window. + // The later tests in here aren't causally consistent + // with the successAmts, which in general is the source + // of truth for forwards. + name: "Forward drives the balance timeline", + inStates: map[int64]*channelState{ + chanInID: { + online: true, + remoteBalance: 1500, + }, + }, + outStates: map[int64]*channelState{ + chanOutID: { + online: true, + localBalance: 1000, + }, + }, + inEvents: []*ChannelEvent{ + // Forward of 200 sats at t=150 on the in side: + // local 0 → 200, remote 1500 → 1300. + newEvent( + chanInID, 150, EventTypeUpdate, 200, + 1300, + ), + }, + outEvents: []*ChannelEvent{ + // Same forward on the out side: local 1000 → + // 800, remote 0 → 200. + newEvent( + chanOutID, 150, EventTypeUpdate, 800, + 200, + ), + }, + successAmts: []btcutil.Amount{ + 200, + }, + thresholdAmount: 900, + // t=100..150 (50s): liq = min(1500, 1000) = 1000 + // > 900 → qualifies. + // t=150..200 (50s): liq = min(1300, 800) = 800 + // < 900 → drops out. + // Total uptime = 50s, total amount = 200 sats. + expected: &ForwardingAbility{ + Velocity: 4, // 200 sats / 50s + UptimeFraction: 0.5, + }, + }, + { + name: "Channel goes offline", + inStates: map[int64]*channelState{ + chanInID: { + online: true, + remoteBalance: 1000, + }, + }, + outStates: map[int64]*channelState{ + chanOutID: { + online: true, + localBalance: 800, + }, + }, + inEvents: []*ChannelEvent{ + newStatusEvent(chanInID, 150, EventTypeOffline), + }, + successAmts: []btcutil.Amount{ + 100, + }, + thresholdAmount: 1, + expected: &ForwardingAbility{ + Velocity: 2, // 100 sats / 50s + UptimeFraction: 0.5, + }, + }, + { + name: "Balance change", + inStates: map[int64]*channelState{ + chanInID: { + online: true, + remoteBalance: 1000, + }, + }, + outStates: map[int64]*channelState{ + chanOutID: { + online: true, + localBalance: 800, + }, + }, + outEvents: []*ChannelEvent{ + newEvent( + chanOutID, 150, EventTypeUpdate, 1200, + 0, + ), + }, + successAmts: []btcutil.Amount{ + 100, + }, + thresholdAmount: 1, + // Balance changes at t=150, so for the first 50s the + // liquidity is 800, then it's 1000 for the next 50s. + // The total effective uptime is 100s, because the + // liquidity threshold is low. + expected: &ForwardingAbility{ + Velocity: 1, // 100 sats / 100s + UptimeFraction: 1, + }, + }, + { + name: "Duplicate event timestamps", + inStates: map[int64]*channelState{ + chanInID: { + online: true, + remoteBalance: 1000, + }, + }, + outStates: map[int64]*channelState{ + chanOutID: { + online: true, + localBalance: 800, + }, + }, + inEvents: []*ChannelEvent{ + newStatusEvent(chanInID, 150, EventTypeOffline), + }, + outEvents: []*ChannelEvent{ + newEvent( + chanOutID, 150, EventTypeUpdate, 1200, + 0, + ), + }, + successAmts: []btcutil.Amount{ + 100, + }, + // At t=150, two events happen. From t=100 to t=150 + // (50s), liquidity is min(1000, 800) = 800. After + // t=150, chanIn is offline, so liquidity is 0 for the + // remaining 50s. + expected: &ForwardingAbility{ + Velocity: 2, // 100 sats / 50s + UptimeFraction: 0.5, + }, + }, + { + name: "No initial state", + inStates: map[int64]*channelState{ + chanInID: { + online: false, + }, + }, + outStates: map[int64]*channelState{ + chanOutID: { + online: false, + }, + }, + inEvents: []*ChannelEvent{ + newEvent( + chanInID, 120, EventTypeUpdate, 0, 1000, + ), + }, + outEvents: []*ChannelEvent{ + newEvent( + chanOutID, 140, EventTypeUpdate, 800, 0, + ), + }, + successAmts: []btcutil.Amount{ + 100, + }, + thresholdAmount: 1, + // We don't have initial balance states, so we can't + // determine liquidity until we see an event on both + // channels. At t=140 we know the liquidity is 800, and + // it's online for the remaining 60s of the 100s total. + // So uptime fraction is 0.6 for 800. + expected: &ForwardingAbility{ + // 100 sats / 60s + Velocity: 1.6666666666666667, + UptimeFraction: 0.6, + }, + }, + { + name: "Multiple channels for out peer", + inStates: map[int64]*channelState{ + chanInID: { + online: true, + remoteBalance: 1000, + }, + }, + outStates: map[int64]*channelState{ + chanOutID: { + online: true, + localBalance: 800, + }, + 3: { + online: true, + localBalance: 500, + }, + }, + outEvents: []*ChannelEvent{ + newEvent( + chanOutID, 150, EventTypeUpdate, 1200, + 0, + ), + }, + successAmts: []btcutil.Amount{ + 100, + }, + thresholdAmount: 900, + // We expect the liquidity to be the sum of the + // available balances of the out channels. t=100-150: + // min(1000, 800 + 500) = 1000 t=150-200: min(1000, 1200 + // + 500) = 1000 + expected: &ForwardingAbility{ + Velocity: 1, // 100 sats / 100s + UptimeFraction: 1.0, + }, + }, + { + name: "Circular payment ability", + inStates: map[int64]*channelState{ + chanInID: { + online: true, + localBalance: 1000, + }, + }, + outStates: map[int64]*channelState{ + chanInID: { + online: true, + localBalance: 1000, + }, + }, + inEvents: []*ChannelEvent{ + newEvent( + chanInID, 150, EventTypeUpdate, 500, + 500, + ), + }, + outEvents: []*ChannelEvent{ + newEvent( + chanInID, 150, EventTypeUpdate, 500, + 500, + ), + }, + successAmts: []btcutil.Amount{ + 100, + }, + thresholdAmount: 1, + // For the first 50s, liquidity is min(1000, 0) = 0. For + // the next 50s, liquidity is min(500, 500) = 500. + expected: &ForwardingAbility{ + Velocity: 2, // 100 sats / 50s + UptimeFraction: 0.5, + }, + }, + { + name: "Self route multiple channels", + inStates: map[int64]*channelState{ + chanInID: { + online: true, + remoteBalance: 1000, + localBalance: 1000, + }, + chanOutID: { + online: true, + remoteBalance: 1000, + localBalance: 1000, + }, + }, + outStates: map[int64]*channelState{ + chanInID: { + online: true, + remoteBalance: 1000, + localBalance: 1000, + }, + chanOutID: { + online: true, + remoteBalance: 1000, + localBalance: 1000, + }, + }, + inEvents: []*ChannelEvent{}, + outEvents: []*ChannelEvent{ + // At 150s (midpoint), out channel balance drops + // to 0. + newEvent( + chanOutID, 150, EventTypeUpdate, 0, + 2000, + ), + }, + successAmts: []btcutil.Amount{ + 100, + }, + thresholdAmount: 1500, + // Initial fwdLiquidity = min(2000, 2000) = 2000. 2000 > + // 1500, so first 50s accrue. At t=150, chanOut local + // drops to 0. outStates total local becomes 1000 (from + // chanIn). fwdLiquidity = min(2000, 1000) = 1000. 1000 + // is not > 1500, so last 50s do not accrue. + expected: &ForwardingAbility{ + Velocity: 2, // 100 sats / 50s + UptimeFraction: 0.5, + }, + }, + { + name: "Zero uptime no forwards yields zero velocity", + inStates: map[int64]*channelState{ + chanInID: { + online: false, + }, + }, + outStates: map[int64]*channelState{ + chanOutID: { + online: false, + }, + }, + expected: &ForwardingAbility{ + Velocity: 0, + UptimeFraction: 0, + }, + }, + { + name: "Zero uptime with forwards is flagged inconsistent", + inStates: map[int64]*channelState{ + chanInID: { + online: false, + }, + }, + outStates: map[int64]*channelState{ + chanOutID: { + online: false, + }, + }, + successAmts: []btcutil.Amount{ + 100, + }, + expected: &ForwardingAbility{ + Velocity: 0, + UptimeFraction: 0, + Inconsistent: true, + }, + }, + } + + for _, tc := range testCases { + t.Run( + tc.name, + func(t *testing.T) { + t.Parallel() + + var totalSuccessfulAmount btcutil.Amount + for _, amt := range tc.successAmts { + totalSuccessfulAmount += amt + } + + mergedEvents := mergeEventSlices( + tc.inEvents, tc.outEvents, + ) + + inputsAB := pairInputs{ + threshold: tc.thresholdAmount, + totalSuccessfulAmount: totalSuccessfulAmount, + } + + // The (B→A) inputs are not asserted by this + // test. Pass zero so the second ability is + // well-defined but ignored. + var inputsBA pairInputs + + abilityAB, _, err := + calculateBothDirectionsUptime( + context.Background(), + startTime, endTime, + inputsAB, inputsBA, + tc.inStates, tc.outStates, + mergedEvents, + ) + require.NoError(t, err) + require.Equal(t, tc.expected, abilityAB) + }, + ) + } +} + +// TestCalculateBothDirectionsUptimeAsymmetric pins that the two accumulators +// are independent: with liquidity that crosses only the A→B threshold, the B→A +// direction must report zero uptime regardless of how high the A→B side scores. +func TestCalculateBothDirectionsUptimeAsymmetric(t *testing.T) { + t.Parallel() + + var ( + chanInID int64 = 1 + chanOutID int64 = 2 + startTime = time.Unix(100, 0) + endTime = time.Unix(200, 0) + ) + + // A holds inbound liquidity (remoteBalance high). B holds outbound + // liquidity (localBalance high). The situation favours A→B and starves + // B→A. + statesA := map[int64]*channelState{ + chanInID: { + online: true, + remoteBalance: 1000, + localBalance: 100, + }, + } + statesB := map[int64]*channelState{ + chanOutID: { + online: true, + localBalance: 1000, + remoteBalance: 100, + }, + } + + // Threshold sits between the two directions: A→B has min(1000, 1000) + // = 1000 ≥ 500 (qualifying); B→A has min(100, 100) = 100 < 500 (not + // qualifying). + inputsAB := pairInputs{ + threshold: 500, + totalSuccessfulAmount: 100, + } + inputsBA := pairInputs{ + threshold: 500, + totalSuccessfulAmount: 50, + } + + abilityAB, abilityBA, err := calculateBothDirectionsUptime( + context.Background(), startTime, endTime, + inputsAB, inputsBA, statesA, statesB, + mergeEventSlices(nil, nil), + ) + require.NoError(t, err) + + require.Equal( + t, &ForwardingAbility{ + Velocity: 1, // 100 sats / 100s + UptimeFraction: 1.0, + }, abilityAB, + ) + require.Equal( + t, &ForwardingAbility{ + Velocity: 0, + UptimeFraction: 0, + // Forwards landed but BA never crossed threshold. + Inconsistent: true, + }, abilityBA, + ) +} From 75f078359327239a5deb6f02e3d9ef9255310313 Mon Sep 17 00:00:00 2001 From: bitromortac Date: Thu, 14 May 2026 08:58:23 +0200 Subject: [PATCH 2/5] chanevents: add initial-state seeding Establish the boundary between the chanevents store and the upcoming forwarding analyzer plus the seed walk every pair calculation depends on. EventsSource is the read surface the analyzer consumes, and ForwardingAnalyzer carries it alongside an lnd handle so the upcoming driver can fold lnd's open and closed channel sets into the considered population. getInitialChannelState reconstructs a channel's state at startTime by seeding from the latest pre-window update and replaying any residual same-timestamp siblings the SQL keyset may have surfaced. The residual range is bounded by definition (events between two adjacent timestamps within a channel), so streaming would buy nothing over materialising the slice in one call. A guard error flags later-timestamp updates in the residual walk as schema drift. --- chanevents/analyzer.go | 137 ++++++++++++++++++++++++++++++++++++ chanevents/analyzer_test.go | 56 +++++++++++++++ 2 files changed, 193 insertions(+) diff --git a/chanevents/analyzer.go b/chanevents/analyzer.go index 232dcbe..1751f94 100644 --- a/chanevents/analyzer.go +++ b/chanevents/analyzer.go @@ -11,15 +11,54 @@ import ( "github.com/btcsuite/btcd/btcutil" "github.com/btcsuite/btclog/v2" + "github.com/lightninglabs/lndclient" ) var ( + // errUnexpectedUpdateEvent fires when getInitialChannelState's + // residual-event walk surfaces an Update at a timestamp newer than the + // seed update. + errUnexpectedUpdateEvent = errors.New("unexpected update event in " + + "initial-state walk") + // errUnknownEventType fires when the event-replay switch sees an // EventType outside {Offline, Online, Update}. Indicates schema drift // between the store and the analyzer. errUnknownEventType = errors.New("unknown channel event type") ) +// EventsSource abstracts the chanevents store so ForwardingAnalyzer can derive +// uptime metrics without coupling to a specific storage backend. +type EventsSource interface { + // GetLatestChannelUpdateBefore returns the latest channel event before + // the given time, or (nil, nil) if no event predates it. + GetLatestChannelUpdateBefore(ctx context.Context, channelID int64, + before time.Time) (*ChannelEvent, error) + + // GetChannelEvents fetches up to limit events for a channel with id > + // afterID and timestamp in [startTime, endTime), ordered by id ASC. + // A large limit (e.g. math.MaxInt32) retrieves the entire range. + GetChannelEvents(ctx context.Context, channelID, afterID int64, + startTime, endTime time.Time, + limit int32) ([]*ChannelEvent, error) + + // GetChannelByShortChanID resolves an scid to a Channel, returning + // ErrUnknownChannel when no row matches. + GetChannelByShortChanID(ctx context.Context, + shortChannelID uint64) (*Channel, error) + + // ScidToPeerMap returns the historically recorded scid→peer index, + // including closed channels. + ScidToPeerMap(ctx context.Context) (map[uint64]string, error) +} + +// ForwardingAnalyzer computes forwarding velocity and effective uptime for +// every (peerIn, peerOut) pair. +type ForwardingAnalyzer struct { + store EventsSource + lnd lndclient.LndServices +} + // channelEventSeq is a chronologically ordered stream of channel events // paired with a propagated error value. type channelEventSeq = iter.Seq2[*ChannelEvent, error] @@ -57,6 +96,104 @@ type channelState struct { remoteBalance btcutil.Amount } +// NewForwardingAnalyzer returns a ready-to-use analyzer. +func NewForwardingAnalyzer(store EventsSource, + lnd lndclient.LndServices) *ForwardingAnalyzer { + + return &ForwardingAnalyzer{ + store: store, + lnd: lnd, + } +} + +// getInitialChannelState reconstructs a channel's state at startTime by seeding +// from the latest pre-window update and replaying any residual same-second +// siblings the SQL keyset may have surfaced. A channel with no prior update is +// treated as offline with zero balance. +func (a *ForwardingAnalyzer) getInitialChannelState(ctx context.Context, + startTime time.Time, channelID int64) (*channelState, error) { + + lastUpdate, err := a.store.GetLatestChannelUpdateBefore( + ctx, channelID, startTime, + ) + if err != nil { + return nil, err + } + + if lastUpdate == nil { + log.TraceS( + ctx, "No update event for channel", + slog.Int64("channelID", channelID), + slog.Time("startTime", startTime), + ) + + return &channelState{online: false}, nil + } + + // An update event always implies the channel is online. + state := &channelState{ + online: true, + } + lastUpdate.LocalBalance.WhenSome( + func(amt btcutil.Amount) { + state.localBalance = amt + }, + ) + lastUpdate.RemoteBalance.WhenSome( + func(amt btcutil.Amount) { + state.remoteBalance = amt + }, + ) + + // Fetch any residual events between the last update and the start time. + // The range is bounded (typically a handful of same-second siblings or + // status events) so materialising in one call is fine. Replay below + // assumes id-ASC matches chronological order, true while writers leave + // Timestamp zero so the store stamps clock.Now(). Overflow at the cap + // signals pathological volume the analyzer cannot safely seed from. + const residualEventLimit = 1024 + + residual, err := a.store.GetChannelEvents( + ctx, channelID, lastUpdate.ID, lastUpdate.Timestamp, startTime, + residualEventLimit, + ) + if err != nil { + return nil, err + } + + if len(residual) == residualEventLimit { + return nil, fmt.Errorf("residual events overflow (>=%d) for "+ + "chanID=%d", residualEventLimit, channelID) + } + + // Replay the residual events to arrive at the channel state on the + // window's open. + for _, event := range residual { + switch event.EventType { + case EventTypeOffline: + state.online = false + + case EventTypeOnline: + state.online = true + + case EventTypeUpdate: + // Defensively check that the seed update is indeed the + // latest before startTime. + if !event.Timestamp.Equal(lastUpdate.Timestamp) { + return nil, fmt.Errorf("%w: chanID=%d ts=%v", + errUnexpectedUpdateEvent, channelID, + event.Timestamp) + } + + default: + return nil, fmt.Errorf("%w: chanID=%d type=%v", + errUnknownEventType, channelID, event.EventType) + } + } + + return state, nil +} + // determineThreshold establishes the required liquidity floor based on the // user's manual threshold or the calculated percentile of successful forwards. func determineThreshold(forwardPercentile float64, diff --git a/chanevents/analyzer_test.go b/chanevents/analyzer_test.go index a58a7fd..acd72a9 100644 --- a/chanevents/analyzer_test.go +++ b/chanevents/analyzer_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/btcsuite/btcd/btcutil" + "github.com/lightningnetwork/lnd/clock" "github.com/lightningnetwork/lnd/fn/v2" "github.com/stretchr/testify/require" ) @@ -704,3 +705,58 @@ func TestCalculateBothDirectionsUptimeAsymmetric(t *testing.T) { }, abilityBA, ) } + +// TestInitialStateSameSecond verifies that when multiple update events share +// the same second-resolution timestamp, getInitialChannelState seeds from the +// most recent one (highest id) rather than aborting or choosing arbitrarily. +func TestInitialStateSameSecond(t *testing.T) { + t.Parallel() + + clock := clock.NewTestClock(testTime) + store := NewTestDB(t, clock) + ctx := context.Background() + + peerID, err := store.AddPeer(ctx, testPubKey) + require.NoError(t, err) + + channelID, err := store.AddChannel( + ctx, testChanPoint1, testShortChanID1, peerID, + ) + require.NoError(t, err) + + // Two update events share the same second-resolution timestamp. The + // second insert (higher id) is the one the SQL must pick. + sameTime := testTime.Add(10 * time.Second) + err = store.AddChannelEvent( + ctx, &ChannelEvent{ + ChannelID: channelID, + EventType: EventTypeUpdate, + Timestamp: sameTime, + LocalBalance: fn.Some(btcutil.Amount(100)), + RemoteBalance: fn.Some(btcutil.Amount(900)), + }, + ) + require.NoError(t, err) + err = store.AddChannelEvent( + ctx, &ChannelEvent{ + ChannelID: channelID, + EventType: EventTypeUpdate, + Timestamp: sameTime, + LocalBalance: fn.Some(btcutil.Amount(200)), + RemoteBalance: fn.Some(btcutil.Amount(800)), + }, + ) + require.NoError(t, err) + + // Construct a bare analyzer. getInitialChannelState only touches the + // store, so the lnd field can stay zero. + a := &ForwardingAnalyzer{store: store} + + startTime := sameTime.Add(time.Second) + state, err := a.getInitialChannelState(ctx, startTime, channelID) + require.NoError(t, err) + require.NotNil(t, state) + require.True(t, state.online, "an update event implies online") + require.Equal(t, btcutil.Amount(200), state.localBalance) + require.Equal(t, btcutil.Amount(800), state.remoteBalance) +} From f1274a5d4e73a5e60ac83f428bea44370690861b Mon Sep 17 00:00:00 2001 From: bitromortac Date: Thu, 14 May 2026 09:00:38 +0200 Subject: [PATCH 3/5] chanevents: add forwarding ability analyzer Wire EffectiveUptime as the analyzer's public entry. The pipeline resolves channels to peers from the store, folds lnd's forwarding history into per-pair success amounts, augments the considered set with lnd's open and closed channels to hedge survivorship bias, seeds each channel's state at startTime, and dispatches every peer pair to the bidirectional walk. calculateAllPairsUptime walks the unordered cross-product (i, j>=i) with a lazy per-peer event cache. Each peer's events are fetched once and replayed across every pair that consumes them. The cached events live as a slice so the cross-pair merge can use a two-pointer walk instead of an iter-based merge that would need goroutine plus channel synchronisation per event. --- chanevents/analyzer.go | 373 ++++++++++++++++++++++++++++++++++++ chanevents/analyzer_test.go | 142 ++++++++++++++ 2 files changed, 515 insertions(+) diff --git a/chanevents/analyzer.go b/chanevents/analyzer.go index 1751f94..5da6ee3 100644 --- a/chanevents/analyzer.go +++ b/chanevents/analyzer.go @@ -7,6 +7,7 @@ import ( "iter" "log/slog" "math" + "sort" "time" "github.com/btcsuite/btcd/btcutil" @@ -80,6 +81,14 @@ type ForwardingAbility struct { Inconsistent bool } +// PeerPair identifies a unidirectional routing edge from PeerIn to PeerOut. +// PeerIn names the source-side peer (the incoming channel's far end in lnd's +// forwarding vocabulary) and PeerOut names the sink-side peer. +type PeerPair struct { + PeerIn string + PeerOut string +} + // pairInputs encapsulates the routing performance thresholds for a single // direction. type pairInputs struct { @@ -106,6 +115,216 @@ func NewForwardingAnalyzer(store EventsSource, } } +// EffectiveUptime returns a ForwardingAbility for every (peerIn, peerOut) pair +// over [startTime, endTime). Closed channels are folded into the considered set +// so survivorship bias does not skew the uptime denominator. The liquidity +// floor is the fwdPercentile-th percentile of successful forward amounts (with +// fwdPercentile in [0, 100]), bounded below by threshold. When forwards land +// but the floor is never crossed, the returned ability is flagged Inconsistent. +func (a *ForwardingAnalyzer) EffectiveUptime(ctx context.Context, startTime, + endTime time.Time, fwdPercentile float64, threshold btcutil.Amount) ( + map[PeerPair]ForwardingAbility, error) { + + if fwdPercentile < 0 || fwdPercentile > 100 { + return nil, fmt.Errorf("fwdPercentile %v outside [0, 100]", + fwdPercentile) + } + + log.DebugS( + ctx, "Calculating effective uptime", + slog.Time("startTime", startTime), + slog.Time("endTime", endTime), + slog.Float64("fwdPercentile", fwdPercentile), + slog.Int64("threshold", int64(threshold)), + ) + + scidToPeer, err := a.store.ScidToPeerMap(ctx) + if err != nil { + return nil, err + } + log.DebugS( + ctx, "Found historical channels", + slog.Int("count", len(scidToPeer)), + ) + + successfulForwards, channelPeersConsidered, err := a.getForwardingData( + ctx, startTime, endTime, scidToPeer, + ) + if err != nil { + return nil, err + } + log.DebugS( + ctx, "Found peer pairs with successful forwards", + slog.Int("count", len(successfulForwards)), + ) + + err = a.addActiveChannels(ctx, channelPeersConsidered) + if err != nil { + return nil, err + } + + peerChannels, initialStates, err := a.getPeerChannelData( + ctx, startTime, channelPeersConsidered, + ) + if err != nil { + return nil, err + } + log.DebugS( + ctx, "Identified channels for peers", + slog.Int("count", len(peerChannels)), + ) + + return calculateAllPairsUptime( + ctx, a.store, startTime, endTime, fwdPercentile, threshold, + successfulForwards, initialStates, peerChannels, + ) +} + +// getForwardingData returns successful forwards and channels from lnd's +// forwarding history over [startTime, endTime), indexed by peer pair. Unknown +// channels are skipped. +func (a *ForwardingAnalyzer) getForwardingData(ctx context.Context, startTime, + endTime time.Time, scidToPeer map[uint64]string) ( + map[PeerPair][]btcutil.Amount, map[uint64]string, error) { + + fwds, err := a.lnd.Client.ForwardingHistory( + ctx, lndclient.ForwardingHistoryRequest{ + StartTime: startTime, + EndTime: endTime, + }, + ) + if err != nil { + return nil, nil, err + } + log.DebugS( + ctx, "Found forwarding events", + slog.Int( + "count", len(fwds.Events), + ), + ) + + channelPeersConsidered := make(map[uint64]string) + successfulForwards := make(map[PeerPair][]btcutil.Amount) + for _, fwd := range fwds.Events { + inPeer, ok := scidToPeer[fwd.ChannelIn] + if !ok { + log.WarnS( + ctx, "Could not find peer for incoming channel", + nil, slog.Uint64("channelIn", fwd.ChannelIn), + ) + continue + } + + outPeer, ok := scidToPeer[fwd.ChannelOut] + if !ok { + log.WarnS( + ctx, "Could not find peer for outgoing channel", + nil, slog.Uint64("channelOut", fwd.ChannelOut), + ) + continue + } + + channelPeersConsidered[fwd.ChannelIn] = inPeer + channelPeersConsidered[fwd.ChannelOut] = outPeer + + pair := PeerPair{ + PeerIn: inPeer, + PeerOut: outPeer, + } + + amt := fwd.AmountMsatOut.ToSatoshis() + successfulForwards[pair] = append(successfulForwards[pair], amt) + } + + return successfulForwards, channelPeersConsidered, nil +} + +// addActiveChannels ensures the channel set includes both open and closed +// channels so that channels that closed during the analysis period are not +// silently excluded. +func (a *ForwardingAnalyzer) addActiveChannels(ctx context.Context, + channelPeersConsidered map[uint64]string) error { + + // Currently open channels surface their peer directly. + openChannels, err := a.lnd.Client.ListChannels(ctx, false, false) + if err != nil { + return err + } + + for _, channel := range openChannels { + channelPeersConsidered[channel.ChannelID] = + channel.PubKeyBytes.String() + } + + // Historically closed channels are added so survivorship bias does not + // skew the denominator. + closedChannels, err := a.lnd.Client.ClosedChannels(ctx) + if err != nil { + return err + } + + for _, channel := range closedChannels { + // Channels that did not confirm onchain will not have a + // ChannelID. + if channel.ChannelID == 0 { + continue + } + + channelPeersConsidered[channel.ChannelID] = + channel.PubKeyBytes.String() + } + + return nil +} + +// getPeerChannelData returns channels and their initial state at startTime, +// grouped by peer, including only those present in the store. +func (a *ForwardingAnalyzer) getPeerChannelData(ctx context.Context, + startTime time.Time, channelPeersConsidered map[uint64]string) ( + map[string][]int64, map[string]map[int64]*channelState, error) { + + peerChannels := make(map[string][]int64) + initialStates := make(map[string]map[int64]*channelState) + for scid, peerPubKey := range channelPeersConsidered { + channel, err := a.store.GetChannelByShortChanID(ctx, scid) + if errors.Is(err, ErrUnknownChannel) { + // Channels obtained from lnd but not present in the + // store. This can happen if the channel was very + // recently opened or closed and the store hasn't + // ingested the event yet. + log.DebugS( + ctx, "Skipping channel not in events store", + slog.Uint64("scid", scid), + ) + + continue + } + if err != nil { + return nil, nil, err + } + + state, err := a.getInitialChannelState( + ctx, startTime, channel.ID, + ) + if err != nil { + return nil, nil, err + } + + if _, ok := initialStates[peerPubKey]; !ok { + initialStates[peerPubKey] = make( + map[int64]*channelState, + ) + } + initialStates[peerPubKey][channel.ID] = state + + peerChannels[peerPubKey] = append( + peerChannels[peerPubKey], channel.ID, + ) + } + + return peerChannels, initialStates, nil +} + // getInitialChannelState reconstructs a channel's state at startTime by seeding // from the latest pre-window update and replaying any residual same-second // siblings the SQL keyset may have surfaced. A channel with no prior update is @@ -194,6 +413,160 @@ func (a *ForwardingAnalyzer) getInitialChannelState(ctx context.Context, return state, nil } +// calculateAllPairsUptime returns forwarding abilities for every peer pair, +// computing both directions (A→B and B→A) in a single pass. +func calculateAllPairsUptime(ctx context.Context, store EventsSource, startTime, + endTime time.Time, fwdPercentile float64, threshold btcutil.Amount, + successfulForwards map[PeerPair][]btcutil.Amount, + initialStates map[string]map[int64]*channelState, + peerChannels map[string][]int64) ( + map[PeerPair]ForwardingAbility, error) { + + results := make(map[PeerPair]ForwardingAbility) + recordResult := func(peerIn, peerOut string, a ForwardingAbility) { + results[PeerPair{PeerIn: peerIn, PeerOut: peerOut}] = a + } + + // Lazy per-peer event cache: each peer's events are fetched once and + // replayed across every pair walk that consumes them. + peerEvents := make(map[string][]*ChannelEvent, len(initialStates)) + loadPeer := func(peer string) ([]*ChannelEvent, error) { + if cached, ok := peerEvents[peer]; ok { + return cached, nil + } + + events, err := loadPeerEvents( + ctx, store, startTime, endTime, peerChannels[peer], + ) + if err != nil { + return nil, err + } + + peerEvents[peer] = events + + return events, nil + } + + peers := make([]string, 0, len(initialStates)) + for peer := range initialStates { + peers = append(peers, peer) + } + + for i, peerA := range peers { + if ctx.Err() != nil { + return nil, ctx.Err() + } + + statesA := initialStates[peerA] + sliceA, err := loadPeer(peerA) + if err != nil { + return nil, err + } + + for j := i; j < len(peers); j++ { + peerB := peers[j] + statesB := initialStates[peerB] + + inputsAB, err := pairThresholdInputs( + fwdPercentile, threshold, successfulForwards, + peerA, peerB, + ) + if err != nil { + return nil, err + } + + inputsBA, err := pairThresholdInputs( + fwdPercentile, threshold, successfulForwards, + peerB, peerA, + ) + if err != nil { + return nil, err + } + + sliceB := sliceA + if i != j { + sliceB, err = loadPeer(peerB) + if err != nil { + return nil, err + } + } + + abilityAB, abilityBA, err := + calculateBothDirectionsUptime( + ctx, startTime, endTime, + inputsAB, inputsBA, + statesA, statesB, + mergeEventSlices(sliceA, sliceB), + ) + if err != nil { + return nil, err + } + + recordResult(peerA, peerB, *abilityAB) + if i != j { + recordResult(peerB, peerA, *abilityBA) + } + } + } + + return results, nil +} + +// loadPeerEvents fetches every event in [startTime, endTime) on the given +// channels and returns them merged into a single chronologically sorted slice. +// Events sharing a timestamp are ordered by ascending id so the result is +// deterministic. +func loadPeerEvents(ctx context.Context, store EventsSource, startTime, + endTime time.Time, chanIDs []int64) ([]*ChannelEvent, error) { + + var events []*ChannelEvent + for _, chanID := range chanIDs { + chanEvents, err := store.GetChannelEvents( + ctx, chanID, 0, startTime, endTime, math.MaxInt32, + ) + if err != nil { + return nil, err + } + events = append(events, chanEvents...) + } + + sort.SliceStable( + events, + func(i, j int) bool { + if events[i].Timestamp.Equal(events[j].Timestamp) { + return events[i].ID < events[j].ID + } + + return events[i].Timestamp.Before(events[j].Timestamp) + }, + ) + + return events, nil +} + +// pairThresholdInputs resolves the liquidity floor and cumulative forwarded +// amount for one direction of a peer pair, applying the percentile rule when +// historical forwards exist. +func pairThresholdInputs(fwdPercentile float64, threshold btcutil.Amount, + successfulForwards map[PeerPair][]btcutil.Amount, + peerIn, peerOut string) (pairInputs, error) { + + successAmts := successfulForwards[PeerPair{ + PeerIn: peerIn, PeerOut: peerOut, + }] + t, err := determineThreshold(fwdPercentile, threshold, successAmts) + if err != nil { + return pairInputs{}, err + } + + var total btcutil.Amount + for _, amt := range successAmts { + total += amt + } + + return pairInputs{threshold: t, totalSuccessfulAmount: total}, nil +} + // determineThreshold establishes the required liquidity floor based on the // user's manual threshold or the calculated percentile of successful forwards. func determineThreshold(forwardPercentile float64, diff --git a/chanevents/analyzer_test.go b/chanevents/analyzer_test.go index acd72a9..a8d8456 100644 --- a/chanevents/analyzer_test.go +++ b/chanevents/analyzer_test.go @@ -6,8 +6,10 @@ import ( "time" "github.com/btcsuite/btcd/btcutil" + "github.com/lightninglabs/lndclient" "github.com/lightningnetwork/lnd/clock" "github.com/lightningnetwork/lnd/fn/v2" + "github.com/lightningnetwork/lnd/routing/route" "github.com/stretchr/testify/require" ) @@ -760,3 +762,143 @@ func TestInitialStateSameSecond(t *testing.T) { require.Equal(t, btcutil.Amount(200), state.localBalance) require.Equal(t, btcutil.Amount(800), state.remoteBalance) } + +// stubLndChannelClient implements the three lndclient.LightningClient methods +// the analyzer exercises. The embedded interface is nil, so any other method +// panics. Callers must not invoke methods outside the overridden set. +type stubLndChannelClient struct { + lndclient.LightningClient + + openChannels []lndclient.ChannelInfo + closedChannels []lndclient.ClosedChannel + forwardingHistory *lndclient.ForwardingHistoryResponse +} + +func (s *stubLndChannelClient) ListChannels(_ context.Context, _, _ bool, + _ ...lndclient.ListChannelsOption) ([]lndclient.ChannelInfo, error) { + + return s.openChannels, nil +} + +func (s *stubLndChannelClient) ClosedChannels(_ context.Context) ( + []lndclient.ClosedChannel, error) { + + return s.closedChannels, nil +} + +func (s *stubLndChannelClient) ForwardingHistory(_ context.Context, + _ lndclient.ForwardingHistoryRequest) ( + *lndclient.ForwardingHistoryResponse, error) { + + if s.forwardingHistory == nil { + return &lndclient.ForwardingHistoryResponse{}, nil + } + + return s.forwardingHistory, nil +} + +// validPubKey1 is the open-channel peer. route.NewVertexFromStr requires a +// 33-byte compressed key (66 hex chars). The pre-existing testPubKey is 65 +// chars long and is fine for the store layer, but the lnd survivorship path +// goes through route.NewVertexFromStr so we use a valid pair here. +const ( + validPubKey1 = "028d4c6347426f2e3f5e2b8e4a1c3b9f1c" + + "4e5d6f7a8b9c0d1e2f3a4b5c6d7e8f9a" + validPubKey2 = "038d4c6347426f2e3f5e2b8e4a1c3b9f1c" + + "4e5d6f7a8b9c0d1e2f3a4b5c6d7e8f9a" +) + +// TestEffectiveUptimeIncludesClosedChannels exercises the survivorship-bias +// guarantee of EffectiveUptime: a peer whose only channel was closed before the +// analysis window must still appear in the result map. Without merging lnd's +// ClosedChannels into the considered set, the closed-channel peer would be +// invisible to the walk and the fleet's reported uptime would over-state +// reality. +func TestEffectiveUptimeIncludesClosedChannels(t *testing.T) { + t.Parallel() + + clk := clock.NewTestClock(testTime) + store := NewTestDB(t, clk) + ctx := context.Background() + + // Two peers: the open-channel peer and the closed-channel peer. + openPeerID, err := store.AddPeer(ctx, validPubKey1) + require.NoError(t, err) + closedPeerID, err := store.AddPeer(ctx, validPubKey2) + require.NoError(t, err) + + // Both channels live in the chanevents store. lnd will report the first + // via ListChannels and the second only via ClosedChannels. + openScid := testShortChanID1 + closedScid := testShortChanID2 + openChanID, err := store.AddChannel( + ctx, testChanPoint1, openScid, openPeerID, + ) + require.NoError(t, err) + closedChanID, err := store.AddChannel( + ctx, testChanPoint2, closedScid, closedPeerID, + ) + require.NoError(t, err) + + // Seed an Update event before startTime for each channel so the + // initial-state walk has a non-zero baseline. Without a baseline, the + // closed channel's online state would be false and its presence in the + // result map would not prove the survivorship code path drove it. + seedTime := testTime + for _, chanID := range []int64{openChanID, closedChanID} { + err = store.AddChannelEvent( + ctx, &ChannelEvent{ + ChannelID: chanID, + EventType: EventTypeUpdate, + Timestamp: seedTime, + LocalBalance: fn.Some(btcutil.Amount(1000)), + RemoteBalance: fn.Some(btcutil.Amount(1000)), + }, + ) + require.NoError(t, err) + } + + openVertex, err := route.NewVertexFromStr(validPubKey1) + require.NoError(t, err) + closedVertex, err := route.NewVertexFromStr(validPubKey2) + require.NoError(t, err) + + // lnd reports only the open channel via ListChannels. The closed + // channel surfaces solely through ClosedChannels. + stub := &stubLndChannelClient{ + openChannels: []lndclient.ChannelInfo{ + { + ChannelID: openScid, + PubKeyBytes: openVertex, + }, + }, + closedChannels: []lndclient.ClosedChannel{ + { + ChannelID: closedScid, + PubKeyBytes: closedVertex, + }, + }, + } + + a := NewForwardingAnalyzer(store, lndclient.LndServices{Client: stub}) + + startTime := seedTime.Add(time.Second) + endTime := startTime.Add(time.Minute) + + abilities, err := a.EffectiveUptime(ctx, startTime, endTime, 0, 0) + require.NoError(t, err) + + // Cross-pair entries in both directions are the cleanest assertion + // that the closed-channel peer participates in the walk, not just + // indexes into it. Their presence is the survivorship guarantee + // under test: without merging lnd's ClosedChannels into the + // considered set, neither cross would appear. + require.Contains( + t, abilities, PeerPair{PeerIn: validPubKey1, PeerOut: validPubKey2}, + "closed-channel peer absent: survivorship handling skipped", + ) + require.Contains( + t, abilities, PeerPair{PeerIn: validPubKey2, PeerOut: validPubKey1}, + "closed-channel peer absent: survivorship handling skipped", + ) +} From 919ade30a11afa592980458d35ad99ce9c7b8574 Mon Sep 17 00:00:00 2001 From: bitromortac Date: Thu, 14 May 2026 09:00:53 +0200 Subject: [PATCH 4/5] chanevents+db: widen test fixtures to testing.TB Promote the existing test-store and test-DB constructors from *testing.T to testing.TB so the upcoming EffectiveUptime benchmark can share the same fixture path as the existing tests. testing.TB is the shared interface of *testing.T and *testing.B, so every current caller keeps type-checking unchanged. --- chanevents/test_postgres.go | 2 +- chanevents/test_sql.go | 2 +- chanevents/test_sqlite.go | 2 +- db/postgres.go | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/chanevents/test_postgres.go b/chanevents/test_postgres.go index daf34e7..5277a02 100644 --- a/chanevents/test_postgres.go +++ b/chanevents/test_postgres.go @@ -11,7 +11,7 @@ import ( ) // NewTestDB creates a new test chanevents.Store backed by a postgres DB. -func NewTestDB(t *testing.T, clock clock.Clock) *Store { +func NewTestDB(t testing.TB, clock clock.Clock) *Store { // We'll create a new test database. The call to NewTestPostgresDB will // automatically create the DB and apply the migrations. testDB := db.NewTestPostgresDB(t) diff --git a/chanevents/test_sql.go b/chanevents/test_sql.go index 1785159..9ff7a93 100644 --- a/chanevents/test_sql.go +++ b/chanevents/test_sql.go @@ -9,7 +9,7 @@ import ( ) // createStore is a helper function that creates a new Store. -func createStore(t *testing.T, sqlDB *sqldb.BaseDB, clock clock.Clock) *Store { +func createStore(t testing.TB, sqlDB *sqldb.BaseDB, clock clock.Clock) *Store { queries := sqlc.NewForType(sqlDB, sqlDB.BackendType) store := NewStore(sqlDB, queries, clock) diff --git a/chanevents/test_sqlite.go b/chanevents/test_sqlite.go index a6c3522..7cbf7f5 100644 --- a/chanevents/test_sqlite.go +++ b/chanevents/test_sqlite.go @@ -12,7 +12,7 @@ import ( ) // NewTestDB creates a new test chanevents.Store backed by a sqlite DB. -func NewTestDB(t *testing.T, clock clock.Clock) *Store { +func NewTestDB(t testing.TB, clock clock.Clock) *Store { // We'll create a new test database. The call to NewTestSqliteDB will // automatically create the DB and apply the migrations. testDB := sqldb.NewTestSqliteDB(t, db.FaradayMigrationSets) diff --git a/db/postgres.go b/db/postgres.go index 4a0cb82..8cc42b0 100644 --- a/db/postgres.go +++ b/db/postgres.go @@ -9,7 +9,7 @@ import ( // NewTestPostgresDB is a helper function that creates a Postgres database for // testing. -func NewTestPostgresDB(t *testing.T) *sqldb.PostgresStore { +func NewTestPostgresDB(t testing.TB) *sqldb.PostgresStore { t.Helper() t.Logf("Creating new Postgres DB for testing") From 4c245b185fbcd0ce4efb0429592c09a610e7d5c6 Mon Sep 17 00:00:00 2001 From: bitromortac Date: Thu, 14 May 2026 09:04:00 +0200 Subject: [PATCH 5/5] temp: add EffectiveUptime benchmark MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Drive realistic K² pair walks against two fixture shapes. The single-channel-per-peer bench (100 peers × 1 channel × 200 events) exercises the bench infrastructure and matches the typical Lightning node profile. The multi-channel-per-peer bench (30 peers × 5 channels × 130 events) keeps the per-peer cache fill and the cross-pair merge on the hot path at the same time, which is the shape where the analyzer is most sensitive to algorithmic choices. A bench fixture is cached under $TMPDIR keyed by the bench parameters so re-runs skip the SQLite insert cost. On a fixture hit only an OS-level file copy and the lnd-stub channel-list reconstruction run. --- chanevents/analyzer_bench_test.go | 468 ++++++++++++++++++++++++++++++ 1 file changed, 468 insertions(+) create mode 100644 chanevents/analyzer_bench_test.go diff --git a/chanevents/analyzer_bench_test.go b/chanevents/analyzer_bench_test.go new file mode 100644 index 0000000..d940d01 --- /dev/null +++ b/chanevents/analyzer_bench_test.go @@ -0,0 +1,468 @@ +package chanevents + +import ( + "context" + "database/sql" + "fmt" + "io" + "math/rand" + "os" + "path/filepath" + "sort" + "testing" + "time" + + "github.com/btcsuite/btcd/btcutil" + "github.com/lightninglabs/faraday/db" + "github.com/lightninglabs/faraday/db/sqlc" + "github.com/lightninglabs/lndclient" + "github.com/lightningnetwork/lnd/clock" + "github.com/lightningnetwork/lnd/fn/v2" + "github.com/lightningnetwork/lnd/routing/route" + "github.com/lightningnetwork/lnd/sqldb/v2" + "github.com/stretchr/testify/require" +) + +// benchFixtureVersion bumps whenever the populator changes shape so stale +// fixtures from older runs don't poison new benchmark numbers. +const benchFixtureVersion = 2 + +// BenchmarkEffectiveUptimeMultiChan exercises EffectiveUptime against a +// multi-channel dataset (numPeers peers, channelsPerPeer channels each) to +// show the analyzer's K² walk cost under a realistic channel-fan-out shape. +func BenchmarkEffectiveUptimeMultiChan(b *testing.B) { + const ( + numPeers = 30 + channelsPerPeer = 5 + updatesPerChannel = 130 + seed = 4242 + ) + + ctx := context.Background() + clk := clock.NewTestClock(testTime) + + startTime := testTime.Add(time.Hour) + endTime := startTime.Add(24 * time.Hour) + + store, openChannels := loadOrPopulateMultiChanStore( + b, ctx, clk, numPeers, channelsPerPeer, updatesPerChannel, seed, + startTime, endTime, + ) + + stub := &stubLndChannelClient{openChannels: openChannels} + analyzer := NewForwardingAnalyzer( + store, lndclient.LndServices{ + Client: stub, + }, + ) + + // b.Loop drives one benchmark iteration per call (Go 1.24+ idiom). + for b.Loop() { + _, err := analyzer.EffectiveUptime( + ctx, startTime, endTime, 0, 0, + ) + require.NoError(b, err) + } +} + +func loadOrPopulateMultiChanStore(b *testing.B, ctx context.Context, + clk clock.Clock, numPeers, channelsPerPeer, updatesPerChannel, seed int, + startTime, endTime time.Time) (*Store, []lndclient.ChannelInfo) { + + fixtureName := fmt.Sprintf("faraday-bench-eu-mc-v%d-p%d-c%d-u%d-s%d.s"+ + "qlite", benchFixtureVersion, numPeers, channelsPerPeer, + updatesPerChannel, seed) + fixturePath := filepath.Join(os.TempDir(), fixtureName) + workingPath := filepath.Join(b.TempDir(), "tmp.db") + + openChannels := multiChanChannelList(b, numPeers, channelsPerPeer) + + if _, err := os.Stat(fixturePath); err == nil { + require.NoError(b, copyFile(fixturePath, workingPath)) + + return openBenchStore(b, workingPath, clk, true), openChannels + } + + store := openBenchStore(b, workingPath, clk, false) + populateMultiChanData( + b, ctx, store, numPeers, channelsPerPeer, updatesPerChannel, + seed, startTime, endTime, + ) + _, err := store.ExecContext( + ctx, fmt.Sprintf("VACUUM INTO '%s'", fixturePath), + ) + require.NoError(b, err) + + return store, openChannels +} + +func multiChanChannelList(b *testing.B, numPeers, + channelsPerPeer int) []lndclient.ChannelInfo { + + openChannels := make( + []lndclient.ChannelInfo, 0, numPeers*channelsPerPeer, + ) + for i := range numPeers { + pubKey := benchPubKey(i) + vertex, err := route.NewVertexFromStr(pubKey) + require.NoError(b, err) + + for c := range channelsPerPeer { + openChannels = append( + openChannels, lndclient.ChannelInfo{ + ChannelID: uint64( + 100_000 + i*channelsPerPeer + c, + ), + PubKeyBytes: vertex, + }, + ) + } + } + + return openChannels +} + +func populateMultiChanData(b *testing.B, ctx context.Context, store *Store, + numPeers, channelsPerPeer, updatesPerChannel, seed int, startTime, + endTime time.Time) { + + windowNanos := endTime.Sub(startTime).Nanoseconds() + // #nosec G404 – deterministic bench PRNG + rng := rand.New(rand.NewSource(int64(seed))) + + for i := range numPeers { + peerID, err := store.AddPeer(ctx, benchPubKey(i)) + require.NoError(b, err) + + for c := range channelsPerPeer { + chanPoint := fmt.Sprintf("bench_mc_txid:%d:%d", i, c) + scid := uint64(100_000 + i*channelsPerPeer + c) + chanID, err := store.AddChannel( + ctx, chanPoint, scid, peerID, + ) + require.NoError(b, err) + + err = store.AddChannelEvent( + ctx, &ChannelEvent{ + ChannelID: chanID, + EventType: EventTypeUpdate, + Timestamp: testTime, + LocalBalance: fn.Some( + btcutil.Amount(1_000_000), + ), + RemoteBalance: fn.Some( + btcutil.Amount(1_000_000), + ), + }, + ) + require.NoError(b, err) + + timestamps := make([]time.Time, updatesPerChannel) + for j := range timestamps { + offset := rng.Int63n(windowNanos) + timestamps[j] = startTime.Add( + time.Duration(offset), + ) + } + sort.Slice( + timestamps, + func(a, c int) bool { + return timestamps[a].Before( + timestamps[c], + ) + }, + ) + + for _, ts := range timestamps { + evType := []EventType{ + EventTypeOnline, EventTypeOffline, + EventTypeUpdate, + }[rng.Intn(3)] + + ev := &ChannelEvent{ + ChannelID: chanID, + EventType: evType, + Timestamp: ts, + } + if evType == EventTypeUpdate { + ev.LocalBalance = fn.Some( + btcutil.Amount( + rng.Int63n(2_000_000), + ), + ) + ev.RemoteBalance = fn.Some( + btcutil.Amount( + rng.Int63n(2_000_000), + ), + ) + } + require.NoError( + b, store.AddChannelEvent(ctx, ev), + ) + } + } + } +} + +// BenchmarkEffectiveUptime exercises the K² pair walk against a realistic +// dataset whose shape is defined by the constants below. It measures the +// per-pair event-restream cost, making cache-layer amortisation visible as a +// wall-clock delta. +func BenchmarkEffectiveUptime(b *testing.B) { + const ( + numPeers = 1000 + updatesPerChannel = 500 + seed = 42 + ) + + ctx := context.Background() + clk := clock.NewTestClock(testTime) + + startTime := testTime.Add(time.Hour) + endTime := startTime.Add(24 * time.Hour) + + store, openChannels := loadOrPopulateBenchStore( + b, ctx, clk, numPeers, updatesPerChannel, seed, startTime, + endTime, + ) + + stub := &stubLndChannelClient{openChannels: openChannels} + analyzer := NewForwardingAnalyzer( + store, lndclient.LndServices{ + Client: stub, + }, + ) + + for b.Loop() { + _, err := analyzer.EffectiveUptime( + ctx, startTime, endTime, 0, 0, + ) + require.NoError(b, err) + } +} + +// loadOrPopulateBenchStore returns a Store ready for the analyzer benchmark, +// caching the fixture in $TMPDIR to avoid re-running the SQLite population on +// every benchmark invocation. +func loadOrPopulateBenchStore(b *testing.B, ctx context.Context, + clk clock.Clock, numPeers, updatesPerChannel, seed int, startTime, + endTime time.Time) (*Store, []lndclient.ChannelInfo) { + + fixtureName := fmt.Sprintf("faraday-bench-eu-v%d-p%d-u%d-s%d.sqlite", + benchFixtureVersion, numPeers, updatesPerChannel, seed) + fixturePath := filepath.Join(os.TempDir(), fixtureName) + workingPath := filepath.Join(b.TempDir(), "tmp.db") + + openChannels := benchChannelList(b, numPeers) + + if _, err := os.Stat(fixturePath); err == nil { + b.Logf("Reusing bench fixture %s", fixturePath) + require.NoError(b, copyFile(fixturePath, workingPath)) + + return openBenchStore(b, workingPath, clk, true), openChannels + } + + b.Logf("Populating bench fixture (will cache at %s)", fixturePath) + store := openBenchStore(b, workingPath, clk, false) + populateBenchData( + b, ctx, store, numPeers, updatesPerChannel, seed, startTime, + endTime, + ) + + // VACUUM INTO writes a clean, WAL-free snapshot atomically so the + // fixture is self-contained. + _, err := store.ExecContext( + ctx, fmt.Sprintf("VACUUM INTO '%s'", fixturePath), + ) + require.NoError(b, err) + + return store, openChannels +} + +// openBenchStore opens a SqliteStore at the given path and wraps it in a +// chanevents.Store. When reuse is true the migrations are skipped because the +// fixture was migrated when it was originally created. +func openBenchStore(b *testing.B, dbPath string, clk clock.Clock, + reuse bool) *Store { + + sqlDB, err := sqldb.NewSqliteStore( + &sqldb.SqliteConfig{ + SkipMigrations: reuse, + }, + dbPath, + ) + require.NoError(b, err) + + if !reuse { + require.NoError( + b, sqldb.ApplyAllMigrations( + sqlDB, db.FaradayMigrationSets, + ), + ) + } + + b.Cleanup(func() { + require.NoError(b, sqlDB.DB.Close()) + }) + + return createStore(b, sqlDB.BaseDB, clk) +} + +// benchChannelList reconstructs the lnd-stub ChannelInfo set from the bench +// parameters. The pubkey and scid layouts mirror populateBenchData so the +// fixture-reuse path doesn't need to read them back from the store. +func benchChannelList(b *testing.B, numPeers int) []lndclient.ChannelInfo { + openChannels := make([]lndclient.ChannelInfo, numPeers) + for i := range numPeers { + pubKey := benchPubKey(i) + vertex, err := route.NewVertexFromStr(pubKey) + require.NoError(b, err) + + openChannels[i] = lndclient.ChannelInfo{ + ChannelID: uint64(1_000 + i), + PubKeyBytes: vertex, + } + } + + return openChannels +} + +// populateBenchData inserts the bench dataset: one peer per index, one channel +// per peer, a seed Update before startTime so the initial-state walk has a +// baseline, and updatesPerChannel random events per channel inside the window. +// Each peer's inserts run inside a single transaction so the SQLite commit +// fsync amortises across ~updatesPerChannel rows instead of firing per row. +func populateBenchData(b *testing.B, ctx context.Context, store *Store, + numPeers, updatesPerChannel, seed int, startTime, endTime time.Time) { + + windowNanos := endTime.Sub(startTime).Nanoseconds() + // #nosec G404 – deterministic bench PRNG + rng := rand.New(rand.NewSource(int64(seed))) + + eventTypes := [3]EventType{ + EventTypeOnline, EventTypeOffline, EventTypeUpdate, + } + + for i := range numPeers { + // Pre-roll all rng-derived data outside the tx so the body is + // deterministic and the rng state is not perturbed if ExecTx + // retries. + timestamps := make([]time.Time, updatesPerChannel) + for j := range timestamps { + offset := rng.Int63n(windowNanos) + timestamps[j] = startTime.Add(time.Duration(offset)) + } + sort.Slice( + timestamps, + func(a, c int) bool { + return timestamps[a].Before(timestamps[c]) + }, + ) + + eventParams := make( + []sqlc.InsertChannelEventParams, updatesPerChannel, + ) + for j, ts := range timestamps { + evType := eventTypes[rng.Intn(3)] + eventParams[j] = sqlc.InsertChannelEventParams{ + EventType: int16(evType), + Timestamp: ts.UTC(), + } + if evType == EventTypeUpdate { + eventParams[j].LocalBalanceSat = sql.NullInt64{ + Int64: rng.Int63n(2_000_000), + Valid: true, + } + eventParams[j].RemoteBalanceSat = sql.NullInt64{ + Int64: rng.Int63n(2_000_000), + Valid: true, + } + } + } + + pubKey := benchPubKey(i) + chanPoint := fmt.Sprintf("bench_txid:%d", i) + scid := uint64(1_000 + i) + + err := store.db.ExecTx( + ctx, sqldb.WriteTxOpt(), + func(q SQLQueries) error { + peerID, err := q.InsertPeer(ctx, pubKey) + if err != nil { + return err + } + + chanID, err := q.InsertChannel( + ctx, sqlc.InsertChannelParams{ + ChannelPoint: chanPoint, + ShortChannelID: scidToInt64(scid), + PeerID: peerID, + }, + ) + if err != nil { + return err + } + + // Seed Update before startTime so the + // initial-state walk has a baseline. + err = q.InsertChannelEvent( + ctx, sqlc.InsertChannelEventParams{ + ChannelID: chanID, + EventType: int16(EventTypeUpdate), + Timestamp: testTime.UTC(), + LocalBalanceSat: sql.NullInt64{ + Int64: 1_000_000, + Valid: true, + }, + RemoteBalanceSat: sql.NullInt64{ + Int64: 1_000_000, + Valid: true, + }, + }, + ) + if err != nil { + return err + } + + for _, p := range eventParams { + p.ChannelID = chanID + if err := q.InsertChannelEvent( + ctx, p, + ); err != nil { + return err + } + } + + return nil + }, func() {}, + ) + require.NoError(b, err) + } +} + + +// benchPubKey synthesises a 66-char hex pubkey for peer index i. +// NewVertexFromStr validates length and hex only, not curve membership. +func benchPubKey(i int) string { + return fmt.Sprintf("02%064x", i+1) +} + +func copyFile(src, dst string) error { + in, err := os.Open(src) + if err != nil { + return err + } + defer in.Close() + + out, err := os.Create(dst) + if err != nil { + return err + } + defer out.Close() + + if _, err := io.Copy(out, in); err != nil { + return err + } + + return out.Sync() +}