Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions chanevents/quantile.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package chanevents

import (
"errors"
"sort"

"golang.org/x/exp/constraints"
)

// number is the type constraint Quantile accepts: any sortable numeric type.
type number interface {
constraints.Integer | constraints.Float
}

// Quantile computes the q-quantile of a slice of comparable values. This can be
// used to compute the median (q=0.5) or the min (q=0) or max (q=1).
func Quantile[T number](xs []T, q float64) (float64, error) {
if q < 0 || q > 1 {
return 0, errors.New("quantile must be between 0 and 1")
}

if len(xs) == 0 {
return 0, errors.New("cannot compute quantile of empty slice")
}

if len(xs) == 1 {
return float64(xs[0]), nil
}

// Create a copy of the slice to avoid mutating the original.
ys := make([]T, len(xs))
copy(ys, xs)

sort.Slice(ys, func(i, j int) bool {
return ys[i] < ys[j]
})

// Compute fractional index of q-quantile.
if q == 1.0 {
return float64(ys[len(ys)-1]), nil
}
i := q * float64(len(ys)-1)

// Interpolate between the two consecutive values, depending on the
// fractional index position in between.
lowerIdx := int(i)
upperIdx := lowerIdx + 1

lowerVal := float64(ys[lowerIdx])
upperVal := float64(ys[upperIdx])

indexDiff := i - float64(lowerIdx)

return lowerVal + (upperVal-lowerVal)*indexDiff, nil
}
162 changes: 162 additions & 0 deletions chanevents/quantile_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package chanevents

import (
"testing"

"github.com/stretchr/testify/require"
)

// TestQuantile pins the interpolation contract and the error paths Quantile
// surfaces to callers.
func TestQuantile(t *testing.T) {
t.Parallel()

tests := []struct {
name string
q float64
xs []float64
want float64
expectErr bool
}{
{
name: "empty slice",
xs: []float64{},
expectErr: true,
},
{
name: "single value",
xs: []float64{
1,
},
want: 1.0,
},
{
name: "single value median",
xs: []float64{
1,
},
q: 0.5,
want: 1.0,
},
{
name: "quantile out of bound below",
xs: []float64{},
q: -0.1,
expectErr: true,
},
{
name: "quantile out of bound above",
xs: []float64{},
q: 1.1,
expectErr: true,
},
{
name: "median odd values",
q: 0.5,
xs: []float64{
1,
2,
3,
4,
5,
},
want: 3.0,
},
{
name: "median even values",
q: 0.5,
xs: []float64{
1,
2,
3,
4,
},
want: 2.5,
},
{
name: "median unsorted",
q: 0.5,
xs: []float64{
1,
3,
2,
4,
},
want: 2.5,
},
{
name: "0 percentile",
q: 0,
xs: []float64{
1,
2,
3,
4,
5,
},
want: 1.0,
},
{
name: "25 percentile",
q: 0.25,
xs: []float64{
1,
2,
3,
4,
5,
},
want: 2.0,
},
{
name: "75 percentile",
q: 0.75,
xs: []float64{
1,
2,
3,
4,
5,
},
want: 4.0,
},
{
name: "0.875 percentile",
q: 0.875,
xs: []float64{
1,
2,
3,
4,
5,
},
want: 4.5,
},
{
name: "100 percentile",
q: 1.0,
xs: []float64{
1,
2,
3,
4,
5,
},
want: 5.0,
},
}

for _, tc := range tests {
t.Run(tc.name, func(tt *testing.T) {
tt.Parallel()

got, err := Quantile(tc.xs, tc.q)
if tc.expectErr {
require.Error(tt, err)
return
}

require.InDelta(tt, tc.want, got, 1e-6)
})
}
}
80 changes: 80 additions & 0 deletions chanevents/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ type Queries interface {

GetChannelEvents(ctx context.Context,
arg sqlc.GetChannelEventsParams) ([]sqlc.ChannelEvent, error)

GetLatestChannelEventBefore(ctx context.Context,
arg sqlc.GetLatestChannelEventBeforeParams) (
sqlc.ChannelEvent,
error,
)

GetChannels(ctx context.Context) ([]sqlc.GetChannelsRow, error)
}

// Store provides access to the db for channel events.
Expand Down Expand Up @@ -184,6 +192,30 @@ func (s *Store) GetChannel(ctx context.Context, channelPoint string) (*Channel,
}, nil
}

// GetChannelByShortChanID retrieves a channel by its short channel ID,
// returning ErrUnknownChannel if no row matches.
func (s *Store) GetChannelByShortChanID(ctx context.Context,
shortChannelID uint64) (*Channel, error) {

dbChannel, err := s.db.GetChannelByShortChanID(
ctx, scidToInt64(shortChannelID),
)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, ErrUnknownChannel
}

return nil, err
}

