diff --git a/chanevents/quantile.go b/chanevents/quantile.go new file mode 100644 index 0000000..d0562eb --- /dev/null +++ b/chanevents/quantile.go @@ -0,0 +1,55 @@ +package chanevents + +import ( + "errors" + "sort" + + "golang.org/x/exp/constraints" +) + +// number is the type constraint Quantile accepts: any sortable numeric type. +type number interface { + constraints.Integer | constraints.Float +} + +// Quantile computes the q-quantile of a slice of comparable values. This can be +// used to compute the median (q=0.5) or the min (q=0) or max (q=1). +func Quantile[T number](xs []T, q float64) (float64, error) { + if q < 0 || q > 1 { + return 0, errors.New("quantile must be between 0 and 1") + } + + if len(xs) == 0 { + return 0, errors.New("cannot compute quantile of empty slice") + } + + if len(xs) == 1 { + return float64(xs[0]), nil + } + + // Create a copy of the slice to avoid mutating the original. + ys := make([]T, len(xs)) + copy(ys, xs) + + sort.Slice(ys, func(i, j int) bool { + return ys[i] < ys[j] + }) + + // Compute fractional index of q-quantile. + if q == 1.0 { + return float64(ys[len(ys)-1]), nil + } + i := q * float64(len(ys)-1) + + // Interpolate between the two consecutive values, depending on the + // fractional index position in between. + lowerIdx := int(i) + upperIdx := lowerIdx + 1 + + lowerVal := float64(ys[lowerIdx]) + upperVal := float64(ys[upperIdx]) + + indexDiff := i - float64(lowerIdx) + + return lowerVal + (upperVal-lowerVal)*indexDiff, nil +} diff --git a/chanevents/quantile_test.go b/chanevents/quantile_test.go new file mode 100644 index 0000000..c5c61f8 --- /dev/null +++ b/chanevents/quantile_test.go @@ -0,0 +1,162 @@ +package chanevents + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +// TestQuantile pins the interpolation contract and the error paths Quantile +// surfaces to callers. +func TestQuantile(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + q float64 + xs []float64 + want float64 + expectErr bool + }{ + { + name: "empty slice", + xs: []float64{}, + expectErr: true, + }, + { + name: "single value", + xs: []float64{ + 1, + }, + want: 1.0, + }, + { + name: "single value median", + xs: []float64{ + 1, + }, + q: 0.5, + want: 1.0, + }, + { + name: "quantile out of bound below", + xs: []float64{}, + q: -0.1, + expectErr: true, + }, + { + name: "quantile out of bound above", + xs: []float64{}, + q: 1.1, + expectErr: true, + }, + { + name: "median odd values", + q: 0.5, + xs: []float64{ + 1, + 2, + 3, + 4, + 5, + }, + want: 3.0, + }, + { + name: "median even values", + q: 0.5, + xs: []float64{ + 1, + 2, + 3, + 4, + }, + want: 2.5, + }, + { + name: "median unsorted", + q: 0.5, + xs: []float64{ + 1, + 3, + 2, + 4, + }, + want: 2.5, + }, + { + name: "0 percentile", + q: 0, + xs: []float64{ + 1, + 2, + 3, + 4, + 5, + }, + want: 1.0, + }, + { + name: "25 percentile", + q: 0.25, + xs: []float64{ + 1, + 2, + 3, + 4, + 5, + }, + want: 2.0, + }, + { + name: "75 percentile", + q: 0.75, + xs: []float64{ + 1, + 2, + 3, + 4, + 5, + }, + want: 4.0, + }, + { + name: "0.875 percentile", + q: 0.875, + xs: []float64{ + 1, + 2, + 3, + 4, + 5, + }, + want: 4.5, + }, + { + name: "100 percentile", + q: 1.0, + xs: []float64{ + 1, + 2, + 3, + 4, + 5, + }, + want: 5.0, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(tt *testing.T) { + tt.Parallel() + + got, err := Quantile(tc.xs, tc.q) + if tc.expectErr { + require.Error(tt, err) + return + } + + require.InDelta(tt, tc.want, got, 1e-6) + }) + } +} diff --git a/chanevents/store.go b/chanevents/store.go index e652faa..b454a33 100644 --- a/chanevents/store.go +++ b/chanevents/store.go @@ -43,6 +43,14 @@ type Queries interface { GetChannelEvents(ctx context.Context, arg sqlc.GetChannelEventsParams) ([]sqlc.ChannelEvent, error) + + GetLatestChannelEventBefore(ctx context.Context, + arg sqlc.GetLatestChannelEventBeforeParams) ( + sqlc.ChannelEvent, + error, + ) + + GetChannels(ctx context.Context) ([]sqlc.GetChannelsRow, error) } // Store provides access to the db for channel events. @@ -184,6 +192,30 @@ func (s *Store) GetChannel(ctx context.Context, channelPoint string) (*Channel, }, nil } +// GetChannelByShortChanID retrieves a channel by its short channel ID, +// returning ErrUnknownChannel if no row matches. +func (s *Store) GetChannelByShortChanID(ctx context.Context, + shortChannelID uint64) (*Channel, error) { + + dbChannel, err := s.db.GetChannelByShortChanID( + ctx, scidToInt64(shortChannelID), + ) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, ErrUnknownChannel + } + + return nil, err + } + + return &Channel{ + ID: dbChannel.ID, + ChannelPoint: dbChannel.ChannelPoint, + ShortChannelID: int64ToSCID(dbChannel.ShortChannelID), + PeerID: dbChannel.PeerID, + }, nil +} + // AddChannelEvent adds a new channel event. func (s *Store) AddChannelEvent(ctx context.Context, event *ChannelEvent) error { @@ -257,6 +289,54 @@ func (s *Store) GetChannelEvents(ctx context.Context, channelID, afterID int64, return events, nil } +// ScidToPeerMap returns the historic scid→peer index, including channels that +// have since closed. Unconfirmed channels (scid still zero) are not part of +// the contract. +func (s *Store) ScidToPeerMap(ctx context.Context) (map[uint64]string, error) { + dbChannels, err := s.db.GetChannels(ctx) + if err != nil { + return nil, err + } + + scidToPeer := make(map[uint64]string, len(dbChannels)) + for _, dbChannel := range dbChannels { + // The short channel ID can be zero if it's not known yet. We + // should just ignore those. + if dbChannel.ShortChannelID == 0 { + continue + } + + scidToPeer[uint64(dbChannel.ShortChannelID)] = dbChannel.Pubkey + } + + return scidToPeer, nil +} + +// GetLatestChannelUpdateBefore returns the latest channel event before a given +// time (exclusive). If no event is found, it returns (nil, nil). +func (s *Store) GetLatestChannelUpdateBefore(ctx context.Context, + channelID int64, before time.Time) (*ChannelEvent, error) { + + dbEvent, err := s.db.GetLatestChannelEventBefore( + ctx, sqlc.GetLatestChannelEventBeforeParams{ + ChannelID: channelID, + Timestamp: before.UTC(), + EventType: int16(EventTypeUpdate), + }, + ) + if err != nil { + // If there are no events before the start time, we return (nil, + // nil). + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + + return nil, err + } + + return marshalChannelEvent(dbEvent), nil +} + // marshalChannelEvent converts a db channel event into our internal type. func marshalChannelEvent(dbEvent sqlc.ChannelEvent) *ChannelEvent { var localBalance fn.Option[btcutil.Amount] diff --git a/chanevents/store_test.go b/chanevents/store_test.go index 86f025a..232de48 100644 --- a/chanevents/store_test.go +++ b/chanevents/store_test.go @@ -93,6 +93,20 @@ func TestStore(t *testing.T) { require.Equal(t, testShortChanID1, dbChannel.ShortChannelID) require.Equal(t, peerID, dbChannel.PeerID) + // Look up the same channel by its scid; the analyzer relies on this + // inverse of AddChannel. + dbChannel, err = store.GetChannelByShortChanID(ctx, testShortChanID1) + require.NoError(t, err) + require.Equal(t, channelID, dbChannel.ID) + require.Equal(t, testChanPoint1, dbChannel.ChannelPoint) + require.Equal(t, testShortChanID1, dbChannel.ShortChannelID) + require.Equal(t, peerID, dbChannel.PeerID) + + // An unknown scid surfaces the typed sentinel, not raw sql.ErrNoRows. + dbChannel, err = store.GetChannelByShortChanID(ctx, 9999) + require.ErrorIs(t, err, ErrUnknownChannel) + require.Nil(t, dbChannel) + // Add a second channel for the same peer. channel2ID, err := store.AddChannel( ctx, testChanPoint2, testShortChanID2, peerID, @@ -100,6 +114,13 @@ func TestStore(t *testing.T) { require.NoError(t, err) require.NotZero(t, channel2ID) + // Get the historic channel to peer map. + chanToPeer, err := store.ScidToPeerMap(ctx) + require.NoError(t, err) + require.Len(t, chanToPeer, 2) + require.Equal(t, testPubKey, chanToPeer[testShortChanID1]) + require.Equal(t, testPubKey, chanToPeer[testShortChanID2]) + // Add an online event for the channel. onlineEvent := &ChannelEvent{ ChannelID: channelID, @@ -134,6 +155,23 @@ func TestStore(t *testing.T) { requireEqualEvent( t, updateEvent, testTime.Add(time.Second), events[1], ) + updateEvent = events[1] + + // If we query a time after the update event, we'll obtain the update + // event as the latest event. + initEvent, err := store.GetLatestChannelUpdateBefore( + ctx, channelID, updateEvent.Timestamp.Add(500*time.Millisecond), + ) + require.NoError(t, err) + requireEqualEvent(t, updateEvent, testTime.Add(time.Second), initEvent) + + // If we query at the update event's timestamp, the only event before + // that is left is the online event, which is not an update. + initEvent, err = store.GetLatestChannelUpdateBefore( + ctx, channelID, updateEvent.Timestamp, + ) + require.NoError(t, err) + require.Nil(t, initEvent) // Advance the clock and add a sync event to verify the IsSync flag // round-trips correctly. diff --git a/db/sqlc/chanevents.sql.go b/db/sqlc/chanevents.sql.go index 9a24296..ba1dee2 100644 --- a/db/sqlc/chanevents.sql.go +++ b/db/sqlc/chanevents.sql.go @@ -98,6 +98,69 @@ func (q *Queries) GetChannelEvents(ctx context.Context, arg GetChannelEventsPara return items, nil } +const getChannels = `-- name: GetChannels :many +SELECT c.id, c.short_channel_id, p.pubkey +FROM channels c +JOIN peers p ON c.peer_id = p.id +` + +type GetChannelsRow struct { + ID int64 + ShortChannelID int64 + Pubkey string +} + +func (q *Queries) GetChannels(ctx context.Context) ([]GetChannelsRow, error) { + rows, err := q.db.QueryContext(ctx, getChannels) + if err != nil { + return nil, err + } + defer rows.Close() + var items []GetChannelsRow + for rows.Next() { + var i GetChannelsRow + if err := rows.Scan(&i.ID, &i.ShortChannelID, &i.Pubkey); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const getLatestChannelEventBefore = `-- name: GetLatestChannelEventBefore :one +SELECT id, channel_id, event_type, timestamp, local_balance_sat, remote_balance_sat, is_sync FROM channel_events +WHERE channel_id = $1 AND event_type = $2 AND timestamp < $3 +ORDER BY timestamp DESC, id DESC +LIMIT 1 +` + +type GetLatestChannelEventBeforeParams struct { + ChannelID int64 + EventType int16 + Timestamp time.Time +} + +func (q *Queries) GetLatestChannelEventBefore(ctx context.Context, arg GetLatestChannelEventBeforeParams) (ChannelEvent, error) { + row := q.db.QueryRowContext(ctx, getLatestChannelEventBefore, arg.ChannelID, arg.EventType, arg.Timestamp) + var i ChannelEvent + err := row.Scan( + &i.ID, + &i.ChannelID, + &i.EventType, + &i.Timestamp, + &i.LocalBalanceSat, + &i.RemoteBalanceSat, + &i.IsSync, + ) + return i, err +} + const getPeerByPubKey = `-- name: GetPeerByPubKey :one SELECT id, pubkey FROM peers WHERE pubkey = $1 ` diff --git a/db/sqlc/querier.go b/db/sqlc/querier.go index f56f962..b62ec68 100644 --- a/db/sqlc/querier.go +++ b/db/sqlc/querier.go @@ -12,6 +12,8 @@ type Querier interface { GetChannelByChanPoint(ctx context.Context, channelPoint string) (Channel, error) GetChannelByShortChanID(ctx context.Context, shortChannelID int64) (Channel, error) GetChannelEvents(ctx context.Context, arg GetChannelEventsParams) ([]ChannelEvent, error) + GetChannels(ctx context.Context) ([]GetChannelsRow, error) + GetLatestChannelEventBefore(ctx context.Context, arg GetLatestChannelEventBeforeParams) (ChannelEvent, error) GetPeerByPubKey(ctx context.Context, pubkey string) (Peer, error) InsertChannel(ctx context.Context, arg InsertChannelParams) (int64, error) InsertChannelEvent(ctx context.Context, arg InsertChannelEventParams) error diff --git a/db/sqlc/queries/chanevents.sql b/db/sqlc/queries/chanevents.sql index 186c8d7..d9d1629 100644 --- a/db/sqlc/queries/chanevents.sql +++ b/db/sqlc/queries/chanevents.sql @@ -27,3 +27,14 @@ WHERE channel_id = $1 AND timestamp < $4 ORDER BY id ASC LIMIT $5; + +-- name: GetLatestChannelEventBefore :one +SELECT * FROM channel_events +WHERE channel_id = $1 AND event_type = $2 AND timestamp < $3 +ORDER BY timestamp DESC, id DESC +LIMIT 1; + +-- name: GetChannels :many +SELECT c.id, c.short_channel_id, p.pubkey +FROM channels c +JOIN peers p ON c.peer_id = p.id; diff --git a/go.mod b/go.mod index f0459a0..4f03fea 100644 --- a/go.mod +++ b/go.mod @@ -20,6 +20,7 @@ require ( github.com/shopspring/decimal v1.2.0 github.com/stretchr/testify v1.10.0 github.com/urfave/cli v1.22.14 + golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b google.golang.org/grpc v1.65.0 google.golang.org/protobuf v1.34.2 gopkg.in/macaroon-bakery.v2 v2.0.1 @@ -163,7 +164,6 @@ require ( go.uber.org/multierr v1.6.0 // indirect go.uber.org/zap v1.17.0 // indirect golang.org/x/crypto v0.39.0 // indirect - golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b // indirect golang.org/x/net v0.41.0 // indirect golang.org/x/sync v0.15.0 // indirect golang.org/x/sys v0.34.0 // indirect