diff --git a/chanevents/analyzer.go b/chanevents/analyzer.go
new file mode 100644
index 0000000..5da6ee3
--- /dev/null
+++ b/chanevents/analyzer.go
@@ -0,0 +1,878 @@
package chanevents

import (
	"context"
	"errors"
	"fmt"
	"iter"
	"log/slog"
	"math"
	"sort"
	"time"

	"github.com/btcsuite/btcd/btcutil"
	"github.com/btcsuite/btclog/v2"
	"github.com/lightninglabs/lndclient"
)

var (
	// errUnexpectedUpdateEvent is returned by getInitialChannelState when
	// the residual-event walk finds an Update whose timestamp differs from
	// the seed update's, i.e. the seed was not the latest update before
	// the window start.
	errUnexpectedUpdateEvent = errors.New("unexpected update event in " +
		"initial-state walk")

	// errUnknownEventType is returned when event replay encounters an
	// EventType other than Offline, Online or Update. It indicates schema
	// drift between the store and the analyzer.
	errUnknownEventType = errors.New("unknown channel event type")
)

// EventsSource abstracts the chanevents store so ForwardingAnalyzer can derive
// uptime metrics without coupling to a specific storage backend.
type EventsSource interface {
	// GetLatestChannelUpdateBefore returns the latest channel event before
	// the given time, or (nil, nil) if no event predates it.
	//
	// NOTE(review): the analyzer seeds balances and online=true from the
	// result, so it relies on this being an Update-type event. Confirm
	// that implementations filter by event type.
	GetLatestChannelUpdateBefore(ctx context.Context, channelID int64,
		before time.Time) (*ChannelEvent, error)

	// GetChannelEvents fetches up to limit events for a channel with id >
	// afterID and timestamp in [startTime, endTime), ordered by id ASC.
	// A large limit (e.g. math.MaxInt32) retrieves the entire range.
	GetChannelEvents(ctx context.Context, channelID, afterID int64,
		startTime, endTime time.Time,
		limit int32) ([]*ChannelEvent, error)

	// GetChannelByShortChanID resolves an scid to a Channel, returning
	// ErrUnknownChannel when no row matches.
	GetChannelByShortChanID(ctx context.Context,
		shortChannelID uint64) (*Channel, error)

	// ScidToPeerMap returns the historically recorded scid→peer index,
	// including closed channels. The analyzer uses the map values as peer
	// identifiers when grouping forwards into peer pairs.
	ScidToPeerMap(ctx context.Context) (map[uint64]string, error)
}

// ForwardingAnalyzer computes forwarding velocity and effective uptime for
// every (peerIn, peerOut) pair.
type ForwardingAnalyzer struct {
	// store supplies the recorded channel events and the scid→peer index.
	store EventsSource

	// lnd is queried for forwarding history and for the open and closed
	// channel lists.
	lnd lndclient.LndServices
}

// channelEventSeq is a chronologically ordered stream of channel events
// paired with a propagated error value. Consumers stop at the first non-nil
// error.
type channelEventSeq = iter.Seq2[*ChannelEvent, error]

// ForwardingAbility quantifies the historical routing performance of a peer
// pair. Inconsistent flags the pathological case where forwards were observed
// without the pair ever crossing the liquidity threshold; Velocity is zero in
// that case because the rate is undefined over zero qualifying uptime.
type ForwardingAbility struct {
	// Velocity is the forwarding velocity in sat/s during effective uptime.
	Velocity float64

	// UptimeFraction is the ratio of effective uptime to the full window
	// duration, in [0, 1].
	UptimeFraction float64

	// Inconsistent is set when forwards landed but effective uptime was
	// zero, indicating the input data and the threshold model disagree.
	Inconsistent bool
}

// PeerPair identifies a unidirectional routing edge from PeerIn to PeerOut.
// PeerIn names the source-side peer (the incoming channel's far end in lnd's
// forwarding vocabulary) and PeerOut names the sink-side peer. It is used as a
// map key, so both fields take part in equality.
type PeerPair struct {
	PeerIn  string
	PeerOut string
}

// pairInputs carries the per-direction inputs of the uptime walk for one peer
// pair.
type pairInputs struct {
	// threshold is the liquidity floor that the pair's forwarding
	// liquidity must exceed for an interval to count as uptime.
	threshold btcutil.Amount

	// totalSuccessfulAmount is the sum of the successful forward amounts
	// observed for this direction.
	totalSuccessfulAmount btcutil.Amount
}

// channelState is the per-channel snapshot the uptime walk carries forward as
// it consumes events: liveness plus the two balances that determine forwarding
// liquidity.
type channelState struct {
	// online reports whether the channel currently counts towards its
	// peer's liquidity sums.
	online bool

	// localBalance and remoteBalance hold the most recent balances seen in
	// an Update event; they stay zero until one is observed.
	localBalance  btcutil.Amount
	remoteBalance btcutil.Amount
}