return &Channel{
ID: dbChannel.ID,
ChannelPoint: dbChannel.ChannelPoint,
ShortChannelID: int64ToSCID(dbChannel.ShortChannelID),
PeerID: dbChannel.PeerID,
}, nil
}

// AddChannelEvent adds a new channel event.
func (s *Store) AddChannelEvent(ctx context.Context,
event *ChannelEvent) error {
Expand Down Expand Up @@ -257,6 +289,54 @@ func (s *Store) GetChannelEvents(ctx context.Context, channelID, afterID int64,
return events, nil
}

// ScidToPeerMap returns the historic scid→peer index, including channels that
// have since closed. Unconfirmed channels (scid still zero) are not part of
// the contract.
func (s *Store) ScidToPeerMap(ctx context.Context) (map[uint64]string, error) {
dbChannels, err := s.db.GetChannels(ctx)
if err != nil {
return nil, err
}

scidToPeer := make(map[uint64]string, len(dbChannels))
for _, dbChannel := range dbChannels {
// The short channel ID can be zero if it's not known yet. We
// should just ignore those.
if dbChannel.ShortChannelID == 0 {
continue
}

scidToPeer[uint64(dbChannel.ShortChannelID)] = dbChannel.Pubkey
Comment thread
bitromortac marked this conversation as resolved.
}

return scidToPeer, nil
}

// GetLatestChannelUpdateBefore returns the latest channel event before a given
// time (exclusive). If no event is found, it returns (nil, nil).
func (s *Store) GetLatestChannelUpdateBefore(ctx context.Context,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could potentially make it optional to specify which event type this function uses (and then rename the function)?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I skipped this as there's only a single caller right now.

channelID int64, before time.Time) (*ChannelEvent, error) {

dbEvent, err := s.db.GetLatestChannelEventBefore(
ctx, sqlc.GetLatestChannelEventBeforeParams{
ChannelID: channelID,
Timestamp: before.UTC(),
EventType: int16(EventTypeUpdate),
},
)
if err != nil {
// If there are no events before the start time, we return (nil,
// nil).
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
}

return nil, err
}

return marshalChannelEvent(dbEvent), nil
}

// marshalChannelEvent converts a db channel event into our internal type.
func marshalChannelEvent(dbEvent sqlc.ChannelEvent) *ChannelEvent {
var localBalance fn.Option[btcutil.Amount]
Expand Down
38 changes: 38 additions & 0 deletions chanevents/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,34 @@ func TestStore(t *testing.T) {
require.Equal(t, testShortChanID1, dbChannel.ShortChannelID)
require.Equal(t, peerID, dbChannel.PeerID)

// Look up the same channel by its scid; the analyzer relies on this
// inverse of AddChannel.
dbChannel, err = store.GetChannelByShortChanID(ctx, testShortChanID1)
require.NoError(t, err)
require.Equal(t, channelID, dbChannel.ID)
require.Equal(t, testChanPoint1, dbChannel.ChannelPoint)
require.Equal(t, testShortChanID1, dbChannel.ShortChannelID)
require.Equal(t, peerID, dbChannel.PeerID)

// An unknown scid surfaces the typed sentinel, not raw sql.ErrNoRows.
dbChannel, err = store.GetChannelByShortChanID(ctx, 9999)
require.ErrorIs(t, err, ErrUnknownChannel)
require.Nil(t, dbChannel)

// Add a second channel for the same peer.
channel2ID, err := store.AddChannel(
ctx, testChanPoint2, testShortChanID2, peerID,
)
require.NoError(t, err)
require.NotZero(t, channel2ID)

// Get the historic channel to peer map.
chanToPeer, err := store.ScidToPeerMap(ctx)
require.NoError(t, err)
require.Len(t, chanToPeer, 2)
require.Equal(t, testPubKey, chanToPeer[testShortChanID1])
require.Equal(t, testPubKey, chanToPeer[testShortChanID2])

// Add an online event for the channel.
onlineEvent := &ChannelEvent{
ChannelID: channelID,
Expand Down Expand Up @@ -134,6 +155,23 @@ func TestStore(t *testing.T) {
requireEqualEvent(
t, updateEvent, testTime.Add(time.Second), events[1],
)
updateEvent = events[1]

// If we query a time after the update event, we'll obtain the update
// event as the latest event.
initEvent, err := store.GetLatestChannelUpdateBefore(
ctx, channelID, updateEvent.Timestamp.Add(500*time.Millisecond),
)
require.NoError(t, err)
requireEqualEvent(t, updateEvent, testTime.Add(time.Second), initEvent)

// If we query at the update event's timestamp, the only event before
// that is left is the online event, which is not an update.
initEvent, err = store.GetLatestChannelUpdateBefore(
ctx, channelID, updateEvent.Timestamp,
)
require.NoError(t, err)
require.Nil(t, initEvent)

// Advance the clock and add a sync event to verify the IsSync flag
// round-trips correctly.
Expand Down
Loading
Loading