From 3157dbc870ee0df648d4ee7dda5cdb740ed4e385 Mon Sep 17 00:00:00 2001 From: bitromortac Date: Wed, 24 Sep 2025 10:53:42 +0200 Subject: [PATCH 1/3] db: add chanevents tables and queries --- db/migrations.go | 2 +- db/migrations_test.go | 41 +++++ db/schemas.go | 1 + db/sqlc/chanevents.sql.go | 150 ++++++++++++++++++ db/sqlc/db_custom.go | 34 ++++ db/sqlc/migrations/000001_chanevents.down.sql | 5 + db/sqlc/migrations/000001_chanevents.up.sql | 45 ++++++ db/sqlc/models.go | 31 ++++ db/sqlc/querier.go | 21 +++ db/sqlc/queries/chanevents.sql | 24 +++ 10 files changed, 353 insertions(+), 1 deletion(-) create mode 100644 db/migrations_test.go create mode 100644 db/sqlc/chanevents.sql.go create mode 100644 db/sqlc/db_custom.go create mode 100644 db/sqlc/migrations/000001_chanevents.down.sql create mode 100644 db/sqlc/migrations/000001_chanevents.up.sql create mode 100644 db/sqlc/models.go create mode 100644 db/sqlc/querier.go create mode 100644 db/sqlc/queries/chanevents.sql diff --git a/db/migrations.go b/db/migrations.go index b98fc6d..bd882c0 100644 --- a/db/migrations.go +++ b/db/migrations.go @@ -6,5 +6,5 @@ const ( // daemon. // // NOTE: This MUST be updated when a new migration is added. - LatestMigrationVersion = 0 + LatestMigrationVersion = 1 ) diff --git a/db/migrations_test.go b/db/migrations_test.go new file mode 100644 index 0000000..d87826e --- /dev/null +++ b/db/migrations_test.go @@ -0,0 +1,41 @@ +package db + +import ( + "strconv" + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +// TestLatestMigrationVersion ensures that LatestMigrationVersion stays in sync +// with the highest-numbered .up.sql file in the migrations directory. Each +// migration — whether pure SQL or programmatic (with a dummy SQL file) — gets +// its own numbered file pair, so the max file number must equal the constant. +func TestLatestMigrationVersion(t *testing.T) { + entries, err := sqlSchemas.ReadDir("sqlc/migrations") + require.NoError(t, err) + + var maxVersion uint + for _, entry := range entries { + if !strings.HasSuffix(entry.Name(), ".up.sql") { + continue + } + + parts := strings.SplitN(entry.Name(), "_", 2) + require.NotEmpty(t, parts) + + v, err := strconv.ParseUint(parts[0], 10, 64) + require.NoError(t, err) + + if uint(v) > maxVersion { + maxVersion = uint(v) + } + } + + require.EqualValues( + t, maxVersion, LatestMigrationVersion, + "LatestMigrationVersion is out of date, update "+ + "db/migrations.go", + ) +} diff --git a/db/schemas.go b/db/schemas.go index d65258b..1a7a209 100644 --- a/db/schemas.go +++ b/db/schemas.go @@ -5,4 +5,5 @@ import ( _ "embed" ) +//go:embed sqlc/migrations/*.*.sql var sqlSchemas embed.FS diff --git a/db/sqlc/chanevents.sql.go b/db/sqlc/chanevents.sql.go new file mode 100644 index 0000000..9720995 --- /dev/null +++ b/db/sqlc/chanevents.sql.go @@ -0,0 +1,150 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.25.0 +// source: chanevents.sql + +package sqlc + +import ( + "context" + "database/sql" + "time" +) + +const getChannelByChanPoint = `-- name: GetChannelByChanPoint :one +SELECT id, channel_point, short_channel_id, peer_id FROM channels WHERE channel_point = $1 +` + +func (q *Queries) GetChannelByChanPoint(ctx context.Context, channelPoint string) (Channel, error) { + row := q.db.QueryRowContext(ctx, getChannelByChanPoint, channelPoint) + var i Channel + err := row.Scan( + &i.ID, + &i.ChannelPoint, + &i.ShortChannelID, + &i.PeerID, + ) + return i, err +} + +const getChannelByShortChanID = `-- name: GetChannelByShortChanID :one +SELECT id, channel_point, short_channel_id, peer_id FROM channels WHERE short_channel_id = $1 +` + +func (q *Queries) GetChannelByShortChanID(ctx context.Context, shortChannelID int64) (Channel, error) { + row := q.db.QueryRowContext(ctx, getChannelByShortChanID, shortChannelID) + var i Channel + err := row.Scan( + &i.ID, + &i.ChannelPoint, + &i.ShortChannelID, + &i.PeerID, + ) + return i, err +} + +const getChannelEvents = `-- name: GetChannelEvents :many +SELECT id, channel_id, event_type, timestamp, local_balance_sat, remote_balance_sat FROM channel_events +WHERE channel_id = $1 AND timestamp >= $2 AND timestamp < $3 +ORDER BY timestamp ASC, id ASC +` + +type GetChannelEventsParams struct { + ChannelID int64 + Timestamp time.Time + Timestamp_2 time.Time +} + +func (q *Queries) GetChannelEvents(ctx context.Context, arg GetChannelEventsParams) ([]ChannelEvent, error) { + rows, err := q.db.QueryContext(ctx, getChannelEvents, arg.ChannelID, arg.Timestamp, arg.Timestamp_2) + if err != nil { + return nil, err + } + defer rows.Close() + var items []ChannelEvent + for rows.Next() { + var i ChannelEvent + if err := rows.Scan( + &i.ID, + &i.ChannelID, + &i.EventType, + &i.Timestamp, + &i.LocalBalanceSat, + &i.RemoteBalanceSat, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const getPeerByPubKey = `-- name: GetPeerByPubKey :one +SELECT id, pubkey FROM peers WHERE pubkey = $1 +` + +func (q *Queries) GetPeerByPubKey(ctx context.Context, pubkey string) (Peer, error) { + row := q.db.QueryRowContext(ctx, getPeerByPubKey, pubkey) + var i Peer + err := row.Scan(&i.ID, &i.Pubkey) + return i, err +} + +const insertChannel = `-- name: InsertChannel :one +INSERT INTO channels (channel_point, short_channel_id, peer_id) VALUES ($1, $2, $3) RETURNING id +` + +type InsertChannelParams struct { + ChannelPoint string + ShortChannelID int64 + PeerID int64 +} + +func (q *Queries) InsertChannel(ctx context.Context, arg InsertChannelParams) (int64, error) { + row := q.db.QueryRowContext(ctx, insertChannel, arg.ChannelPoint, arg.ShortChannelID, arg.PeerID) + var id int64 + err := row.Scan(&id) + return id, err +} + +const insertChannelEvent = `-- name: InsertChannelEvent :exec +INSERT INTO channel_events ( + channel_id, event_type, timestamp, local_balance_sat, remote_balance_sat +) VALUES ($1, $2, $3, $4, $5) +` + +type InsertChannelEventParams struct { + ChannelID int64 + EventType int16 + Timestamp time.Time + LocalBalanceSat sql.NullInt64 + RemoteBalanceSat sql.NullInt64 +} + +func (q *Queries) InsertChannelEvent(ctx context.Context, arg InsertChannelEventParams) error { + _, err := q.db.ExecContext(ctx, insertChannelEvent, + arg.ChannelID, + arg.EventType, + arg.Timestamp, + arg.LocalBalanceSat, + arg.RemoteBalanceSat, + ) + return err +} + +const insertPeer = `-- name: InsertPeer :one +INSERT INTO peers (pubkey) VALUES ($1) RETURNING id +` + +func (q *Queries) InsertPeer(ctx context.Context, pubkey string) (int64, error) { + row := q.db.QueryRowContext(ctx, insertPeer, pubkey) + var id int64 + err := row.Scan(&id) + return id, err +} diff --git a/db/sqlc/db_custom.go b/db/sqlc/db_custom.go new file mode 100644 index 0000000..00b8afb --- /dev/null +++ b/db/sqlc/db_custom.go @@ -0,0 +1,34 @@ +// Package sqlc provides a set of custom database queries and utilities +// for interacting with the SQL database used in the application. It includes +// generated code from sqlc as well as custom wrappers to handle different +// database backends. +package sqlc + +import ( + "github.com/lightningnetwork/lnd/sqldb/v2" +) + +// wrappedTX is a wrapper around a DBTX that also stores the database backend +// type. +type wrappedTX struct { + DBTX + + backendType sqldb.BackendType +} + +// Backend returns the type of database backend we're using. +func (q *Queries) Backend() sqldb.BackendType { + wtx, ok := q.db.(*wrappedTX) + if !ok { + // Shouldn't happen unless a new database backend type is added + // but not initialized correctly. + return sqldb.BackendTypeUnknown + } + + return wtx.backendType +} + +// NewForType creates a new Queries instance for the given database type. +func NewForType(db DBTX, typ sqldb.BackendType) *Queries { + return &Queries{db: &wrappedTX{db, typ}} +} diff --git a/db/sqlc/migrations/000001_chanevents.down.sql b/db/sqlc/migrations/000001_chanevents.down.sql new file mode 100644 index 0000000..df2c627 --- /dev/null +++ b/db/sqlc/migrations/000001_chanevents.down.sql @@ -0,0 +1,5 @@ +DROP INDEX IF EXISTS channel_events_chan_id_ts_idx; +DROP TABLE IF EXISTS channel_events; +DROP INDEX IF EXISTS channel_peer_idx; +DROP TABLE IF EXISTS channels; +DROP TABLE IF EXISTS peers; diff --git a/db/sqlc/migrations/000001_chanevents.up.sql b/db/sqlc/migrations/000001_chanevents.up.sql new file mode 100644 index 0000000..f733b59 --- /dev/null +++ b/db/sqlc/migrations/000001_chanevents.up.sql @@ -0,0 +1,45 @@ +-- The peers table stores all the peers that we have channels with. +CREATE TABLE IF NOT EXISTS peers ( + -- The auto incrementing primary key. + id INTEGER PRIMARY KEY, + -- The public key of the peer. + pubkey TEXT NOT NULL UNIQUE +); + +-- The channels table stores all the channels that we have with our peers. +CREATE TABLE IF NOT EXISTS channels ( + -- The auto incrementing primary key. + id INTEGER PRIMARY KEY, + -- The channel point, as a 'txid:output_index' string. + channel_point TEXT NOT NULL UNIQUE, + -- The short channel ID. + short_channel_id BIGINT NOT NULL UNIQUE, + -- The peer that this channel is with. + peer_id BIGINT NOT NULL REFERENCES peers(id) ON DELETE CASCADE +); +CREATE INDEX IF NOT EXISTS channel_peer_idx ON channels (peer_id); + +-- The channel_events table stores all the events that are associated with a +-- particular channel. +CREATE TABLE IF NOT EXISTS channel_events ( + -- The auto incrementing primary key. + id INTEGER PRIMARY KEY, + -- The channel that this event is associated with. + channel_id BIGINT NOT NULL REFERENCES channels(id) ON DELETE CASCADE, + -- The type of event. + event_type SMALLINT NOT NULL, + -- The time the event occurred. + timestamp TIMESTAMP NOT NULL, + -- The local balance of the channel at the time of the event. + -- This is only populated for balance update events. + local_balance_sat BIGINT CHECK (local_balance_sat >= 0), + -- The remote balance of the channel at the time of the event. + -- This is only populated for balance update events. + remote_balance_sat BIGINT CHECK (remote_balance_sat >= 0) +); + +-- This composite index is crucial for efficiently querying the event history +-- of a specific channel. It allows the database to quickly locate relevant rows +-- for a given channel, sorted by time. This is useful for fetching events +-- within a time range, and for finding the latest event before a certain time. +CREATE INDEX IF NOT EXISTS channel_events_chan_id_ts_idx ON channel_events (channel_id, timestamp); diff --git a/db/sqlc/models.go b/db/sqlc/models.go new file mode 100644 index 0000000..1094533 --- /dev/null +++ b/db/sqlc/models.go @@ -0,0 +1,31 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.25.0 + +package sqlc + +import ( + "database/sql" + "time" +) + +type Channel struct { + ID int64 + ChannelPoint string + ShortChannelID int64 + PeerID int64 +} + +type ChannelEvent struct { + ID int64 + ChannelID int64 + EventType int16 + Timestamp time.Time + LocalBalanceSat sql.NullInt64 + RemoteBalanceSat sql.NullInt64 +} + +type Peer struct { + ID int64 + Pubkey string +} diff --git a/db/sqlc/querier.go b/db/sqlc/querier.go new file mode 100644 index 0000000..f56f962 --- /dev/null +++ b/db/sqlc/querier.go @@ -0,0 +1,21 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.25.0 + +package sqlc + +import ( + "context" +) + +type Querier interface { + GetChannelByChanPoint(ctx context.Context, channelPoint string) (Channel, error) + GetChannelByShortChanID(ctx context.Context, shortChannelID int64) (Channel, error) + GetChannelEvents(ctx context.Context, arg GetChannelEventsParams) ([]ChannelEvent, error) + GetPeerByPubKey(ctx context.Context, pubkey string) (Peer, error) + InsertChannel(ctx context.Context, arg InsertChannelParams) (int64, error) + InsertChannelEvent(ctx context.Context, arg InsertChannelEventParams) error + InsertPeer(ctx context.Context, pubkey string) (int64, error) +} + +var _ Querier = (*Queries)(nil) diff --git a/db/sqlc/queries/chanevents.sql b/db/sqlc/queries/chanevents.sql new file mode 100644 index 0000000..5370e4e --- /dev/null +++ b/db/sqlc/queries/chanevents.sql @@ -0,0 +1,24 @@ +-- name: InsertPeer :one +INSERT INTO peers (pubkey) VALUES ($1) RETURNING id; + +-- name: GetPeerByPubKey :one +SELECT * FROM peers WHERE pubkey = $1; + +-- name: InsertChannel :one +INSERT INTO channels (channel_point, short_channel_id, peer_id) VALUES ($1, $2, $3) RETURNING id; + +-- name: GetChannelByChanPoint :one +SELECT * FROM channels WHERE channel_point = $1; + +-- name: GetChannelByShortChanID :one +SELECT * FROM channels WHERE short_channel_id = $1; + +-- name: InsertChannelEvent :exec +INSERT INTO channel_events ( + channel_id, event_type, timestamp, local_balance_sat, remote_balance_sat +) VALUES ($1, $2, $3, $4, $5); + +-- name: GetChannelEvents :many +SELECT * FROM channel_events +WHERE channel_id = $1 AND timestamp >= $2 AND timestamp < $3 +ORDER BY timestamp ASC, id ASC; From fcb52aa2b4f6dd61cc522bfb43bb96695d1798fc Mon Sep 17 00:00:00 2001 From: bitromortac Date: Fri, 27 Mar 2026 12:26:03 +0100 Subject: [PATCH 2/3] github: sqlc check --- .github/workflows/main.yml | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 7d367d4..2927e36 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -50,10 +50,28 @@ jobs: - name: run imports check run: make fmt - + - name: run JS stubs check run: make rpc-js-compile + ####################### + # sql model generation + ####################### + sqlc-check: + name: Sqlc check + runs-on: ubuntu-latest + steps: + - name: git checkout + uses: actions/checkout@v2 + + - name: setup go ${{ env.GO_VERSION }} + uses: actions/setup-go@v2 + with: + go-version: '${{ env.GO_VERSION }}' + + - name: Generate sql models + run: make sqlc-check + ######################## # lint code ######################## From d2df787da6a404a54b60d83de46c86ca5ed639e8 Mon Sep 17 00:00:00 2001 From: bitromortac Date: Wed, 1 Apr 2026 15:00:34 +0200 Subject: [PATCH 3/3] chanevents: implement store --- chanevents/chanevents.go | 109 ++++++++++++++ chanevents/store.go | 273 ++++++++++++++++++++++++++++++++++++ chanevents/store_test.go | 128 +++++++++++++++++ chanevents/test_postgres.go | 29 ++++ chanevents/test_sql.go | 18 +++ chanevents/test_sqlite.go | 30 ++++ go.mod | 4 +- 7 files changed, 589 insertions(+), 2 deletions(-) create mode 100644 chanevents/chanevents.go create mode 100644 chanevents/store.go create mode 100644 chanevents/store_test.go create mode 100644 chanevents/test_postgres.go create mode 100644 chanevents/test_sql.go create mode 100644 chanevents/test_sqlite.go diff --git a/chanevents/chanevents.go b/chanevents/chanevents.go new file mode 100644 index 0000000..70ca21f --- /dev/null +++ b/chanevents/chanevents.go @@ -0,0 +1,109 @@ +// Package chanevents contains functions for monitoring and storing channel +// events such as online/offline and balance updates. +package chanevents + +import ( + "time" + + "github.com/btcsuite/btcd/btcutil" + "github.com/lightningnetwork/lnd/fn/v2" +) + +// EventType is an enum for the different types of channel events. +type EventType int16 + +const ( + // EventTypeUnknown is the unknown event type. + EventTypeUnknown = 0 + + // EventTypeOnline is the online event type. + EventTypeOnline = 1 + + // EventTypeOffline is the offline event type. + EventTypeOffline = 2 + + // EventTypeUpdate is the balance update event type. + EventTypeUpdate = 3 +) + +// String returns the string representation of the event type. +func (e EventType) String() string { + switch e { + case EventTypeOnline: + return "online" + + case EventTypeOffline: + return "offline" + + case EventTypeUpdate: + return "update" + + default: + return "unknown" + } +} + +// EventTypeFromString returns the event type from a string. +func EventTypeFromString(s string) EventType { + switch s { + case "online": + return EventTypeOnline + + case "offline": + return EventTypeOffline + + case "update": + return EventTypeUpdate + + default: + return EventTypeUnknown + } +} + +// Peer is the application-level representation of a peer. +type Peer struct { + // ID is the database ID of the peer. + ID int64 + + // PubKey is the public key of the peer. + PubKey string +} + +// Channel is the application-level representation of a channel. +type Channel struct { + // ID is the database ID of the channel. + ID int64 + + // ChannelPoint is the channel point of the channel. + ChannelPoint string + + // ShortChannelID is the short channel ID of the channel. + ShortChannelID uint64 + + // PeerID is the database ID of the peer that this channel is with. + PeerID int64 +} + +// ChannelEvent is the application-level representation of a channel event. +type ChannelEvent struct { + // ID is the database ID of the event. + ID int64 + + // ChannelID is the database ID of the channel that this event is + // associated with. + ChannelID int64 + + // EventType is the type of the event. + EventType EventType + + // Timestamp is the time that the event occurred. + Timestamp time.Time + + // LocalBalance is the local balance of the channel at the time of the + // event. This is only populated for balance update events. + LocalBalance fn.Option[btcutil.Amount] + + // RemoteBalance is the remote balance of the channel at the time of the + // event. This is only populated for balance update events. + RemoteBalance fn.Option[btcutil.Amount] +} diff --git a/chanevents/store.go b/chanevents/store.go new file mode 100644 index 0000000..a3d202d --- /dev/null +++ b/chanevents/store.go @@ -0,0 +1,273 @@ +package chanevents + +import ( + "context" + "database/sql" + "errors" + "fmt" + "time" + + "github.com/btcsuite/btcd/btcutil" + "github.com/lightninglabs/faraday/db/sqlc" + "github.com/lightningnetwork/lnd/clock" + "github.com/lightningnetwork/lnd/fn/v2" + "github.com/lightningnetwork/lnd/sqldb/v2" +) + +var ( + errUnknownPeer = errors.New("unknown peer") + errUnknownChannel = errors.New("unknown channel") +) + +// Queries is a subset of the sqlc.Queries interface that can be used to +// interact with the peers, channels and channel_events tables. +type Queries interface { + InsertPeer(ctx context.Context, pubkey string) (int64, error) + + GetPeerByPubKey(ctx context.Context, pubkey string) (sqlc.Peer, error) + + InsertChannel(ctx context.Context, + arg sqlc.InsertChannelParams) (int64, error) + + GetChannelByChanPoint(ctx context.Context, + channelPoint string) (sqlc.Channel, error) + + GetChannelByShortChanID(ctx context.Context, + shortChannelID int64) (sqlc.Channel, error) + + InsertChannelEvent(ctx context.Context, + arg sqlc.InsertChannelEventParams) error + + GetChannelEvents(ctx context.Context, + arg sqlc.GetChannelEventsParams) ([]sqlc.ChannelEvent, error) +} + +// Store provides access to the db for channel events. +type Store struct { + // db is all the higher level queries that the SQLStore has access to in + // order to implement all its CRUD logic. + db BatchedSQLQueries + + // BaseDB represents the underlying database connection. + *sqldb.BaseDB + + clock clock.Clock +} + +// BatchedSQLQueries combines the SQLQueries interface with the BatchedTx +// interface, allowing for multiple queries to be executed in single SQL +// transaction. +type BatchedSQLQueries interface { + SQLQueries + + sqldb.BatchedTx[SQLQueries] +} + +// SQLQueries is a subset of the sqlc.Queries interface that can be used to +// interact with various chanevents tables. +type SQLQueries interface { + sqldb.BaseQuerier + + Queries +} + +type SQLQueriesExecutor[T sqldb.BaseQuerier] struct { + *sqldb.TransactionExecutor[T] + + SQLQueries +} + +// NewStore creates a new SQLStore instance given an open SQLQueries storage +// backend. +func NewStore(sqlDB *sqldb.BaseDB, queries *sqlc.Queries, + clock clock.Clock) *Store { + + txExecutor := sqldb.NewTransactionExecutor( + sqlDB, + func(tx *sql.Tx) SQLQueries { + return queries.WithTx(tx) + }, + ) + + executor := &SQLQueriesExecutor[SQLQueries]{ + TransactionExecutor: txExecutor, + SQLQueries: queries, + } + + return &Store{ + db: executor, + BaseDB: sqlDB, + clock: clock, + } +} + +// AddPeer adds a new peer to the database. +func (s *Store) AddPeer(ctx context.Context, pubkey string) (int64, error) { + id, err := s.db.InsertPeer(ctx, pubkey) + if err != nil { + return 0, fmt.Errorf("failed to insert peer: %w", err) + } + + return id, nil +} + +// GetPeer retrieves a peer by their public key. +func (s *Store) GetPeer(ctx context.Context, pubkey string) (*Peer, error) { + dbPeer, err := s.db.GetPeerByPubKey(ctx, pubkey) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, errUnknownPeer + } + + return nil, fmt.Errorf("failed to get peer: %w", err) + } + + return &Peer{ + ID: dbPeer.ID, + PubKey: dbPeer.Pubkey, + }, nil +} + +// int64ToSCID converts an int64 to a uint64 ShortChannelID. The BOLT spec +// encodes SCIDs as uint64, but SQL only supports signed int64. We preserve the +// bits, which means SCIDs with the high bit set will appear negative in the +// database. Direct SQL queries (e.g. ORDER BY short_channel_id) will not sort +// these correctly, but round-tripping through Go preserves the value. +func int64ToSCID(i int64) uint64 { + return uint64(i) +} + +// scidToInt64 converts a uint64 ShortChannelID to an int64 for SQL storage. +func scidToInt64(u uint64) int64 { + return int64(u) +} + +// AddChannel adds a new channel for a peer. +func (s *Store) AddChannel(ctx context.Context, channelPoint string, + shortChannelID uint64, peerID int64) (int64, error) { + + id, err := s.db.InsertChannel( + ctx, sqlc.InsertChannelParams{ + ChannelPoint: channelPoint, + ShortChannelID: scidToInt64(shortChannelID), + PeerID: peerID, + }, + ) + if err != nil { + return 0, fmt.Errorf("failed to insert channel: %w", err) + } + + return id, nil +} + +// GetChannel retrieves a channel by its channel point. +func (s *Store) GetChannel(ctx context.Context, channelPoint string) (*Channel, + error) { + + dbChannel, err := s.db.GetChannelByChanPoint(ctx, channelPoint) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, errUnknownChannel + } + + return nil, fmt.Errorf("failed to get channel: %w", err) + } + + return &Channel{ + ID: dbChannel.ID, + ChannelPoint: dbChannel.ChannelPoint, + ShortChannelID: int64ToSCID(dbChannel.ShortChannelID), + PeerID: dbChannel.PeerID, + }, nil +} + +// AddChannelEvent adds a new channel event. +func (s *Store) AddChannelEvent(ctx context.Context, + event *ChannelEvent) error { + + var localBalance sql.NullInt64 + event.LocalBalance.WhenSome( + func(b btcutil.Amount) { + localBalance.Int64 = int64(b) + localBalance.Valid = true + }, + ) + + var remoteBalance sql.NullInt64 + event.RemoteBalance.WhenSome( + func(b btcutil.Amount) { + remoteBalance.Int64 = int64(b) + remoteBalance.Valid = true + }, + ) + + timestamp := event.Timestamp.UTC() + if timestamp.IsZero() { + timestamp = s.clock.Now().UTC() + } + + err := s.db.InsertChannelEvent( + ctx, sqlc.InsertChannelEventParams{ + ChannelID: event.ChannelID, + EventType: int16(event.EventType), + Timestamp: timestamp, + LocalBalanceSat: localBalance, + RemoteBalanceSat: remoteBalance, + }, + ) + if err != nil { + return fmt.Errorf("failed to insert channel event: %w", err) + } + + return nil +} + +// GetChannelEvents retrieves all events for a channel within a given time +// range. +// TODO: Add pagination support (LIMIT/OFFSET) to prevent OOM on high-traffic +// channels. +func (s *Store) GetChannelEvents(ctx context.Context, channelID int64, + startTime, endTime time.Time) ([]*ChannelEvent, error) { + + dbEvents, err := s.db.GetChannelEvents( + ctx, sqlc.GetChannelEventsParams{ + ChannelID: channelID, + Timestamp: startTime.UTC(), + Timestamp_2: endTime.UTC(), + }, + ) + if err != nil { + return nil, fmt.Errorf("failed to get channel events: %w", err) + } + + events := make([]*ChannelEvent, len(dbEvents)) + for i, dbEvent := range dbEvents { + events[i] = marshalChannelEvent(dbEvent) + } + + return events, nil +} + +// marshalChannelEvent converts a db channel event into our internal type. +func marshalChannelEvent(dbEvent sqlc.ChannelEvent) *ChannelEvent { + var localBalance fn.Option[btcutil.Amount] + if dbEvent.LocalBalanceSat.Valid { + amt := btcutil.Amount(dbEvent.LocalBalanceSat.Int64) + localBalance = fn.Some(amt) + } + + var remoteBalance fn.Option[btcutil.Amount] + if dbEvent.RemoteBalanceSat.Valid { + amt := btcutil.Amount(dbEvent.RemoteBalanceSat.Int64) + remoteBalance = fn.Some(amt) + } + + return &ChannelEvent{ + ID: dbEvent.ID, + ChannelID: dbEvent.ChannelID, + EventType: EventType(dbEvent.EventType), + Timestamp: dbEvent.Timestamp.UTC(), + LocalBalance: localBalance, + RemoteBalance: remoteBalance, + } +} diff --git a/chanevents/store_test.go b/chanevents/store_test.go new file mode 100644 index 0000000..c9bbaf2 --- /dev/null +++ b/chanevents/store_test.go @@ -0,0 +1,128 @@ +package chanevents + +import ( + "context" + "testing" + "time" + + "github.com/btcsuite/btcd/btcutil" + "github.com/lightningnetwork/lnd/clock" + "github.com/lightningnetwork/lnd/fn/v2" + "github.com/stretchr/testify/require" +) + +var ( + testPubKey = "028d4c6347426f2e3f5e2b8e4a1c3b9f1" + + "c4e5d6f7a8b9c0d1e2f3a4b5c6d7e8f9" + testChanPoint1 = "test_txid:0" + testChanPoint2 = "test_txid:1" + testShortChanID1 uint64 = 123 + testShortChanID2 uint64 = 456 + + testTime = time.Unix(1, 0) +) + +// TestStore tests the chanevents store. +func TestStore(t *testing.T) { + t.Parallel() + + // First, create a new test database. + clock := clock.NewTestClock(testTime) + store := NewTestDB(t, clock) + ctx := context.Background() + + // *** Peers *** Add a peer. + peer := &Peer{PubKey: testPubKey} + peerID, err := store.AddPeer(ctx, peer.PubKey) + require.NoError(t, err) + require.NotZero(t, peerID) + + // Adding the same peer again violates the unique constraint. + _, err = store.AddPeer(ctx, peer.PubKey) + require.Error(t, err) + + dbPeer, err := store.GetPeer(ctx, "non_existent_pubkey") + require.ErrorIs(t, err, errUnknownPeer) + require.Nil(t, dbPeer) + + // Get the peer and assert it is the same. + dbPeer, err = store.GetPeer(ctx, peer.PubKey) + require.NoError(t, err) + require.Equal(t, peer.PubKey, dbPeer.PubKey) + + // *** Channels *** Add a channel for an unknown peer and assert an + // error is returned. + channelID, err := store.AddChannel( + ctx, testChanPoint1, testShortChanID1, 9999, + ) + require.Error(t, err) + require.Zero(t, channelID) + + // Add a channel for the peer. + channelID, err = store.AddChannel( + ctx, testChanPoint1, testShortChanID1, peerID, + ) + require.NoError(t, err) + require.NotZero(t, channelID) + + // Get a non-existent channel and assert an error is returned. + dbChannel, err := store.GetChannel(ctx, "non-existent-chan-point") + require.ErrorIs(t, err, errUnknownChannel) + require.Nil(t, dbChannel) + + // Get the channel and assert it is the same. + dbChannel, err = store.GetChannel(ctx, testChanPoint1) + require.NoError(t, err) + require.Equal(t, testChanPoint1, dbChannel.ChannelPoint) + require.Equal(t, testShortChanID1, dbChannel.ShortChannelID) + require.Equal(t, peerID, dbChannel.PeerID) + + // Add a second channel for the same peer. + channel2ID, err := store.AddChannel( + ctx, testChanPoint2, testShortChanID2, peerID, + ) + require.NoError(t, err) + require.NotZero(t, channel2ID) + + // Add an online event for the channel. + onlineEvent := &ChannelEvent{ + ChannelID: channelID, + EventType: EventTypeOnline, + } + err = store.AddChannelEvent(ctx, onlineEvent) + require.NoError(t, err) + + // Advance the clock for the next event. + clock.SetTime(testTime.Add(time.Second)) + + // Add an update event for the channel. + localBalance := btcutil.Amount(1000) + remoteBalance := btcutil.Amount(2000) + updateEvent := &ChannelEvent{ + ChannelID: channelID, + EventType: EventTypeUpdate, + LocalBalance: fn.Some(localBalance), + RemoteBalance: fn.Some(remoteBalance), + } + err = store.AddChannelEvent(ctx, updateEvent) + require.NoError(t, err) + + // Get the channel events and assert they are correct. + events, err := store.GetChannelEvents( + ctx, channelID, time.Unix(0, 0), time.Unix(3, 0), + ) + require.NoError(t, err) + require.Len(t, events, 2) + + require.Equal(t, onlineEvent.EventType, events[0].EventType) + require.Equal(t, testTime.Unix(), events[0].Timestamp.Unix()) + require.True(t, events[0].LocalBalance.IsNone()) + require.True(t, events[0].RemoteBalance.IsNone()) + + require.Equal(t, updateEvent.EventType, events[1].EventType) + require.Equal( + t, testTime.Add(time.Second).Unix(), events[1].Timestamp.Unix(), + ) + require.Equal(t, updateEvent.LocalBalance, events[1].LocalBalance) + require.Equal(t, updateEvent.RemoteBalance, events[1].RemoteBalance) +} diff --git a/chanevents/test_postgres.go b/chanevents/test_postgres.go new file mode 100644 index 0000000..daf34e7 --- /dev/null +++ b/chanevents/test_postgres.go @@ -0,0 +1,29 @@ +//go:build test_db_postgres + +package chanevents + +import ( + "testing" + + "github.com/lightninglabs/faraday/db" + "github.com/lightningnetwork/lnd/clock" + "github.com/stretchr/testify/require" +) + +// NewTestDB creates a new test chanevents.Store backed by a postgres DB. +func NewTestDB(t *testing.T, clock clock.Clock) *Store { + // We'll create a new test database. The call to NewTestPostgresDB will + // automatically create the DB and apply the migrations. + testDB := db.NewTestPostgresDB(t) + + // Now, we'll create the FaradayDB instance from the test database. The + // FaradayDB is the main database object that holds the connection and + // the generated querier. + faradayDB := createStore(t, testDB.BaseDB, clock) + + t.Cleanup(func() { + require.NoError(t, faradayDB.Close()) + }) + + return faradayDB +} diff --git a/chanevents/test_sql.go b/chanevents/test_sql.go new file mode 100644 index 0000000..1785159 --- /dev/null +++ b/chanevents/test_sql.go @@ -0,0 +1,18 @@ +package chanevents + +import ( + "testing" + + "github.com/lightninglabs/faraday/db/sqlc" + "github.com/lightningnetwork/lnd/clock" + "github.com/lightningnetwork/lnd/sqldb/v2" +) + +// createStore is a helper function that creates a new Store. +func createStore(t *testing.T, sqlDB *sqldb.BaseDB, clock clock.Clock) *Store { + queries := sqlc.NewForType(sqlDB, sqlDB.BackendType) + + store := NewStore(sqlDB, queries, clock) + + return store +} diff --git a/chanevents/test_sqlite.go b/chanevents/test_sqlite.go new file mode 100644 index 0000000..a6c3522 --- /dev/null +++ b/chanevents/test_sqlite.go @@ -0,0 +1,30 @@ +//go:build !test_db_postgres + +package chanevents + +import ( + "testing" + + "github.com/lightninglabs/faraday/db" + "github.com/lightningnetwork/lnd/clock" + "github.com/lightningnetwork/lnd/sqldb/v2" + "github.com/stretchr/testify/require" +) + +// NewTestDB creates a new test chanevents.Store backed by a sqlite DB. +func NewTestDB(t *testing.T, clock clock.Clock) *Store { + // We'll create a new test database. The call to NewTestSqliteDB will + // automatically create the DB and apply the migrations. + testDB := sqldb.NewTestSqliteDB(t, db.FaradayMigrationSets) + + // Now, we'll create the FaradayDB instance from the test database. The + // FaradayDB is the main database object that holds the connection and + // the generated querier. + faradayDB := createStore(t, testDB.BaseDB, clock) + + t.Cleanup(func() { + require.NoError(t, faradayDB.Close()) + }) + + return faradayDB +} diff --git a/go.mod b/go.mod index 5e949dc..37054d3 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,8 @@ require ( github.com/lightninglabs/lndclient v1.0.1-0.20260224134629-de7b65bb4c60 github.com/lightningnetwork/lnd v0.20.0-beta.rc4.0.20260223110936-dd65ba2b0106 github.com/lightningnetwork/lnd/cert v1.2.2 + github.com/lightningnetwork/lnd/clock v1.1.1 + github.com/lightningnetwork/lnd/fn/v2 v2.0.9 github.com/lightningnetwork/lnd/kvdb v1.4.16 github.com/lightningnetwork/lnd/sqldb/v2 v2.0.0-20260326184657-f7cc56305bae github.com/shopspring/decimal v1.2.0 @@ -100,8 +102,6 @@ require ( github.com/lightninglabs/neutrino v0.16.1 // indirect github.com/lightninglabs/neutrino/cache v1.1.2 // indirect github.com/lightningnetwork/lightning-onion v1.2.1-0.20240815225420-8b40adf04ab9 // indirect - github.com/lightningnetwork/lnd/clock v1.1.1 // indirect - github.com/lightningnetwork/lnd/fn/v2 v2.0.9 // indirect github.com/lightningnetwork/lnd/healthcheck v1.2.6 // indirect github.com/lightningnetwork/lnd/queue v1.1.1 // indirect github.com/lightningnetwork/lnd/sqldb v1.0.13-0.20260223110936-dd65ba2b0106 // indirect