// NewForwardingAnalyzer returns a ForwardingAnalyzer that reads channel events
// from store and queries lnd for forwarding history and channel lists.
+func NewForwardingAnalyzer(store EventsSource, + lnd lndclient.LndServices) *ForwardingAnalyzer { + + return &ForwardingAnalyzer{ + store: store, + lnd: lnd, + } +} + +// EffectiveUptime returns a ForwardingAbility for every (peerIn, peerOut) pair +// over [startTime, endTime). Closed channels are folded into the considered set +// so survivorship bias does not skew the uptime denominator. The liquidity +// floor is the fwdPercentile-th percentile of successful forward amounts (with +// fwdPercentile in [0, 100]), bounded below by threshold. When forwards land +// but the floor is never crossed, the returned ability is flagged Inconsistent. +func (a *ForwardingAnalyzer) EffectiveUptime(ctx context.Context, startTime, + endTime time.Time, fwdPercentile float64, threshold btcutil.Amount) ( + map[PeerPair]ForwardingAbility, error) { + + if fwdPercentile < 0 || fwdPercentile > 100 { + return nil, fmt.Errorf("fwdPercentile %v outside [0, 100]", + fwdPercentile) + } + + log.DebugS( + ctx, "Calculating effective uptime", + slog.Time("startTime", startTime), + slog.Time("endTime", endTime), + slog.Float64("fwdPercentile", fwdPercentile), + slog.Int64("threshold", int64(threshold)), + ) + + scidToPeer, err := a.store.ScidToPeerMap(ctx) + if err != nil { + return nil, err + } + log.DebugS( + ctx, "Found historical channels", + slog.Int("count", len(scidToPeer)), + ) + + successfulForwards, channelPeersConsidered, err := a.getForwardingData( + ctx, startTime, endTime, scidToPeer, + ) + if err != nil { + return nil, err + } + log.DebugS( + ctx, "Found peer pairs with successful forwards", + slog.Int("count", len(successfulForwards)), + ) + + err = a.addActiveChannels(ctx, channelPeersConsidered) + if err != nil { + return nil, err + } + + peerChannels, initialStates, err := a.getPeerChannelData( + ctx, startTime, channelPeersConsidered, + ) + if err != nil { + return nil, err + } + log.DebugS( + ctx, "Identified channels for peers", + slog.Int("count", len(peerChannels)), + ) 
+ + return calculateAllPairsUptime( + ctx, a.store, startTime, endTime, fwdPercentile, threshold, + successfulForwards, initialStates, peerChannels, + ) +} + +// getForwardingData returns successful forwards and channels from lnd's +// forwarding history over [startTime, endTime), indexed by peer pair. Unknown +// channels are skipped. +func (a *ForwardingAnalyzer) getForwardingData(ctx context.Context, startTime, + endTime time.Time, scidToPeer map[uint64]string) ( + map[PeerPair][]btcutil.Amount, map[uint64]string, error) { + + fwds, err := a.lnd.Client.ForwardingHistory( + ctx, lndclient.ForwardingHistoryRequest{ + StartTime: startTime, + EndTime: endTime, + }, + ) + if err != nil { + return nil, nil, err + } + log.DebugS( + ctx, "Found forwarding events", + slog.Int( + "count", len(fwds.Events), + ), + ) + + channelPeersConsidered := make(map[uint64]string) + successfulForwards := make(map[PeerPair][]btcutil.Amount) + for _, fwd := range fwds.Events { + inPeer, ok := scidToPeer[fwd.ChannelIn] + if !ok { + log.WarnS( + ctx, "Could not find peer for incoming channel", + nil, slog.Uint64("channelIn", fwd.ChannelIn), + ) + continue + } + + outPeer, ok := scidToPeer[fwd.ChannelOut] + if !ok { + log.WarnS( + ctx, "Could not find peer for outgoing channel", + nil, slog.Uint64("channelOut", fwd.ChannelOut), + ) + continue + } + + channelPeersConsidered[fwd.ChannelIn] = inPeer + channelPeersConsidered[fwd.ChannelOut] = outPeer + + pair := PeerPair{ + PeerIn: inPeer, + PeerOut: outPeer, + } + + amt := fwd.AmountMsatOut.ToSatoshis() + successfulForwards[pair] = append(successfulForwards[pair], amt) + } + + return successfulForwards, channelPeersConsidered, nil +} + +// addActiveChannels ensures the channel set includes both open and closed +// channels so that channels that closed during the analysis period are not +// silently excluded. 
+func (a *ForwardingAnalyzer) addActiveChannels(ctx context.Context, + channelPeersConsidered map[uint64]string) error { + + // Currently open channels surface their peer directly. + openChannels, err := a.lnd.Client.ListChannels(ctx, false, false) + if err != nil { + return err + } + + for _, channel := range openChannels { + channelPeersConsidered[channel.ChannelID] = + channel.PubKeyBytes.String() + } + + // Historically closed channels are added so survivorship bias does not + // skew the denominator. + closedChannels, err := a.lnd.Client.ClosedChannels(ctx) + if err != nil { + return err + } + + for _, channel := range closedChannels { + // Channels that did not confirm onchain will not have a + // ChannelID. + if channel.ChannelID == 0 { + continue + } + + channelPeersConsidered[channel.ChannelID] = + channel.PubKeyBytes.String() + } + + return nil +} + +// getPeerChannelData returns channels and their initial state at startTime, +// grouped by peer, including only those present in the store. +func (a *ForwardingAnalyzer) getPeerChannelData(ctx context.Context, + startTime time.Time, channelPeersConsidered map[uint64]string) ( + map[string][]int64, map[string]map[int64]*channelState, error) { + + peerChannels := make(map[string][]int64) + initialStates := make(map[string]map[int64]*channelState) + for scid, peerPubKey := range channelPeersConsidered { + channel, err := a.store.GetChannelByShortChanID(ctx, scid) + if errors.Is(err, ErrUnknownChannel) { + // Channels obtained from lnd but not present in the + // store. This can happen if the channel was very + // recently opened or closed and the store hasn't + // ingested the event yet. 
+ log.DebugS( + ctx, "Skipping channel not in events store", + slog.Uint64("scid", scid), + ) + + continue + } + if err != nil { + return nil, nil, err + } + + state, err := a.getInitialChannelState( + ctx, startTime, channel.ID, + ) + if err != nil { + return nil, nil, err + } + + if _, ok := initialStates[peerPubKey]; !ok { + initialStates[peerPubKey] = make( + map[int64]*channelState, + ) + } + initialStates[peerPubKey][channel.ID] = state + + peerChannels[peerPubKey] = append( + peerChannels[peerPubKey], channel.ID, + ) + } + + return peerChannels, initialStates, nil +} + +// getInitialChannelState reconstructs a channel's state at startTime by seeding +// from the latest pre-window update and replaying any residual same-second +// siblings the SQL keyset may have surfaced. A channel with no prior update is +// treated as offline with zero balance. +func (a *ForwardingAnalyzer) getInitialChannelState(ctx context.Context, + startTime time.Time, channelID int64) (*channelState, error) { + + lastUpdate, err := a.store.GetLatestChannelUpdateBefore( + ctx, channelID, startTime, + ) + if err != nil { + return nil, err + } + + if lastUpdate == nil { + log.TraceS( + ctx, "No update event for channel", + slog.Int64("channelID", channelID), + slog.Time("startTime", startTime), + ) + + return &channelState{online: false}, nil + } + + // An update event always implies the channel is online. + state := &channelState{ + online: true, + } + lastUpdate.LocalBalance.WhenSome( + func(amt btcutil.Amount) { + state.localBalance = amt + }, + ) + lastUpdate.RemoteBalance.WhenSome( + func(amt btcutil.Amount) { + state.remoteBalance = amt + }, + ) + + // Fetch any residual events between the last update and the start time. + // The range is bounded (typically a handful of same-second siblings or + // status events) so materialising in one call is fine. 
Replay below + // assumes id-ASC matches chronological order, true while writers leave + // Timestamp zero so the store stamps clock.Now(). Overflow at the cap + // signals pathological volume the analyzer cannot safely seed from. + const residualEventLimit = 1024 + + residual, err := a.store.GetChannelEvents( + ctx, channelID, lastUpdate.ID, lastUpdate.Timestamp, startTime, + residualEventLimit, + ) + if err != nil { + return nil, err + } + + if len(residual) == residualEventLimit { + return nil, fmt.Errorf("residual events overflow (>=%d) for "+ + "chanID=%d", residualEventLimit, channelID) + } + + // Replay the residual events to arrive at the channel state on the + // window's open. + for _, event := range residual { + switch event.EventType { + case EventTypeOffline: + state.online = false + + case EventTypeOnline: + state.online = true + + case EventTypeUpdate: + // Defensively check that the seed update is indeed the + // latest before startTime. + if !event.Timestamp.Equal(lastUpdate.Timestamp) { + return nil, fmt.Errorf("%w: chanID=%d ts=%v", + errUnexpectedUpdateEvent, channelID, + event.Timestamp) + } + + default: + return nil, fmt.Errorf("%w: chanID=%d type=%v", + errUnknownEventType, channelID, event.EventType) + } + } + + return state, nil +} + +// calculateAllPairsUptime returns forwarding abilities for every peer pair, +// computing both directions (A→B and B→A) in a single pass. 
+func calculateAllPairsUptime(ctx context.Context, store EventsSource, startTime, + endTime time.Time, fwdPercentile float64, threshold btcutil.Amount, + successfulForwards map[PeerPair][]btcutil.Amount, + initialStates map[string]map[int64]*channelState, + peerChannels map[string][]int64) ( + map[PeerPair]ForwardingAbility, error) { + + results := make(map[PeerPair]ForwardingAbility) + recordResult := func(peerIn, peerOut string, a ForwardingAbility) { + results[PeerPair{PeerIn: peerIn, PeerOut: peerOut}] = a + } + + // Lazy per-peer event cache: each peer's events are fetched once and + // replayed across every pair walk that consumes them. + peerEvents := make(map[string][]*ChannelEvent, len(initialStates)) + loadPeer := func(peer string) ([]*ChannelEvent, error) { + if cached, ok := peerEvents[peer]; ok { + return cached, nil + } + + events, err := loadPeerEvents( + ctx, store, startTime, endTime, peerChannels[peer], + ) + if err != nil { + return nil, err + } + + peerEvents[peer] = events + + return events, nil + } + + peers := make([]string, 0, len(initialStates)) + for peer := range initialStates { + peers = append(peers, peer) + } + + for i, peerA := range peers { + if ctx.Err() != nil { + return nil, ctx.Err() + } + + statesA := initialStates[peerA] + sliceA, err := loadPeer(peerA) + if err != nil { + return nil, err + } + + for j := i; j < len(peers); j++ { + peerB := peers[j] + statesB := initialStates[peerB] + + inputsAB, err := pairThresholdInputs( + fwdPercentile, threshold, successfulForwards, + peerA, peerB, + ) + if err != nil { + return nil, err + } + + inputsBA, err := pairThresholdInputs( + fwdPercentile, threshold, successfulForwards, + peerB, peerA, + ) + if err != nil { + return nil, err + } + + sliceB := sliceA + if i != j { + sliceB, err = loadPeer(peerB) + if err != nil { + return nil, err + } + } + + abilityAB, abilityBA, err := + calculateBothDirectionsUptime( + ctx, startTime, endTime, + inputsAB, inputsBA, + statesA, statesB, + 
mergeEventSlices(sliceA, sliceB), + ) + if err != nil { + return nil, err + } + + recordResult(peerA, peerB, *abilityAB) + if i != j { + recordResult(peerB, peerA, *abilityBA) + } + } + } + + return results, nil +} + +// loadPeerEvents fetches every event in [startTime, endTime) on the given +// channels and returns them merged into a single chronologically sorted slice. +// Events sharing a timestamp are ordered by ascending id so the result is +// deterministic. +func loadPeerEvents(ctx context.Context, store EventsSource, startTime, + endTime time.Time, chanIDs []int64) ([]*ChannelEvent, error) { + + var events []*ChannelEvent + for _, chanID := range chanIDs { + chanEvents, err := store.GetChannelEvents( + ctx, chanID, 0, startTime, endTime, math.MaxInt32, + ) + if err != nil { + return nil, err + } + events = append(events, chanEvents...) + } + + sort.SliceStable( + events, + func(i, j int) bool { + if events[i].Timestamp.Equal(events[j].Timestamp) { + return events[i].ID < events[j].ID + } + + return events[i].Timestamp.Before(events[j].Timestamp) + }, + ) + + return events, nil +} + +// pairThresholdInputs resolves the liquidity floor and cumulative forwarded +// amount for one direction of a peer pair, applying the percentile rule when +// historical forwards exist. +func pairThresholdInputs(fwdPercentile float64, threshold btcutil.Amount, + successfulForwards map[PeerPair][]btcutil.Amount, + peerIn, peerOut string) (pairInputs, error) { + + successAmts := successfulForwards[PeerPair{ + PeerIn: peerIn, PeerOut: peerOut, + }] + t, err := determineThreshold(fwdPercentile, threshold, successAmts) + if err != nil { + return pairInputs{}, err + } + + var total btcutil.Amount + for _, amt := range successAmts { + total += amt + } + + return pairInputs{threshold: t, totalSuccessfulAmount: total}, nil +} + +// determineThreshold establishes the required liquidity floor based on the +// user's manual threshold or the calculated percentile of successful forwards. 
+func determineThreshold(forwardPercentile float64, + thresholdAmount btcutil.Amount, + successAmts []btcutil.Amount) (btcutil.Amount, error) { + + if len(successAmts) == 0 { + return thresholdAmount, nil + } + + q := forwardPercentile / 100 + p, err := Quantile(successAmts, q) + if err != nil { + return 0, err + } + + return max(btcutil.Amount(math.RoundToEven(p)), thresholdAmount), nil +} + +// calculateBothDirectionsUptime computes the effective forwarding uptime for +// both directions of a peer pair in a single chronological walk of the merged +// event stream. Only the liquidity-direction roles and the per-direction +// thresholds differ between the two accumulators. For self-pair calls +// (statesA == statesB, inputsAB == inputsBA) both returned abilities are +// equal. +func calculateBothDirectionsUptime(ctx context.Context, startTime, + endTime time.Time, inputsAB, inputsBA pairInputs, statesA, + statesB map[int64]*channelState, mergedEvents channelEventSeq) ( + *ForwardingAbility, *ForwardingAbility, error) { + + traceOn := log.Level() <= btclog.LevelTrace + + if traceOn { + log.TraceS(ctx, "Calculating bidirectional effective uptime") + for chanID, state := range statesA { + log.TraceS( + ctx, "Initial state A", + slog.Int64("chanID", chanID), + slog.Bool("online", state.online), + slog.Int64( + "localBalance", int64( + state.localBalance, + ), + ), + slog.Int64( + "remoteBalance", int64( + state.remoteBalance, + ), + ), + ) + } + for chanID, state := range statesB { + log.TraceS( + ctx, "Initial state B", + slog.Int64("chanID", chanID), + slog.Bool("online", state.online), + slog.Int64( + "localBalance", int64( + state.localBalance, + ), + ), + slog.Int64( + "remoteBalance", int64( + state.remoteBalance, + ), + ), + ) + } + log.TraceS( + ctx, "Using final forwarding liquidity thresholds", + slog.Int64( + "thresholdAB", int64(inputsAB.threshold), + ), + slog.Int64( + "thresholdBA", int64(inputsBA.threshold), + ), + ) + } + + statesA = 
copyChannelStates(statesA) + statesB = copyChannelStates(statesB) + + // Seed running balance sums once. The per-event walk maintains them + // incrementally so the per-tick liquidity check is O(1) instead of + // O(channels-per-peer). The forwarding liquidity (A→B) is the + // bottleneck min(sum of A's online channels' remoteBalance, sum of B's + // online channels' localBalance). The running sums shadow those two + // quantities and the min is computed inline in accumulate. (B→A) is the + // same formula with the roles flipped. + var sumARemote, sumALocal, sumBRemote, sumBLocal btcutil.Amount + for _, s := range statesA { + if s.online { + sumARemote += s.remoteBalance + sumALocal += s.localBalance + } + } + for _, s := range statesB { + if s.online { + sumBRemote += s.remoteBalance + sumBLocal += s.localBalance + } + } + + var uptimeAB, uptimeBA time.Duration + lastTimestamp := startTime + + accumulate := func(intervalDuration time.Duration) { + if intervalDuration <= 0 { + return + } + // (A→B): A is incoming, B is outgoing. Liquidity bottleneck is + // min(A's online inbound, B's online outbound). + liqAB := min(sumARemote, sumBLocal) + // (B→A): roles flipped. 
+ liqBA := min(sumBRemote, sumALocal) + if traceOn { + log.TraceS( + ctx, "Forwarding liquidity check", + slog.Duration("interval", intervalDuration), + slog.Int64( + "liqAB", int64(liqAB), + ), + slog.Int64( + "liqBA", int64(liqBA), + ), + ) + } + if liqAB > inputsAB.threshold { + uptimeAB += intervalDuration + } + if liqBA > inputsBA.threshold { + uptimeBA += intervalDuration + } + } + + for event, err := range mergedEvents { + if err != nil { + return nil, nil, err + } + if traceOn { + log.TraceS( + ctx, "Processing event", + slog.Int64("chanID", event.ChannelID), + btclog.Fmt("type", "%v", event.EventType), + slog.Time("time", event.Timestamp), + ) + } + + accumulate(event.Timestamp.Sub(lastTimestamp)) + + if state, ok := statesA[event.ChannelID]; ok { + if state.online { + sumARemote -= state.remoteBalance + sumALocal -= state.localBalance + } + if err := applyEvent(state, event); err != nil { + return nil, nil, err + } + if state.online { + sumARemote += state.remoteBalance + sumALocal += state.localBalance + } + } + if state, ok := statesB[event.ChannelID]; ok { + if state.online { + sumBRemote -= state.remoteBalance + sumBLocal -= state.localBalance + } + if err := applyEvent(state, event); err != nil { + return nil, nil, err + } + if state.online { + sumBRemote += state.remoteBalance + sumBLocal += state.localBalance + } + } + + lastTimestamp = event.Timestamp + } + + accumulate(endTime.Sub(lastTimestamp)) + + if traceOn { + log.TraceS( + ctx, "Total effective uptime", + slog.Duration("uptimeAB", uptimeAB), + slog.Duration("uptimeBA", uptimeBA), + slog.Duration( + "totalDuration", endTime.Sub(startTime), + ), + ) + } + + abilityAB := makeAbility( + startTime, endTime, uptimeAB, inputsAB.totalSuccessfulAmount, + ) + abilityBA := makeAbility( + startTime, endTime, uptimeBA, inputsBA.totalSuccessfulAmount, + ) + + return abilityAB, abilityBA, nil +} + +// mergeEventSlices interleaves two sorted event streams into a single +// chronological iter.Seq2. 
Equal-timestamp events from sliceA are yielded +// first. Self-pair calls (sliceA == sliceB) yield each event twice. Callers +// must keep their state updates idempotent under same-timestamp duplicates. +func mergeEventSlices(sliceA, sliceB []*ChannelEvent) channelEventSeq { + return func(yield func(*ChannelEvent, error) bool) { + i, j := 0, 0 + + // Interleave both slices until one is exhausted, ensuring + // strict chronological order across the combined stream. + for i < len(sliceA) && j < len(sliceB) { + if sliceA[i].Timestamp.After(sliceB[j].Timestamp) { + if !yield(sliceB[j], nil) { + return + } + j++ + } else { + if !yield(sliceA[i], nil) { + return + } + i++ + } + } + + // Drain any remaining events from sliceA. This loop only + // executes if sliceB was exhausted first. + for ; i < len(sliceA); i++ { + if !yield(sliceA[i], nil) { + return + } + } + + // Drain any remaining events from sliceB. This loop only + // executes if sliceA was exhausted first. + for ; j < len(sliceB); j++ { + if !yield(sliceB[j], nil) { + return + } + } + } +} + +// copyChannelStates returns a deep copy of the per-channel state map so the +// bidirectional walk cannot mutate the caller's snapshot. +func copyChannelStates(states map[int64]*channelState) map[int64]*channelState { + statesCopy := make(map[int64]*channelState, len(states)) + for chanID, state := range states { + statesCopy[chanID] = &channelState{ + online: state.online, + localBalance: state.localBalance, + remoteBalance: state.remoteBalance, + } + } + + return statesCopy +} + +// applyEvent advances a channel's snapshot by one event. Update events imply +// online and overwrite whichever balance the event carries. Unknown event +// types return errUnknownEventType to surface store↔analyzer schema drift. 
+func applyEvent(state *channelState, event *ChannelEvent) error { + switch event.EventType { + case EventTypeOffline: + state.online = false + + case EventTypeOnline: + state.online = true + + case EventTypeUpdate: + state.online = true + event.LocalBalance.WhenSome( + func(amt btcutil.Amount) { + state.localBalance = amt + }, + ) + event.RemoteBalance.WhenSome( + func(amt btcutil.Amount) { + state.remoteBalance = amt + }, + ) + + default: + return fmt.Errorf("%w: chanID=%d type=%v", errUnknownEventType, + event.ChannelID, event.EventType) + } + + return nil +} + +// makeAbility folds an accumulated uptime and successful-amount total into a +// ForwardingAbility. When uptime is zero and forwards landed, the result is +// flagged Inconsistent with zero Velocity. +func makeAbility(startTime, endTime time.Time, totalUptime time.Duration, + totalAmt btcutil.Amount) *ForwardingAbility { + + if totalUptime == 0 { + return &ForwardingAbility{Inconsistent: totalAmt > 0} + } + + totalDuration := endTime.Sub(startTime) + + return &ForwardingAbility{ + Velocity: float64(totalAmt) / totalUptime.Seconds(), + UptimeFraction: float64(totalUptime) / float64(totalDuration), + } +} diff --git a/chanevents/analyzer_bench_test.go b/chanevents/analyzer_bench_test.go new file mode 100644 index 0000000..d940d01 --- /dev/null +++ b/chanevents/analyzer_bench_test.go @@ -0,0 +1,468 @@ +package chanevents + +import ( + "context" + "database/sql" + "fmt" + "io" + "math/rand" + "os" + "path/filepath" + "sort" + "testing" + "time" + + "github.com/btcsuite/btcd/btcutil" + "github.com/lightninglabs/faraday/db" + "github.com/lightninglabs/faraday/db/sqlc" + "github.com/lightninglabs/lndclient" + "github.com/lightningnetwork/lnd/clock" + "github.com/lightningnetwork/lnd/fn/v2" + "github.com/lightningnetwork/lnd/routing/route" + "github.com/lightningnetwork/lnd/sqldb/v2" + "github.com/stretchr/testify/require" +) + +// benchFixtureVersion bumps whenever the populator changes shape so stale +// 
fixtures from older runs don't poison new benchmark numbers. +const benchFixtureVersion = 2 + +// BenchmarkEffectiveUptimeMultiChan exercises EffectiveUptime against a +// multi-channel dataset (numPeers peers, channelsPerPeer channels each) to +// show the analyzer's K² walk cost under a realistic channel-fan-out shape. +func BenchmarkEffectiveUptimeMultiChan(b *testing.B) { + const ( + numPeers = 30 + channelsPerPeer = 5 + updatesPerChannel = 130 + seed = 4242 + ) + + ctx := context.Background() + clk := clock.NewTestClock(testTime) + + startTime := testTime.Add(time.Hour) + endTime := startTime.Add(24 * time.Hour) + + store, openChannels := loadOrPopulateMultiChanStore( + b, ctx, clk, numPeers, channelsPerPeer, updatesPerChannel, seed, + startTime, endTime, + ) + + stub := &stubLndChannelClient{openChannels: openChannels} + analyzer := NewForwardingAnalyzer( + store, lndclient.LndServices{ + Client: stub, + }, + ) + + // b.Loop drives one benchmark iteration per call (Go 1.24+ idiom). 
+ for b.Loop() { + _, err := analyzer.EffectiveUptime( + ctx, startTime, endTime, 0, 0, + ) + require.NoError(b, err) + } +} + +func loadOrPopulateMultiChanStore(b *testing.B, ctx context.Context, + clk clock.Clock, numPeers, channelsPerPeer, updatesPerChannel, seed int, + startTime, endTime time.Time) (*Store, []lndclient.ChannelInfo) { + + fixtureName := fmt.Sprintf("faraday-bench-eu-mc-v%d-p%d-c%d-u%d-s%d.s"+ + "qlite", benchFixtureVersion, numPeers, channelsPerPeer, + updatesPerChannel, seed) + fixturePath := filepath.Join(os.TempDir(), fixtureName) + workingPath := filepath.Join(b.TempDir(), "tmp.db") + + openChannels := multiChanChannelList(b, numPeers, channelsPerPeer) + + if _, err := os.Stat(fixturePath); err == nil { + require.NoError(b, copyFile(fixturePath, workingPath)) + + return openBenchStore(b, workingPath, clk, true), openChannels + } + + store := openBenchStore(b, workingPath, clk, false) + populateMultiChanData( + b, ctx, store, numPeers, channelsPerPeer, updatesPerChannel, + seed, startTime, endTime, + ) + _, err := store.ExecContext( + ctx, fmt.Sprintf("VACUUM INTO '%s'", fixturePath), + ) + require.NoError(b, err) + + return store, openChannels +} + +func multiChanChannelList(b *testing.B, numPeers, + channelsPerPeer int) []lndclient.ChannelInfo { + + openChannels := make( + []lndclient.ChannelInfo, 0, numPeers*channelsPerPeer, + ) + for i := range numPeers { + pubKey := benchPubKey(i) + vertex, err := route.NewVertexFromStr(pubKey) + require.NoError(b, err) + + for c := range channelsPerPeer { + openChannels = append( + openChannels, lndclient.ChannelInfo{ + ChannelID: uint64( + 100_000 + i*channelsPerPeer + c, + ), + PubKeyBytes: vertex, + }, + ) + } + } + + return openChannels +} + +func populateMultiChanData(b *testing.B, ctx context.Context, store *Store, + numPeers, channelsPerPeer, updatesPerChannel, seed int, startTime, + endTime time.Time) { + + windowNanos := endTime.Sub(startTime).Nanoseconds() + // #nosec G404 – deterministic 
bench PRNG + rng := rand.New(rand.NewSource(int64(seed))) + + for i := range numPeers { + peerID, err := store.AddPeer(ctx, benchPubKey(i)) + require.NoError(b, err) + + for c := range channelsPerPeer { + chanPoint := fmt.Sprintf("bench_mc_txid:%d:%d", i, c) + scid := uint64(100_000 + i*channelsPerPeer + c) + chanID, err := store.AddChannel( + ctx, chanPoint, scid, peerID, + ) + require.NoError(b, err) + + err = store.AddChannelEvent( + ctx, &ChannelEvent{ + ChannelID: chanID, + EventType: EventTypeUpdate, + Timestamp: testTime, + LocalBalance: fn.Some( + btcutil.Amount(1_000_000), + ), + RemoteBalance: fn.Some( + btcutil.Amount(1_000_000), + ), + }, + ) + require.NoError(b, err) + + timestamps := make([]time.Time, updatesPerChannel) + for j := range timestamps { + offset := rng.Int63n(windowNanos) + timestamps[j] = startTime.Add( + time.Duration(offset), + ) + } + sort.Slice( + timestamps, + func(a, c int) bool { + return timestamps[a].Before( + timestamps[c], + ) + }, + ) + + for _, ts := range timestamps { + evType := []EventType{ + EventTypeOnline, EventTypeOffline, + EventTypeUpdate, + }[rng.Intn(3)] + + ev := &ChannelEvent{ + ChannelID: chanID, + EventType: evType, + Timestamp: ts, + } + if evType == EventTypeUpdate { + ev.LocalBalance = fn.Some( + btcutil.Amount( + rng.Int63n(2_000_000), + ), + ) + ev.RemoteBalance = fn.Some( + btcutil.Amount( + rng.Int63n(2_000_000), + ), + ) + } + require.NoError( + b, store.AddChannelEvent(ctx, ev), + ) + } + } + } +} + +// BenchmarkEffectiveUptime exercises the K² pair walk against a realistic +// dataset whose shape is defined by the constants below. It measures the +// per-pair event-restream cost, making cache-layer amortisation visible as a +// wall-clock delta. 
+func BenchmarkEffectiveUptime(b *testing.B) { + const ( + numPeers = 1000 + updatesPerChannel = 500 + seed = 42 + ) + + ctx := context.Background() + clk := clock.NewTestClock(testTime) + + startTime := testTime.Add(time.Hour) + endTime := startTime.Add(24 * time.Hour) + + store, openChannels := loadOrPopulateBenchStore( + b, ctx, clk, numPeers, updatesPerChannel, seed, startTime, + endTime, + ) + + stub := &stubLndChannelClient{openChannels: openChannels} + analyzer := NewForwardingAnalyzer( + store, lndclient.LndServices{ + Client: stub, + }, + ) + + for b.Loop() { + _, err := analyzer.EffectiveUptime( + ctx, startTime, endTime, 0, 0, + ) + require.NoError(b, err) + } +} + +// loadOrPopulateBenchStore returns a Store ready for the analyzer benchmark, +// caching the fixture in $TMPDIR to avoid re-running the SQLite population on +// every benchmark invocation. +func loadOrPopulateBenchStore(b *testing.B, ctx context.Context, + clk clock.Clock, numPeers, updatesPerChannel, seed int, startTime, + endTime time.Time) (*Store, []lndclient.ChannelInfo) { + + fixtureName := fmt.Sprintf("faraday-bench-eu-v%d-p%d-u%d-s%d.sqlite", + benchFixtureVersion, numPeers, updatesPerChannel, seed) + fixturePath := filepath.Join(os.TempDir(), fixtureName) + workingPath := filepath.Join(b.TempDir(), "tmp.db") + + openChannels := benchChannelList(b, numPeers) + + if _, err := os.Stat(fixturePath); err == nil { + b.Logf("Reusing bench fixture %s", fixturePath) + require.NoError(b, copyFile(fixturePath, workingPath)) + + return openBenchStore(b, workingPath, clk, true), openChannels + } + + b.Logf("Populating bench fixture (will cache at %s)", fixturePath) + store := openBenchStore(b, workingPath, clk, false) + populateBenchData( + b, ctx, store, numPeers, updatesPerChannel, seed, startTime, + endTime, + ) + + // VACUUM INTO writes a clean, WAL-free snapshot atomically so the + // fixture is self-contained. 
+ _, err := store.ExecContext( + ctx, fmt.Sprintf("VACUUM INTO '%s'", fixturePath), + ) + require.NoError(b, err) + + return store, openChannels +} + +// openBenchStore opens a SqliteStore at the given path and wraps it in a +// chanevents.Store. When reuse is true the migrations are skipped because the +// fixture was migrated when it was originally created. +func openBenchStore(b *testing.B, dbPath string, clk clock.Clock, + reuse bool) *Store { + + sqlDB, err := sqldb.NewSqliteStore( + &sqldb.SqliteConfig{ + SkipMigrations: reuse, + }, + dbPath, + ) + require.NoError(b, err) + + if !reuse { + require.NoError( + b, sqldb.ApplyAllMigrations( + sqlDB, db.FaradayMigrationSets, + ), + ) + } + + b.Cleanup(func() { + require.NoError(b, sqlDB.DB.Close()) + }) + + return createStore(b, sqlDB.BaseDB, clk) +} + +// benchChannelList reconstructs the lnd-stub ChannelInfo set from the bench +// parameters. The pubkey and scid layouts mirror populateBenchData so the +// fixture-reuse path doesn't need to read them back from the store. +func benchChannelList(b *testing.B, numPeers int) []lndclient.ChannelInfo { + openChannels := make([]lndclient.ChannelInfo, numPeers) + for i := range numPeers { + pubKey := benchPubKey(i) + vertex, err := route.NewVertexFromStr(pubKey) + require.NoError(b, err) + + openChannels[i] = lndclient.ChannelInfo{ + ChannelID: uint64(1_000 + i), + PubKeyBytes: vertex, + } + } + + return openChannels +} + +// populateBenchData inserts the bench dataset: one peer per index, one channel +// per peer, a seed Update before startTime so the initial-state walk has a +// baseline, and updatesPerChannel random events per channel inside the window. +// Each peer's inserts run inside a single transaction so the SQLite commit +// fsync amortises across ~updatesPerChannel rows instead of firing per row. 
+func populateBenchData(b *testing.B, ctx context.Context, store *Store, + numPeers, updatesPerChannel, seed int, startTime, endTime time.Time) { + + windowNanos := endTime.Sub(startTime).Nanoseconds() + // #nosec G404 – deterministic bench PRNG + rng := rand.New(rand.NewSource(int64(seed))) + + eventTypes := [3]EventType{ + EventTypeOnline, EventTypeOffline, EventTypeUpdate, + } + + for i := range numPeers { + // Pre-roll all rng-derived data outside the tx so the body is + // deterministic and the rng state is not perturbed if ExecTx + // retries. + timestamps := make([]time.Time, updatesPerChannel) + for j := range timestamps { + offset := rng.Int63n(windowNanos) + timestamps[j] = startTime.Add(time.Duration(offset)) + } + sort.Slice( + timestamps, + func(a, c int) bool { + return timestamps[a].Before(timestamps[c]) + }, + ) + + eventParams := make( + []sqlc.InsertChannelEventParams, updatesPerChannel, + ) + for j, ts := range timestamps { + evType := eventTypes[rng.Intn(3)] + eventParams[j] = sqlc.InsertChannelEventParams{ + EventType: int16(evType), + Timestamp: ts.UTC(), + } + if evType == EventTypeUpdate { + eventParams[j].LocalBalanceSat = sql.NullInt64{ + Int64: rng.Int63n(2_000_000), + Valid: true, + } + eventParams[j].RemoteBalanceSat = sql.NullInt64{ + Int64: rng.Int63n(2_000_000), + Valid: true, + } + } + } + + pubKey := benchPubKey(i) + chanPoint := fmt.Sprintf("bench_txid:%d", i) + scid := uint64(1_000 + i) + + err := store.db.ExecTx( + ctx, sqldb.WriteTxOpt(), + func(q SQLQueries) error { + peerID, err := q.InsertPeer(ctx, pubKey) + if err != nil { + return err + } + + chanID, err := q.InsertChannel( + ctx, sqlc.InsertChannelParams{ + ChannelPoint: chanPoint, + ShortChannelID: scidToInt64(scid), + PeerID: peerID, + }, + ) + if err != nil { + return err + } + + // Seed Update before startTime so the + // initial-state walk has a baseline. 
+ err = q.InsertChannelEvent( + ctx, sqlc.InsertChannelEventParams{ + ChannelID: chanID, + EventType: int16(EventTypeUpdate), + Timestamp: testTime.UTC(), + LocalBalanceSat: sql.NullInt64{ + Int64: 1_000_000, + Valid: true, + }, + RemoteBalanceSat: sql.NullInt64{ + Int64: 1_000_000, + Valid: true, + }, + }, + ) + if err != nil { + return err + } + + for _, p := range eventParams { + p.ChannelID = chanID + if err := q.InsertChannelEvent( + ctx, p, + ); err != nil { + return err + } + } + + return nil + }, func() {}, + ) + require.NoError(b, err) + } +} + + +// benchPubKey synthesises a 66-char hex pubkey for peer index i. +// NewVertexFromStr validates length and hex only, not curve membership. +func benchPubKey(i int) string { + return fmt.Sprintf("02%064x", i+1) +} + +func copyFile(src, dst string) error { + in, err := os.Open(src) + if err != nil { + return err + } + defer in.Close() + + out, err := os.Create(dst) + if err != nil { + return err + } + defer out.Close() + + if _, err := io.Copy(out, in); err != nil { + return err + } + + return out.Sync() +} diff --git a/chanevents/analyzer_test.go b/chanevents/analyzer_test.go new file mode 100644 index 0000000..a8d8456 --- /dev/null +++ b/chanevents/analyzer_test.go @@ -0,0 +1,904 @@ +package chanevents + +import ( + "context" + "testing" + "time" + + "github.com/btcsuite/btcd/btcutil" + "github.com/lightninglabs/lndclient" + "github.com/lightningnetwork/lnd/clock" + "github.com/lightningnetwork/lnd/fn/v2" + "github.com/lightningnetwork/lnd/routing/route" + "github.com/stretchr/testify/require" +) + +// nextEventID is incremented by newEvent so every synthetic event carries a +// unique ID, making assertion failures easier to trace. +var nextEventID int64 = 1 + +// newEvent returns an update-typed ChannelEvent with the given balances. The +// ID is auto-incremented so failing assertions can identify the offending row. 
+func newEvent(chanID int64, ts int64, eventType EventType, local, + remote btcutil.Amount) *ChannelEvent { + + id := nextEventID + nextEventID++ + + return &ChannelEvent{ + ID: id, + ChannelID: chanID, + Timestamp: time.Unix(ts, 0), + EventType: eventType, + LocalBalance: fn.Some(local), + RemoteBalance: fn.Some(remote), + } +} + +// newStatusEvent returns an Online or Offline event with no balance payload, +// matching the schema contract for non-Update event types. +func newStatusEvent(chanID int64, ts int64, eventType EventType) *ChannelEvent { + return &ChannelEvent{ + ChannelID: chanID, + Timestamp: time.Unix(ts, 0), + EventType: eventType, + LocalBalance: fn.None[btcutil.Amount](), + RemoteBalance: fn.None[btcutil.Amount](), + } +} + +// TestMergeEventSlices verifies that interleaving distinct or identical streams +// preserves strict chronological order and resolves timestamp collisions +// deterministically. +func TestMergeEventSlices(t *testing.T) { + t.Parallel() + + const ( + fromA int64 = 1 + fromB int64 = 2 + ) + + // selfPair is the same backing slice passed as both sliceA and sliceB + // in the self-pair row. The merge must yield each element twice. 
+ selfPair := []*ChannelEvent{ + newStatusEvent(fromA, 100, EventTypeOnline), + newStatusEvent(fromA, 200, EventTypeOffline), + } + + testCases := []struct { + name string + sliceA []*ChannelEvent + sliceB []*ChannelEvent + expected []*ChannelEvent + }{ + { + name: "Both empty", + }, + { + name: "Only A", + sliceA: []*ChannelEvent{ + newStatusEvent(fromA, 100, EventTypeOnline), + newStatusEvent(fromA, 200, EventTypeOffline), + }, + expected: []*ChannelEvent{ + newStatusEvent(fromA, 100, EventTypeOnline), + newStatusEvent(fromA, 200, EventTypeOffline), + }, + }, + { + name: "Only B", + sliceB: []*ChannelEvent{ + newStatusEvent(fromB, 100, EventTypeOnline), + newStatusEvent(fromB, 200, EventTypeOffline), + }, + expected: []*ChannelEvent{ + newStatusEvent(fromB, 100, EventTypeOnline), + newStatusEvent(fromB, 200, EventTypeOffline), + }, + }, + { + name: "Disjoint A before B", + sliceA: []*ChannelEvent{ + newStatusEvent(fromA, 100, EventTypeOnline), + newStatusEvent(fromA, 150, EventTypeOffline), + }, + sliceB: []*ChannelEvent{ + newStatusEvent(fromB, 200, EventTypeOnline), + newStatusEvent(fromB, 250, EventTypeOffline), + }, + expected: []*ChannelEvent{ + newStatusEvent(fromA, 100, EventTypeOnline), + newStatusEvent(fromA, 150, EventTypeOffline), + newStatusEvent(fromB, 200, EventTypeOnline), + newStatusEvent(fromB, 250, EventTypeOffline), + }, + }, + { + name: "Interleaved", + sliceA: []*ChannelEvent{ + newStatusEvent(fromA, 100, EventTypeOnline), + newStatusEvent(fromA, 300, EventTypeOffline), + }, + sliceB: []*ChannelEvent{ + newStatusEvent(fromB, 200, EventTypeOnline), + newStatusEvent(fromB, 400, EventTypeOffline), + }, + expected: []*ChannelEvent{ + newStatusEvent(fromA, 100, EventTypeOnline), + newStatusEvent(fromB, 200, EventTypeOnline), + newStatusEvent(fromA, 300, EventTypeOffline), + newStatusEvent(fromB, 400, EventTypeOffline), + }, + }, + { + name: "Equal timestamps yield A first", + sliceA: []*ChannelEvent{ + newStatusEvent(fromA, 100, EventTypeOnline), 
+ newStatusEvent(fromA, 200, EventTypeOffline), + }, + sliceB: []*ChannelEvent{ + newStatusEvent(fromB, 100, EventTypeOnline), + newStatusEvent(fromB, 200, EventTypeOffline), + }, + expected: []*ChannelEvent{ + newStatusEvent(fromA, 100, EventTypeOnline), + newStatusEvent(fromB, 100, EventTypeOnline), + newStatusEvent(fromA, 200, EventTypeOffline), + newStatusEvent(fromB, 200, EventTypeOffline), + }, + }, + { + name: "Self-pair duplicates each event", + sliceA: selfPair, + sliceB: selfPair, + expected: []*ChannelEvent{ + selfPair[0], selfPair[0], + selfPair[1], selfPair[1], + }, + }, + } + + for _, tc := range testCases { + t.Run( + tc.name, + func(t *testing.T) { + t.Parallel() + + var got []*ChannelEvent + for event, err := range mergeEventSlices( + tc.sliceA, tc.sliceB, + ) { + require.NoError(t, err) + got = append(got, event) + } + require.Equal(t, tc.expected, got) + }, + ) + } +} + +// TestMergeEventSlicesEarlyTermination verifies that the merge sequence safely +// halts mid-stream without exhausting inputs when the consumer aborts. +func TestMergeEventSlicesEarlyTermination(t *testing.T) { + t.Parallel() + + sliceA := []*ChannelEvent{ + newStatusEvent(1, 100, EventTypeOnline), + newStatusEvent(1, 300, EventTypeOffline), + } + sliceB := []*ChannelEvent{ + newStatusEvent(2, 200, EventTypeOnline), + newStatusEvent(2, 400, EventTypeOffline), + } + + var got []*ChannelEvent + for event, err := range mergeEventSlices(sliceA, sliceB) { + require.NoError(t, err) + got = append(got, event) + if len(got) == 2 { + break + } + } + require.Len(t, got, 2) +} + +// TestCalculateBothDirectionsUptime verifies that the bidirectional uptime +// walk correctly attributes effective uptime given varying liveness, balance +// changes, and forwarding amounts. Each table row pins one boundary condition +// of the (A→B) direction. Full bidirectional invariants are covered by +// dedicated tests. 
+func TestCalculateBothDirectionsUptime(t *testing.T) { + t.Parallel() + + var ( + chanInID int64 = 1 + chanOutID int64 = 2 + startTime = time.Unix(100, 0) + endTime = time.Unix(200, 0) + ) + + testCases := []struct { + name string + + inStates map[int64]*channelState + outStates map[int64]*channelState + inEvents []*ChannelEvent + outEvents []*ChannelEvent + + successAmts []btcutil.Amount + thresholdAmount btcutil.Amount + forwardPercentile float64 + + expected *ForwardingAbility + expectedErr string + }{ + { + name: "Basic case always online", + inStates: map[int64]*channelState{ + chanInID: { + online: true, + remoteBalance: 1000, + }, + }, + outStates: map[int64]*channelState{ + chanOutID: { + online: true, + localBalance: 800, + }, + }, + successAmts: []btcutil.Amount{ + 100, + }, + expected: &ForwardingAbility{ + Velocity: 1, // 100 sats / 100s + UptimeFraction: 1.0, + }, + }, + { + // The forward in successAmts updates both channels: + // the in-channel's remoteBalance drops by the amount + // (peer A spent it) and the out-channel's localBalance + // drops by the same (forwarded out to B). Both Update + // events fire at the forward's timestamp. The threshold + // straddles pre and post liquidity so the boundary at + // the forward time is what carves the uptime window. + // The later tests in here aren't causally consistent + // with the successAmts, which in general is the source + // of truth for forwards. + name: "Forward drives the balance timeline", + inStates: map[int64]*channelState{ + chanInID: { + online: true, + remoteBalance: 1500, + }, + }, + outStates: map[int64]*channelState{ + chanOutID: { + online: true, + localBalance: 1000, + }, + }, + inEvents: []*ChannelEvent{ + // Forward of 200 sats at t=150 on the in side: + // local 0 → 200, remote 1500 → 1300. + newEvent( + chanInID, 150, EventTypeUpdate, 200, + 1300, + ), + }, + outEvents: []*ChannelEvent{ + // Same forward on the out side: local 1000 → + // 800, remote 0 → 200. 
+ newEvent( + chanOutID, 150, EventTypeUpdate, 800, + 200, + ), + }, + successAmts: []btcutil.Amount{ + 200, + }, + thresholdAmount: 900, + // t=100..150 (50s): liq = min(1500, 1000) = 1000 + // > 900 → qualifies. + // t=150..200 (50s): liq = min(1300, 800) = 800 + // < 900 → drops out. + // Total uptime = 50s, total amount = 200 sats. + expected: &ForwardingAbility{ + Velocity: 4, // 200 sats / 50s + UptimeFraction: 0.5, + }, + }, + { + name: "Channel goes offline", + inStates: map[int64]*channelState{ + chanInID: { + online: true, + remoteBalance: 1000, + }, + }, + outStates: map[int64]*channelState{ + chanOutID: { + online: true, + localBalance: 800, + }, + }, + inEvents: []*ChannelEvent{ + newStatusEvent(chanInID, 150, EventTypeOffline), + }, + successAmts: []btcutil.Amount{ + 100, + }, + thresholdAmount: 1, + expected: &ForwardingAbility{ + Velocity: 2, // 100 sats / 50s + UptimeFraction: 0.5, + }, + }, + { + name: "Balance change", + inStates: map[int64]*channelState{ + chanInID: { + online: true, + remoteBalance: 1000, + }, + }, + outStates: map[int64]*channelState{ + chanOutID: { + online: true, + localBalance: 800, + }, + }, + outEvents: []*ChannelEvent{ + newEvent( + chanOutID, 150, EventTypeUpdate, 1200, + 0, + ), + }, + successAmts: []btcutil.Amount{ + 100, + }, + thresholdAmount: 1, + // Balance changes at t=150, so for the first 50s the + // liquidity is 800, then it's 1000 for the next 50s. + // The total effective uptime is 100s, because the + // liquidity threshold is low. 
+ expected: &ForwardingAbility{ + Velocity: 1, // 100 sats / 100s + UptimeFraction: 1, + }, + }, + { + name: "Duplicate event timestamps", + inStates: map[int64]*channelState{ + chanInID: { + online: true, + remoteBalance: 1000, + }, + }, + outStates: map[int64]*channelState{ + chanOutID: { + online: true, + localBalance: 800, + }, + }, + inEvents: []*ChannelEvent{ + newStatusEvent(chanInID, 150, EventTypeOffline), + }, + outEvents: []*ChannelEvent{ + newEvent( + chanOutID, 150, EventTypeUpdate, 1200, + 0, + ), + }, + successAmts: []btcutil.Amount{ + 100, + }, + // At t=150, two events happen. From t=100 to t=150 + // (50s), liquidity is min(1000, 800) = 800. After + // t=150, chanIn is offline, so liquidity is 0 for the + // remaining 50s. + expected: &ForwardingAbility{ + Velocity: 2, // 100 sats / 50s + UptimeFraction: 0.5, + }, + }, + { + name: "No initial state", + inStates: map[int64]*channelState{ + chanInID: { + online: false, + }, + }, + outStates: map[int64]*channelState{ + chanOutID: { + online: false, + }, + }, + inEvents: []*ChannelEvent{ + newEvent( + chanInID, 120, EventTypeUpdate, 0, 1000, + ), + }, + outEvents: []*ChannelEvent{ + newEvent( + chanOutID, 140, EventTypeUpdate, 800, 0, + ), + }, + successAmts: []btcutil.Amount{ + 100, + }, + thresholdAmount: 1, + // We don't have initial balance states, so we can't + // determine liquidity until we see an event on both + // channels. At t=140 we know the liquidity is 800, and + // it's online for the remaining 60s of the 100s total. + // So uptime fraction is 0.6 for 800. 
+ expected: &ForwardingAbility{ + // 100 sats / 60s + Velocity: 1.6666666666666667, + UptimeFraction: 0.6, + }, + }, + { + name: "Multiple channels for out peer", + inStates: map[int64]*channelState{ + chanInID: { + online: true, + remoteBalance: 1000, + }, + }, + outStates: map[int64]*channelState{ + chanOutID: { + online: true, + localBalance: 800, + }, + 3: { + online: true, + localBalance: 500, + }, + }, + outEvents: []*ChannelEvent{ + newEvent( + chanOutID, 150, EventTypeUpdate, 1200, + 0, + ), + }, + successAmts: []btcutil.Amount{ + 100, + }, + thresholdAmount: 900, + // We expect the liquidity to be the sum of the + // available balances of the out channels. t=100-150: + // min(1000, 800 + 500) = 1000 t=150-200: min(1000, 1200 + // + 500) = 1000 + expected: &ForwardingAbility{ + Velocity: 1, // 100 sats / 100s + UptimeFraction: 1.0, + }, + }, + { + name: "Circular payment ability", + inStates: map[int64]*channelState{ + chanInID: { + online: true, + localBalance: 1000, + }, + }, + outStates: map[int64]*channelState{ + chanInID: { + online: true, + localBalance: 1000, + }, + }, + inEvents: []*ChannelEvent{ + newEvent( + chanInID, 150, EventTypeUpdate, 500, + 500, + ), + }, + outEvents: []*ChannelEvent{ + newEvent( + chanInID, 150, EventTypeUpdate, 500, + 500, + ), + }, + successAmts: []btcutil.Amount{ + 100, + }, + thresholdAmount: 1, + // For the first 50s, liquidity is min(1000, 0) = 0. For + // the next 50s, liquidity is min(500, 500) = 500. 
+ expected: &ForwardingAbility{ + Velocity: 2, // 100 sats / 50s + UptimeFraction: 0.5, + }, + }, + { + name: "Self route multiple channels", + inStates: map[int64]*channelState{ + chanInID: { + online: true, + remoteBalance: 1000, + localBalance: 1000, + }, + chanOutID: { + online: true, + remoteBalance: 1000, + localBalance: 1000, + }, + }, + outStates: map[int64]*channelState{ + chanInID: { + online: true, + remoteBalance: 1000, + localBalance: 1000, + }, + chanOutID: { + online: true, + remoteBalance: 1000, + localBalance: 1000, + }, + }, + inEvents: []*ChannelEvent{}, + outEvents: []*ChannelEvent{ + // At 150s (midpoint), out channel balance drops + // to 0. + newEvent( + chanOutID, 150, EventTypeUpdate, 0, + 2000, + ), + }, + successAmts: []btcutil.Amount{ + 100, + }, + thresholdAmount: 1500, + // Initial fwdLiquidity = min(2000, 2000) = 2000. 2000 > + // 1500, so first 50s accrue. At t=150, chanOut local + // drops to 0. outStates total local becomes 1000 (from + // chanIn). fwdLiquidity = min(2000, 1000) = 1000. 1000 + // is not > 1500, so last 50s do not accrue. 
+ expected: &ForwardingAbility{ + Velocity: 2, // 100 sats / 50s + UptimeFraction: 0.5, + }, + }, + { + name: "Zero uptime no forwards yields zero velocity", + inStates: map[int64]*channelState{ + chanInID: { + online: false, + }, + }, + outStates: map[int64]*channelState{ + chanOutID: { + online: false, + }, + }, + expected: &ForwardingAbility{ + Velocity: 0, + UptimeFraction: 0, + }, + }, + { + name: "Zero uptime with forwards is flagged inconsistent", + inStates: map[int64]*channelState{ + chanInID: { + online: false, + }, + }, + outStates: map[int64]*channelState{ + chanOutID: { + online: false, + }, + }, + successAmts: []btcutil.Amount{ + 100, + }, + expected: &ForwardingAbility{ + Velocity: 0, + UptimeFraction: 0, + Inconsistent: true, + }, + }, + } + + for _, tc := range testCases { + t.Run( + tc.name, + func(t *testing.T) { + t.Parallel() + + var totalSuccessfulAmount btcutil.Amount + for _, amt := range tc.successAmts { + totalSuccessfulAmount += amt + } + + mergedEvents := mergeEventSlices( + tc.inEvents, tc.outEvents, + ) + + inputsAB := pairInputs{ + threshold: tc.thresholdAmount, + totalSuccessfulAmount: totalSuccessfulAmount, + } + + // The (B→A) inputs are not asserted by this + // test. Pass zero so the second ability is + // well-defined but ignored. + var inputsBA pairInputs + + abilityAB, _, err := + calculateBothDirectionsUptime( + context.Background(), + startTime, endTime, + inputsAB, inputsBA, + tc.inStates, tc.outStates, + mergedEvents, + ) + require.NoError(t, err) + require.Equal(t, tc.expected, abilityAB) + }, + ) + } +} + +// TestCalculateBothDirectionsUptimeAsymmetric pins that the two accumulators +// are independent: with liquidity that crosses only the A→B threshold, the B→A +// direction must report zero uptime regardless of how high the A→B side scores. 
+func TestCalculateBothDirectionsUptimeAsymmetric(t *testing.T) { + t.Parallel() + + var ( + chanInID int64 = 1 + chanOutID int64 = 2 + startTime = time.Unix(100, 0) + endTime = time.Unix(200, 0) + ) + + // A holds inbound liquidity (remoteBalance high). B holds outbound + // liquidity (localBalance high). The situation favours A→B and starves + // B→A. + statesA := map[int64]*channelState{ + chanInID: { + online: true, + remoteBalance: 1000, + localBalance: 100, + }, + } + statesB := map[int64]*channelState{ + chanOutID: { + online: true, + localBalance: 1000, + remoteBalance: 100, + }, + } + + // Threshold sits between the two directions: A→B has min(1000, 1000) + // = 1000 ≥ 500 (qualifying); B→A has min(100, 100) = 100 < 500 (not + // qualifying). + inputsAB := pairInputs{ + threshold: 500, + totalSuccessfulAmount: 100, + } + inputsBA := pairInputs{ + threshold: 500, + totalSuccessfulAmount: 50, + } + + abilityAB, abilityBA, err := calculateBothDirectionsUptime( + context.Background(), startTime, endTime, + inputsAB, inputsBA, statesA, statesB, + mergeEventSlices(nil, nil), + ) + require.NoError(t, err) + + require.Equal( + t, &ForwardingAbility{ + Velocity: 1, // 100 sats / 100s + UptimeFraction: 1.0, + }, abilityAB, + ) + require.Equal( + t, &ForwardingAbility{ + Velocity: 0, + UptimeFraction: 0, + // Forwards landed but BA never crossed threshold. + Inconsistent: true, + }, abilityBA, + ) +} + +// TestInitialStateSameSecond verifies that when multiple update events share +// the same second-resolution timestamp, getInitialChannelState seeds from the +// most recent one (highest id) rather than aborting or choosing arbitrarily. 
+func TestInitialStateSameSecond(t *testing.T) { + t.Parallel() + + clock := clock.NewTestClock(testTime) + store := NewTestDB(t, clock) + ctx := context.Background() + + peerID, err := store.AddPeer(ctx, testPubKey) + require.NoError(t, err) + + channelID, err := store.AddChannel( + ctx, testChanPoint1, testShortChanID1, peerID, + ) + require.NoError(t, err) + + // Two update events share the same second-resolution timestamp. The + // second insert (higher id) is the one the SQL must pick. + sameTime := testTime.Add(10 * time.Second) + err = store.AddChannelEvent( + ctx, &ChannelEvent{ + ChannelID: channelID, + EventType: EventTypeUpdate, + Timestamp: sameTime, + LocalBalance: fn.Some(btcutil.Amount(100)), + RemoteBalance: fn.Some(btcutil.Amount(900)), + }, + ) + require.NoError(t, err) + err = store.AddChannelEvent( + ctx, &ChannelEvent{ + ChannelID: channelID, + EventType: EventTypeUpdate, + Timestamp: sameTime, + LocalBalance: fn.Some(btcutil.Amount(200)), + RemoteBalance: fn.Some(btcutil.Amount(800)), + }, + ) + require.NoError(t, err) + + // Construct a bare analyzer. getInitialChannelState only touches the + // store, so the lnd field can stay zero. + a := &ForwardingAnalyzer{store: store} + + startTime := sameTime.Add(time.Second) + state, err := a.getInitialChannelState(ctx, startTime, channelID) + require.NoError(t, err) + require.NotNil(t, state) + require.True(t, state.online, "an update event implies online") + require.Equal(t, btcutil.Amount(200), state.localBalance) + require.Equal(t, btcutil.Amount(800), state.remoteBalance) +} + +// stubLndChannelClient implements the three lndclient.LightningClient methods +// the analyzer exercises. The embedded interface is nil, so any other method +// panics. Callers must not invoke methods outside the overridden set. 
+type stubLndChannelClient struct { + lndclient.LightningClient + + openChannels []lndclient.ChannelInfo + closedChannels []lndclient.ClosedChannel + forwardingHistory *lndclient.ForwardingHistoryResponse +} + +func (s *stubLndChannelClient) ListChannels(_ context.Context, _, _ bool, + _ ...lndclient.ListChannelsOption) ([]lndclient.ChannelInfo, error) { + + return s.openChannels, nil +} + +func (s *stubLndChannelClient) ClosedChannels(_ context.Context) ( + []lndclient.ClosedChannel, error) { + + return s.closedChannels, nil +} + +func (s *stubLndChannelClient) ForwardingHistory(_ context.Context, + _ lndclient.ForwardingHistoryRequest) ( + *lndclient.ForwardingHistoryResponse, error) { + + if s.forwardingHistory == nil { + return &lndclient.ForwardingHistoryResponse{}, nil + } + + return s.forwardingHistory, nil +} + +// validPubKey1 is the open-channel peer. route.NewVertexFromStr requires a +// 33-byte compressed key (66 hex chars). The pre-existing testPubKey is 65 +// chars long and is fine for the store layer, but the lnd survivorship path +// goes through route.NewVertexFromStr so we use a valid pair here. +const ( + validPubKey1 = "028d4c6347426f2e3f5e2b8e4a1c3b9f1c" + + "4e5d6f7a8b9c0d1e2f3a4b5c6d7e8f9a" + validPubKey2 = "038d4c6347426f2e3f5e2b8e4a1c3b9f1c" + + "4e5d6f7a8b9c0d1e2f3a4b5c6d7e8f9a" +) + +// TestEffectiveUptimeIncludesClosedChannels exercises the survivorship-bias +// guarantee of EffectiveUptime: a peer whose only channel was closed before the +// analysis window must still appear in the result map. Without merging lnd's +// ClosedChannels into the considered set, the closed-channel peer would be +// invisible to the walk and the fleet's reported uptime would over-state +// reality. +func TestEffectiveUptimeIncludesClosedChannels(t *testing.T) { + t.Parallel() + + clk := clock.NewTestClock(testTime) + store := NewTestDB(t, clk) + ctx := context.Background() + + // Two peers: the open-channel peer and the closed-channel peer. 
+ openPeerID, err := store.AddPeer(ctx, validPubKey1) + require.NoError(t, err) + closedPeerID, err := store.AddPeer(ctx, validPubKey2) + require.NoError(t, err) + + // Both channels live in the chanevents store. lnd will report the first + // via ListChannels and the second only via ClosedChannels. + openScid := testShortChanID1 + closedScid := testShortChanID2 + openChanID, err := store.AddChannel( + ctx, testChanPoint1, openScid, openPeerID, + ) + require.NoError(t, err) + closedChanID, err := store.AddChannel( + ctx, testChanPoint2, closedScid, closedPeerID, + ) + require.NoError(t, err) + + // Seed an Update event before startTime for each channel so the + // initial-state walk has a non-zero baseline. Without a baseline, the + // closed channel's online state would be false and its presence in the + // result map would not prove the survivorship code path drove it. + seedTime := testTime + for _, chanID := range []int64{openChanID, closedChanID} { + err = store.AddChannelEvent( + ctx, &ChannelEvent{ + ChannelID: chanID, + EventType: EventTypeUpdate, + Timestamp: seedTime, + LocalBalance: fn.Some(btcutil.Amount(1000)), + RemoteBalance: fn.Some(btcutil.Amount(1000)), + }, + ) + require.NoError(t, err) + } + + openVertex, err := route.NewVertexFromStr(validPubKey1) + require.NoError(t, err) + closedVertex, err := route.NewVertexFromStr(validPubKey2) + require.NoError(t, err) + + // lnd reports only the open channel via ListChannels. The closed + // channel surfaces solely through ClosedChannels. 
+ stub := &stubLndChannelClient{ + openChannels: []lndclient.ChannelInfo{ + { + ChannelID: openScid, + PubKeyBytes: openVertex, + }, + }, + closedChannels: []lndclient.ClosedChannel{ + { + ChannelID: closedScid, + PubKeyBytes: closedVertex, + }, + }, + } + + a := NewForwardingAnalyzer(store, lndclient.LndServices{Client: stub}) + + startTime := seedTime.Add(time.Second) + endTime := startTime.Add(time.Minute) + + abilities, err := a.EffectiveUptime(ctx, startTime, endTime, 0, 0) + require.NoError(t, err) + + // Cross-pair entries in both directions are the cleanest assertion + // that the closed-channel peer participates in the walk, not just + // indexes into it. Their presence is the survivorship guarantee + // under test: without merging lnd's ClosedChannels into the + // considered set, neither cross would appear. + require.Contains( + t, abilities, PeerPair{PeerIn: validPubKey1, PeerOut: validPubKey2}, + "closed-channel peer absent: survivorship handling skipped", + ) + require.Contains( + t, abilities, PeerPair{PeerIn: validPubKey2, PeerOut: validPubKey1}, + "closed-channel peer absent: survivorship handling skipped", + ) +} diff --git a/chanevents/test_postgres.go b/chanevents/test_postgres.go index daf34e7..5277a02 100644 --- a/chanevents/test_postgres.go +++ b/chanevents/test_postgres.go @@ -11,7 +11,7 @@ import ( ) // NewTestDB creates a new test chanevents.Store backed by a postgres DB. -func NewTestDB(t *testing.T, clock clock.Clock) *Store { +func NewTestDB(t testing.TB, clock clock.Clock) *Store { // We'll create a new test database. The call to NewTestPostgresDB will // automatically create the DB and apply the migrations. testDB := db.NewTestPostgresDB(t) diff --git a/chanevents/test_sql.go b/chanevents/test_sql.go index 1785159..9ff7a93 100644 --- a/chanevents/test_sql.go +++ b/chanevents/test_sql.go @@ -9,7 +9,7 @@ import ( ) // createStore is a helper function that creates a new Store. 
-func createStore(t *testing.T, sqlDB *sqldb.BaseDB, clock clock.Clock) *Store { +func createStore(t testing.TB, sqlDB *sqldb.BaseDB, clock clock.Clock) *Store { queries := sqlc.NewForType(sqlDB, sqlDB.BackendType) store := NewStore(sqlDB, queries, clock) diff --git a/chanevents/test_sqlite.go b/chanevents/test_sqlite.go index a6c3522..7cbf7f5 100644 --- a/chanevents/test_sqlite.go +++ b/chanevents/test_sqlite.go @@ -12,7 +12,7 @@ import ( ) // NewTestDB creates a new test chanevents.Store backed by a sqlite DB. -func NewTestDB(t *testing.T, clock clock.Clock) *Store { +func NewTestDB(t testing.TB, clock clock.Clock) *Store { // We'll create a new test database. The call to NewTestSqliteDB will // automatically create the DB and apply the migrations. testDB := sqldb.NewTestSqliteDB(t, db.FaradayMigrationSets) diff --git a/db/postgres.go b/db/postgres.go index 4a0cb82..8cc42b0 100644 --- a/db/postgres.go +++ b/db/postgres.go @@ -9,7 +9,7 @@ import ( // NewTestPostgresDB is a helper function that creates a Postgres database for // testing. -func NewTestPostgresDB(t *testing.T) *sqldb.PostgresStore { +func NewTestPostgresDB(t testing.TB) *sqldb.PostgresStore { t.Helper() t.Logf("Creating new Postgres DB for testing")