From 7431bb5d679978de1eaf9d8021a518cd08dac108 Mon Sep 17 00:00:00 2001 From: bitromortac Date: Wed, 6 May 2026 07:04:27 +0200 Subject: [PATCH 1/4] chanevents+db: add latest-event-before lookup Add GetLatestChannelUpdateBefore, which fetches the most recent EventTypeUpdate strictly before a given instant. Forwarding-ability analyses that summarise behaviour over a window must seed their state from the channel's balance at the window's lower bound; without the ability to look back past that bound, the first events in the window have no baseline to compare against. The lookup tolerates missing predecessors by returning (nil, nil), letting callers distinguish "no prior update" from a genuine error. Coverage exercises both the present and absent cases against the existing TestStore fixture. --- chanevents/store.go | 31 +++++++++++++++++++++++++++++++ chanevents/store_test.go | 17 +++++++++++++++++ db/sqlc/chanevents.sql.go | 28 ++++++++++++++++++++++++++++ db/sqlc/querier.go | 1 + db/sqlc/queries/chanevents.sql | 6 ++++++ 5 files changed, 83 insertions(+) diff --git a/chanevents/store.go b/chanevents/store.go index e652faa..78a7a7e 100644 --- a/chanevents/store.go +++ b/chanevents/store.go @@ -43,6 +43,12 @@ type Queries interface { GetChannelEvents(ctx context.Context, arg sqlc.GetChannelEventsParams) ([]sqlc.ChannelEvent, error) + + GetLatestChannelEventBefore(ctx context.Context, + arg sqlc.GetLatestChannelEventBeforeParams) ( + sqlc.ChannelEvent, + error, + ) } // Store provides access to the db for channel events. @@ -257,6 +263,31 @@ func (s *Store) GetChannelEvents(ctx context.Context, channelID, afterID int64, return events, nil } +// GetLatestChannelUpdateBefore returns the latest channel event before a given +// time (exclusive). If no event is found, it returns (nil, nil). +func (s *Store) GetLatestChannelUpdateBefore(ctx context.Context, + channelID int64, before time.Time) (*ChannelEvent, error) { + + dbEvent, err := s.db.GetLatestChannelEventBefore( + ctx, sqlc.GetLatestChannelEventBeforeParams{ + ChannelID: channelID, + Timestamp: before.UTC(), + EventType: int16(EventTypeUpdate), + }, + ) + if err != nil { + // If there are no events before the start time, we return (nil, + // nil). + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + + return nil, err + } + + return marshalChannelEvent(dbEvent), nil +} + // marshalChannelEvent converts a db channel event into our internal type. func marshalChannelEvent(dbEvent sqlc.ChannelEvent) *ChannelEvent { var localBalance fn.Option[btcutil.Amount] diff --git a/chanevents/store_test.go b/chanevents/store_test.go index 86f025a..7cb91c1 100644 --- a/chanevents/store_test.go +++ b/chanevents/store_test.go @@ -134,6 +134,23 @@ func TestStore(t *testing.T) { requireEqualEvent( t, updateEvent, testTime.Add(time.Second), events[1], ) + updateEvent = events[1] + + // If we query a time after the update event, we'll obtain the update + // event as the latest event. + initEvent, err := store.GetLatestChannelUpdateBefore( + ctx, channelID, updateEvent.Timestamp.Add(500*time.Millisecond), + ) + require.NoError(t, err) + requireEqualEvent(t, updateEvent, testTime.Add(time.Second), initEvent) + + // If we query at the update event's timestamp, the only event before + // that is left is the online event, which is not an update. + initEvent, err = store.GetLatestChannelUpdateBefore( + ctx, channelID, updateEvent.Timestamp, + ) + require.NoError(t, err) + require.Nil(t, initEvent) // Advance the clock and add a sync event to verify the IsSync flag // round-trips correctly. diff --git a/db/sqlc/chanevents.sql.go b/db/sqlc/chanevents.sql.go index 9a24296..82441f6 100644 --- a/db/sqlc/chanevents.sql.go +++ b/db/sqlc/chanevents.sql.go @@ -98,6 +98,34 @@ func (q *Queries) GetChannelEvents(ctx context.Context, arg GetChannelEventsPara return items, nil } +const getLatestChannelEventBefore = `-- name: GetLatestChannelEventBefore :one +SELECT id, channel_id, event_type, timestamp, local_balance_sat, remote_balance_sat, is_sync FROM channel_events +WHERE channel_id = $1 AND event_type = $2 AND timestamp < $3 +ORDER BY timestamp DESC, id DESC +LIMIT 1 +` + +type GetLatestChannelEventBeforeParams struct { + ChannelID int64 + EventType int16 + Timestamp time.Time +} + +func (q *Queries) GetLatestChannelEventBefore(ctx context.Context, arg GetLatestChannelEventBeforeParams) (ChannelEvent, error) { + row := q.db.QueryRowContext(ctx, getLatestChannelEventBefore, arg.ChannelID, arg.EventType, arg.Timestamp) + var i ChannelEvent + err := row.Scan( + &i.ID, + &i.ChannelID, + &i.EventType, + &i.Timestamp, + &i.LocalBalanceSat, + &i.RemoteBalanceSat, + &i.IsSync, + ) + return i, err +} + const getPeerByPubKey = `-- name: GetPeerByPubKey :one SELECT id, pubkey FROM peers WHERE pubkey = $1 ` diff --git a/db/sqlc/querier.go b/db/sqlc/querier.go index f56f962..661f75c 100644 --- a/db/sqlc/querier.go +++ b/db/sqlc/querier.go @@ -12,6 +12,7 @@ type Querier interface { GetChannelByChanPoint(ctx context.Context, channelPoint string) (Channel, error) GetChannelByShortChanID(ctx context.Context, shortChannelID int64) (Channel, error) GetChannelEvents(ctx context.Context, arg GetChannelEventsParams) ([]ChannelEvent, error) + GetLatestChannelEventBefore(ctx context.Context, arg GetLatestChannelEventBeforeParams) (ChannelEvent, error) GetPeerByPubKey(ctx context.Context, pubkey string) (Peer, error) InsertChannel(ctx context.Context, arg InsertChannelParams) (int64, error) InsertChannelEvent(ctx context.Context, arg InsertChannelEventParams) error diff --git a/db/sqlc/queries/chanevents.sql b/db/sqlc/queries/chanevents.sql index 186c8d7..9f567f3 100644 --- a/db/sqlc/queries/chanevents.sql +++ b/db/sqlc/queries/chanevents.sql @@ -27,3 +27,9 @@ WHERE channel_id = $1 AND timestamp < $4 ORDER BY id ASC LIMIT $5; + +-- name: GetLatestChannelEventBefore :one +SELECT * FROM channel_events +WHERE channel_id = $1 AND event_type = $2 AND timestamp < $3 +ORDER BY timestamp DESC, id DESC +LIMIT 1; From 23631733258e0e319d18e2d32a67288380dee86a Mon Sep 17 00:00:00 2001 From: bitromortac Date: Wed, 6 May 2026 07:05:20 +0200 Subject: [PATCH 2/4] chanevents+db: add scid-to-peer index Add ScidToPeerMap, which materialises a snapshot of every short channel id paired with the pubkey of the channel's remote peer. Forwarding-data sources index events by short channel id, but downstream analyses need to attribute behaviour to the peer, not the channel. The map skips channels whose short channel id is still zero (unconfirmed), so callers see only fully advertised channels. Coverage extends TestStore with a two-channel fixture pinning the join. --- chanevents/store.go | 25 ++++++++++++++++++++++++ chanevents/store_test.go | 7 +++++++ db/sqlc/chanevents.sql.go | 35 ++++++++++++++++++++++++++++++++++ db/sqlc/querier.go | 1 + db/sqlc/queries/chanevents.sql | 5 +++++ 5 files changed, 73 insertions(+) diff --git a/chanevents/store.go b/chanevents/store.go index 78a7a7e..0cbe442 100644 --- a/chanevents/store.go +++ b/chanevents/store.go @@ -49,6 +49,8 @@ type Queries interface { sqlc.ChannelEvent, error, ) + + GetChannels(ctx context.Context) ([]sqlc.GetChannelsRow, error) } // Store provides access to the db for channel events. @@ -263,6 +265,29 @@ func (s *Store) GetChannelEvents(ctx context.Context, channelID, afterID int64, return events, nil } +// ScidToPeerMap returns the historic scid→peer index, including channels that +// have since closed. Unconfirmed channels (scid still zero) are not part of +// the contract. +func (s *Store) ScidToPeerMap(ctx context.Context) (map[uint64]string, error) { + dbChannels, err := s.db.GetChannels(ctx) + if err != nil { + return nil, err + } + + scidToPeer := make(map[uint64]string, len(dbChannels)) + for _, dbChannel := range dbChannels { + // The short channel ID can be zero if it's not known yet. We + // should just ignore those. + if dbChannel.ShortChannelID == 0 { + continue + } + + scidToPeer[uint64(dbChannel.ShortChannelID)] = dbChannel.Pubkey + } + + return scidToPeer, nil +} + // GetLatestChannelUpdateBefore returns the latest channel event before a given // time (exclusive). If no event is found, it returns (nil, nil). func (s *Store) GetLatestChannelUpdateBefore(ctx context.Context, diff --git a/chanevents/store_test.go b/chanevents/store_test.go index 7cb91c1..b2cbae6 100644 --- a/chanevents/store_test.go +++ b/chanevents/store_test.go @@ -100,6 +100,13 @@ func TestStore(t *testing.T) { require.NoError(t, err) require.NotZero(t, channel2ID) + // Get the historic channel to peer map. + chanToPeer, err := store.ScidToPeerMap(ctx) + require.NoError(t, err) + require.Len(t, chanToPeer, 2) + require.Equal(t, testPubKey, chanToPeer[testShortChanID1]) + require.Equal(t, testPubKey, chanToPeer[testShortChanID2]) + // Add an online event for the channel. onlineEvent := &ChannelEvent{ ChannelID: channelID, diff --git a/db/sqlc/chanevents.sql.go b/db/sqlc/chanevents.sql.go index 82441f6..ba1dee2 100644 --- a/db/sqlc/chanevents.sql.go +++ b/db/sqlc/chanevents.sql.go @@ -98,6 +98,41 @@ func (q *Queries) GetChannelEvents(ctx context.Context, arg GetChannelEventsPara return items, nil } +const getChannels = `-- name: GetChannels :many +SELECT c.id, c.short_channel_id, p.pubkey +FROM channels c +JOIN peers p ON c.peer_id = p.id +` + +type GetChannelsRow struct { + ID int64 + ShortChannelID int64 + Pubkey string +} + +func (q *Queries) GetChannels(ctx context.Context) ([]GetChannelsRow, error) { + rows, err := q.db.QueryContext(ctx, getChannels) + if err != nil { + return nil, err + } + defer rows.Close() + var items []GetChannelsRow + for rows.Next() { + var i GetChannelsRow + if err := rows.Scan(&i.ID, &i.ShortChannelID, &i.Pubkey); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const getLatestChannelEventBefore = `-- name: GetLatestChannelEventBefore :one SELECT id, channel_id, event_type, timestamp, local_balance_sat, remote_balance_sat, is_sync FROM channel_events WHERE channel_id = $1 AND event_type = $2 AND timestamp < $3 diff --git a/db/sqlc/querier.go b/db/sqlc/querier.go index 661f75c..b62ec68 100644 --- a/db/sqlc/querier.go +++ b/db/sqlc/querier.go @@ -12,6 +12,7 @@ type Querier interface { GetChannelByChanPoint(ctx context.Context, channelPoint string) (Channel, error) GetChannelByShortChanID(ctx context.Context, shortChannelID int64) (Channel, error) GetChannelEvents(ctx context.Context, arg GetChannelEventsParams) ([]ChannelEvent, error) + GetChannels(ctx context.Context) ([]GetChannelsRow, error) GetLatestChannelEventBefore(ctx context.Context, arg GetLatestChannelEventBeforeParams) (ChannelEvent, error) GetPeerByPubKey(ctx context.Context, pubkey string) (Peer, error) InsertChannel(ctx context.Context, arg InsertChannelParams) (int64, error) diff --git a/db/sqlc/queries/chanevents.sql b/db/sqlc/queries/chanevents.sql index 9f567f3..d9d1629 100644 --- a/db/sqlc/queries/chanevents.sql +++ b/db/sqlc/queries/chanevents.sql @@ -33,3 +33,8 @@ SELECT * FROM channel_events WHERE channel_id = $1 AND event_type = $2 AND timestamp < $3 ORDER BY timestamp DESC, id DESC LIMIT 1; + +-- name: GetChannels :many +SELECT c.id, c.short_channel_id, p.pubkey +FROM channels c +JOIN peers p ON c.peer_id = p.id; From be671db05f1c698c4ad9e4e404b1e1a3eeb26cb9 Mon Sep 17 00:00:00 2001 From: bitromortac Date: Tue, 12 May 2026 10:51:42 +0200 Subject: [PATCH 3/4] chanevents: add short-chan-id store lookup Add GetChannelByShortChanID, the inverse of AddChannel. The forwarding-ability analyzer receives scids from lnd's forwarding history and must map them back to the chanevents store's internal channel id to query events. --- chanevents/store.go | 24 ++++++++++++++++++++++++ chanevents/store_test.go | 14 ++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/chanevents/store.go b/chanevents/store.go index 0cbe442..b454a33 100644 --- a/chanevents/store.go +++ b/chanevents/store.go @@ -192,6 +192,30 @@ func (s *Store) GetChannel(ctx context.Context, channelPoint string) (*Channel, }, nil } +// GetChannelByShortChanID retrieves a channel by its short channel ID, +// returning ErrUnknownChannel if no row matches. +func (s *Store) GetChannelByShortChanID(ctx context.Context, + shortChannelID uint64) (*Channel, error) { + + dbChannel, err := s.db.GetChannelByShortChanID( + ctx, scidToInt64(shortChannelID), + ) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, ErrUnknownChannel + } + + return nil, err + } + + return &Channel{ + ID: dbChannel.ID, + ChannelPoint: dbChannel.ChannelPoint, + ShortChannelID: int64ToSCID(dbChannel.ShortChannelID), + PeerID: dbChannel.PeerID, + }, nil +} + // AddChannelEvent adds a new channel event. func (s *Store) AddChannelEvent(ctx context.Context, event *ChannelEvent) error { diff --git a/chanevents/store_test.go b/chanevents/store_test.go index b2cbae6..232de48 100644 --- a/chanevents/store_test.go +++ b/chanevents/store_test.go @@ -93,6 +93,20 @@ func TestStore(t *testing.T) { require.Equal(t, testShortChanID1, dbChannel.ShortChannelID) require.Equal(t, peerID, dbChannel.PeerID) + // Look up the same channel by its scid; the analyzer relies on this + // inverse of AddChannel. + dbChannel, err = store.GetChannelByShortChanID(ctx, testShortChanID1) + require.NoError(t, err) + require.Equal(t, channelID, dbChannel.ID) + require.Equal(t, testChanPoint1, dbChannel.ChannelPoint) + require.Equal(t, testShortChanID1, dbChannel.ShortChannelID) + require.Equal(t, peerID, dbChannel.PeerID) + + // An unknown scid surfaces the typed sentinel, not raw sql.ErrNoRows. + dbChannel, err = store.GetChannelByShortChanID(ctx, 9999) + require.ErrorIs(t, err, ErrUnknownChannel) + require.Nil(t, dbChannel) + // Add a second channel for the same peer. channel2ID, err := store.AddChannel( ctx, testChanPoint2, testShortChanID2, peerID, From 41ce9842440c9f30a6518005b4bb991066e20665 Mon Sep 17 00:00:00 2001 From: bitromortac Date: Thu, 7 May 2026 15:28:52 +0200 Subject: [PATCH 4/4] chanevents: add quantile function Add Quantile, a generic linear-interpolation q-quantile over a slice of sortable numeric values. The forwarding-ability analyzer needs to characterise the distribution of historical forwarded amounts and uses a configurable percentile as the headline statistic; lifting the computation into its own helper keeps the analyzer focused on forwarding logic and gives the quantile contract its own table-driven test that covers the interpolation rule and the empty/out-of-bounds error paths. --- chanevents/quantile.go | 55 ++++++++++++ chanevents/quantile_test.go | 162 ++++++++++++++++++++++++++++++++++++ go.mod | 2 +- 3 files changed, 218 insertions(+), 1 deletion(-) create mode 100644 chanevents/quantile.go create mode 100644 chanevents/quantile_test.go diff --git a/chanevents/quantile.go b/chanevents/quantile.go new file mode 100644 index 0000000..d0562eb --- /dev/null +++ b/chanevents/quantile.go @@ -0,0 +1,55 @@ +package chanevents + +import ( + "errors" + "sort" + + "golang.org/x/exp/constraints" +) + +// number is the type constraint Quantile accepts: any sortable numeric type. +type number interface { + constraints.Integer | constraints.Float +} + +// Quantile computes the q-quantile of a slice of comparable values. This can be +// used to compute the median (q=0.5) or the min (q=0) or max (q=1). +func Quantile[T number](xs []T, q float64) (float64, error) { + if q < 0 || q > 1 { + return 0, errors.New("quantile must be between 0 and 1") + } + + if len(xs) == 0 { + return 0, errors.New("cannot compute quantile of empty slice") + } + + if len(xs) == 1 { + return float64(xs[0]), nil + } + + // Create a copy of the slice to avoid mutating the original. + ys := make([]T, len(xs)) + copy(ys, xs) + + sort.Slice(ys, func(i, j int) bool { + return ys[i] < ys[j] + }) + + // Compute fractional index of q-quantile. + if q == 1.0 { + return float64(ys[len(ys)-1]), nil + } + i := q * float64(len(ys)-1) + + // Interpolate between the two consecutive values, depending on the + // fractional index position in between. + lowerIdx := int(i) + upperIdx := lowerIdx + 1 + + lowerVal := float64(ys[lowerIdx]) + upperVal := float64(ys[upperIdx]) + + indexDiff := i - float64(lowerIdx) + + return lowerVal + (upperVal-lowerVal)*indexDiff, nil +} diff --git a/chanevents/quantile_test.go b/chanevents/quantile_test.go new file mode 100644 index 0000000..c5c61f8 --- /dev/null +++ b/chanevents/quantile_test.go @@ -0,0 +1,162 @@ +package chanevents + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +// TestQuantile pins the interpolation contract and the error paths Quantile +// surfaces to callers. +func TestQuantile(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + q float64 + xs []float64 + want float64 + expectErr bool + }{ + { + name: "empty slice", + xs: []float64{}, + expectErr: true, + }, + { + name: "single value", + xs: []float64{ + 1, + }, + want: 1.0, + }, + { + name: "single value median", + xs: []float64{ + 1, + }, + q: 0.5, + want: 1.0, + }, + { + name: "quantile out of bound below", + xs: []float64{}, + q: -0.1, + expectErr: true, + }, + { + name: "quantile out of bound above", + xs: []float64{}, + q: 1.1, + expectErr: true, + }, + { + name: "median odd values", + q: 0.5, + xs: []float64{ + 1, + 2, + 3, + 4, + 5, + }, + want: 3.0, + }, + { + name: "median even values", + q: 0.5, + xs: []float64{ + 1, + 2, + 3, + 4, + }, + want: 2.5, + }, + { + name: "median unsorted", + q: 0.5, + xs: []float64{ + 1, + 3, + 2, + 4, + }, + want: 2.5, + }, + { + name: "0 percentile", + q: 0, + xs: []float64{ + 1, + 2, + 3, + 4, + 5, + }, + want: 1.0, + }, + { + name: "25 percentile", + q: 0.25, + xs: []float64{ + 1, + 2, + 3, + 4, + 5, + }, + want: 2.0, + }, + { + name: "75 percentile", + q: 0.75, + xs: []float64{ + 1, + 2, + 3, + 4, + 5, + }, + want: 4.0, + }, + { + name: "0.875 percentile", + q: 0.875, + xs: []float64{ + 1, + 2, + 3, + 4, + 5, + }, + want: 4.5, + }, + { + name: "100 percentile", + q: 1.0, + xs: []float64{ + 1, + 2, + 3, + 4, + 5, + }, + want: 5.0, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(tt *testing.T) { + tt.Parallel() + + got, err := Quantile(tc.xs, tc.q) + if tc.expectErr { + require.Error(tt, err) + return + } + + require.InDelta(tt, tc.want, got, 1e-6) + }) + } +} diff --git a/go.mod b/go.mod index f0459a0..4f03fea 100644 --- a/go.mod +++ b/go.mod @@ -20,6 +20,7 @@ require ( github.com/shopspring/decimal v1.2.0 github.com/stretchr/testify v1.10.0 github.com/urfave/cli v1.22.14 + golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b google.golang.org/grpc v1.65.0 google.golang.org/protobuf v1.34.2 gopkg.in/macaroon-bakery.v2 v2.0.1 @@ -163,7 +164,6 @@ require ( go.uber.org/multierr v1.6.0 // indirect go.uber.org/zap v1.17.0 // indirect golang.org/x/crypto v0.39.0 // indirect - golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b // indirect golang.org/x/net v0.41.0 // indirect golang.org/x/sync v0.15.0 // indirect golang.org/x/sys v0.34.0 // indirect