Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion chanevents/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ func (m *Monitor) addChannel(ctx context.Context, pubKeyBytes route.Vertex,

// Check if the channel already exists.
channel, err := m.store.GetChannel(ctx, channelPoint)
if err != nil && !errors.Is(err, errUnknownChannel) {
if err != nil && !errors.Is(err, ErrUnknownChannel) {
return fmt.Errorf("error getting channel %s: %w",
channelPoint, err)
}
Expand Down
26 changes: 17 additions & 9 deletions chanevents/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@ import (
)

var (
errUnknownPeer = errors.New("unknown peer")
errUnknownChannel = errors.New("unknown channel")
errUnknownPeer = errors.New("unknown peer")

// ErrUnknownChannel signals that the requested channel is not
// present in the store.
ErrUnknownChannel = errors.New("unknown channel")
)

// Queries is a subset of the sqlc.Queries interface that can be used to
Expand Down Expand Up @@ -167,7 +170,7 @@ func (s *Store) GetChannel(ctx context.Context, channelPoint string) (*Channel,
dbChannel, err := s.db.GetChannelByChanPoint(ctx, channelPoint)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, errUnknownChannel
return nil, ErrUnknownChannel
}

return nil, fmt.Errorf("failed to get channel: %w", err)
Expand All @@ -185,6 +188,8 @@ func (s *Store) GetChannel(ctx context.Context, channelPoint string) (*Channel,
func (s *Store) AddChannelEvent(ctx context.Context,
event *ChannelEvent) error {

log.Tracef("Adding channel event: %+v", event)
Comment thread
ViktorT-11 marked this conversation as resolved.

var localBalance sql.NullInt64
event.LocalBalance.WhenSome(
func(b btcutil.Amount) {
Expand Down Expand Up @@ -223,18 +228,21 @@ func (s *Store) AddChannelEvent(ctx context.Context,
return nil
}

// GetChannelEvents retrieves all events for a channel within a given time
// range.
// TODO: Add pagination support (LIMIT/OFFSET) to prevent OOM on high-traffic
// channels.
func (s *Store) GetChannelEvents(ctx context.Context, channelID int64,
startTime, endTime time.Time) ([]*ChannelEvent, error) {
// GetChannelEvents returns up to limit events for a channel where
// id > afterID AND startTime <= timestamp < endTime, ordered by id ASC.
// Pass afterID = 0 on the first call; for subsequent calls pass the
// previous page's last event id. The (startTime, endTime) bounds are
// independent filters and do not need to advance between pages.
func (s *Store) GetChannelEvents(ctx context.Context, channelID, afterID int64,
startTime, endTime time.Time, limit int32) ([]*ChannelEvent, error) {

dbEvents, err := s.db.GetChannelEvents(
ctx, sqlc.GetChannelEventsParams{
ChannelID: channelID,
ID: afterID,
Timestamp: startTime.UTC(),
Timestamp_2: endTime.UTC(),
Limit: limit,
},
)
if err != nil {
Expand Down
53 changes: 50 additions & 3 deletions chanevents/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestStore(t *testing.T) {

// Get a non-existent channel and assert an error is returned.
dbChannel, err := store.GetChannel(ctx, "non-existent-chan-point")
require.ErrorIs(t, err, errUnknownChannel)
require.ErrorIs(t, err, ErrUnknownChannel)
require.Nil(t, dbChannel)

// Get the channel and assert it is the same.
Expand Down Expand Up @@ -125,7 +125,7 @@ func TestStore(t *testing.T) {

// Get the channel events and assert they are correct.
events, err := store.GetChannelEvents(
ctx, channelID, time.Unix(0, 0), time.Unix(3, 0),
ctx, channelID, 0, time.Unix(0, 0), time.Unix(3, 0), 100,
)
require.NoError(t, err)
require.Len(t, events, 2)
Expand All @@ -148,7 +148,7 @@ func TestStore(t *testing.T) {
require.NoError(t, err)

events, err = store.GetChannelEvents(
ctx, channelID, time.Unix(0, 0), time.Unix(4, 0),
ctx, channelID, 0, time.Unix(0, 0), time.Unix(4, 0), 100,
)
require.NoError(t, err)
require.Len(t, events, 3)
Expand All @@ -157,3 +157,50 @@ func TestStore(t *testing.T) {
t, syncEvent, testTime.Add(2*time.Second), events[2],
)
}

// TestPagination verifies that the keyset cursor advances correctly across
// events sharing one second-resolution timestamp.
func TestPagination(t *testing.T) {
t.Parallel()

clock := clock.NewTestClock(testTime)
store := NewTestDB(t, clock)
ctx := context.Background()

peerID, err := store.AddPeer(ctx, testPubKey)
require.NoError(t, err)

channelID, err := store.AddChannel(
ctx, testChanPoint1, testShortChanID1, peerID,
)
require.NoError(t, err)

sameTime := testTime.Add(10 * time.Second)
for i := 0; i < 5; i++ {
err = store.AddChannelEvent(ctx, &ChannelEvent{
ChannelID: channelID,
EventType: EventTypeUpdate,
Timestamp: sameTime,
LocalBalance: fn.Some(btcutil.Amount(i)),
})
require.NoError(t, err)
}

endTime := sameTime.Add(time.Hour)

page1, err := store.GetChannelEvents(
ctx, channelID, 0, time.Unix(0, 0), endTime, 3,
)
require.NoError(t, err)
require.Len(t, page1, 3)

page2, err := store.GetChannelEvents(
ctx, channelID, page1[len(page1)-1].ID,
time.Unix(0, 0), endTime, 3,
)
require.NoError(t, err)
require.Len(t, page2, 2)

require.Equal(t, btcutil.Amount(3), page2[0].LocalBalance.UnwrapOr(0))
require.Equal(t, btcutil.Amount(4), page2[1].LocalBalance.UnwrapOr(0))
}
97 changes: 97 additions & 0 deletions cmd/frcli/chan_events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package main

import (
"context"
"fmt"

"github.com/lightninglabs/faraday/frdrpc"
"github.com/urfave/cli"
)

var chanEventsCommand = cli.Command{
Name: "chanevents",
Category: "reporting",
Usage: "Get a report of channel events.",
Description: `
Get a report for a channel which provides a detailed account of its
lifecycle events. The server caps each response; if has_more is true,
fetch the next page by re-running with --last_id set to the previous
response's last_id. Stop when has_more is false. --start_time and
--end_time are independent filters and do not need to advance between
paginated calls.`,
ArgsUsage: "funding_txid [output_index]",
Flags: []cli.Flag{
cli.StringFlag{
Name: "funding_txid",
Usage: "the txid of the channel's funding transaction",
},
cli.IntFlag{
Name: "output_index",
Usage: "the output index for the funding output of " +
"the funding transaction",
},
cli.Int64Flag{
Name: "start_time",
Usage: "start time of the query range as a unix timestamp",
},
cli.Int64Flag{
Name: "end_time",
Usage: "end time of the query range as a unix " +
"timestamp; zero defaults to the server's " +
"current time",
},
cli.UintFlag{
Name: "max_events",
Usage: "maximum number of events to return; zero " +
"uses the server default (capped server-side)",
},
cli.Int64Flag{
Name: "last_id",
Usage: "pagination cursor; pass the previous " +
"response's last_id to continue, or zero " +
"for the first page",
},
},
Action: queryChanEvents,
}

func queryChanEvents(ctx *cli.Context) error {
client, cleanup := getClient(ctx)
defer cleanup()

// Show command help if the channel point was not provided.
if ctx.NArg() == 0 && ctx.String("funding_txid") == "" {
return cli.ShowCommandHelp(ctx, "chanevents")
}

outpoint, err := parseChannelPoint(ctx)
if err != nil {
return err
}

startTime := ctx.Int64("start_time")
endTime := ctx.Int64("end_time")
if startTime < 0 || endTime < 0 {
return fmt.Errorf("start_time and end_time must be >= 0")
}
if endTime != 0 && startTime > endTime {
return fmt.Errorf("start_time must be <= end_time")
}

req := &frdrpc.ChannelEventsRequest{
ChanPoint: outpoint.String(),
StartTime: startTime,
EndTime: endTime,
MaxEvents: uint32(ctx.Uint("max_events")),
LastId: ctx.Int64("last_id"),
}

rpcCtx := context.Background()
report, err := client.GetChannelEvents(rpcCtx, req)
if err != nil {
return err
}

printRespJSON(report)
return nil
}
1 change: 1 addition & 0 deletions cmd/frcli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func main() {
fiatEstimateCommand,
onChainReportCommand,
closeReportCommand,
chanEventsCommand,
}

if err := app.Run(os.Args); err != nil {
Expand Down
18 changes: 15 additions & 3 deletions db/sqlc/chanevents.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions db/sqlc/migrations/000001_chanevents.down.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
DROP INDEX IF EXISTS channel_events_chan_id_id_idx;
DROP INDEX IF EXISTS channel_events_chan_id_ts_idx;
DROP TABLE IF EXISTS channel_events;
DROP INDEX IF EXISTS channel_peer_idx;
Expand Down
13 changes: 9 additions & 4 deletions db/sqlc/migrations/000001_chanevents.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,13 @@ CREATE TABLE IF NOT EXISTS channel_events (
is_sync BOOLEAN NOT NULL DEFAULT FALSE
);

-- This composite index is crucial for efficiently querying the event history
-- of a specific channel. It allows the database to quickly locate relevant rows
-- for a given channel, sorted by time. This is useful for fetching events
-- within a time range, and for finding the latest event before a certain time.
-- This composite index supports the chronological access patterns
-- (GetChannelEventsIter, GetLatestChannelEventBefore): events for a given
-- channel sorted by time, with a per-channel time-range scan.
CREATE INDEX IF NOT EXISTS channel_events_chan_id_ts_idx ON channel_events (channel_id, timestamp);

-- This composite index supports the public GetChannelEvents query, which
-- walks events for a given channel by id-keyset cursor (ORDER BY id ASC,
-- WHERE id > $cursor). Without it, the planner would scan every event with
-- id > $cursor across all channels and filter by channel_id afterwards.
CREATE INDEX IF NOT EXISTS channel_events_chan_id_id_idx ON channel_events (channel_id, id);
8 changes: 6 additions & 2 deletions db/sqlc/queries/chanevents.sql
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,9 @@ INSERT INTO channel_events (

-- name: GetChannelEvents :many
SELECT * FROM channel_events
WHERE channel_id = $1 AND timestamp >= $2 AND timestamp < $3
ORDER BY timestamp ASC, id ASC;
WHERE channel_id = $1
AND id > $2
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general this PR looks great, so I think this is my main comment for the PR:
Is it for performance reasons or that you think it's a better UI that you use the lastId approach over using an offset (i.e. something like LIMIT $5 OFFSET $6)? The approach choosen effects the rest of the PR.

I think the offset approach is much more common, and is what we use in litd for actions and I think potentially more natural for a user at it leaks no internals of how the ChannelEvent is stored (i.e. the id), and is also independent of the store used (i.e. not requiring that an incremental id is used).

Would therefore be interesting to hear your motivation behind this approach, which I do see some pros with as well!

Copy link
Copy Markdown
Contributor Author

@bitromortac bitromortac May 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Previously I had time-based pagination, but that didn't work well because of timestamp precision. I didn't really consider offset pagination, but a good point that this would be more consistent with other projects and doesn't expose internal state. However, I see that in the future we may want to delete older events, because I expect a constant stream of updates. In a new terminal web feature I think we also want to sync balances. With deletion I guess that it's easier to maintain the sync state by using the id. Let me know if you prefer the offset way, though I think the id approach has also speed advantages.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, ultimately I'll let you decide on which approach you'd want to use!

Personally I have no strong preference. I do think the offset approach is usually standard practise though, even when deletion is possible. I think the main drawback with the current approach is that it'll bind the store to always use an incremental id for the rows in the db.

But ultimately I think there are pros with the current approach as well, so I'll let you decide :)

Copy link
Copy Markdown
Contributor Author

@bitromortac bitromortac May 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I thought about it a bit more, I'd accept the small implementation leak for the performance and for incremental sync that survives head deletions (which I think isn't possible in a straightforward manner with an offset unless one introduces an additional absolute sync watermark value like a timestamp). Thanks for the input here!

AND timestamp >= $3
AND timestamp < $4
ORDER BY id ASC
LIMIT $5;
2 changes: 2 additions & 0 deletions faraday.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ func (f *Faraday) Start() error {

cfg := &frdrpcserver.Config{
Lnd: f.lnd.LndServices,
ChanEvents: f.stores.ChanEventsStore,
BitcoinClient: f.bitcoinClient,
}

Expand Down Expand Up @@ -401,6 +402,7 @@ func (f *Faraday) StartAsSubserver(lndGrpc *lndclient.GrpcLndServices,

cfg := &frdrpcserver.Config{
Lnd: lndGrpc.LndServices,
ChanEvents: f.stores.ChanEventsStore,
BitcoinClient: f.bitcoinClient,
}

Expand Down
Loading
Loading