From e0467adbfc13fd9f709b293461aeb165179d6e9e Mon Sep 17 00:00:00 2001 From: bitromortac Date: Thu, 2 Apr 2026 10:49:07 +0200 Subject: [PATCH 1/8] chanevents: add event types This was forgotten previously. --- chanevents/chanevents.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/chanevents/chanevents.go b/chanevents/chanevents.go index 70ca21f..41289bf 100644 --- a/chanevents/chanevents.go +++ b/chanevents/chanevents.go @@ -14,16 +14,16 @@ type EventType int16 const ( // EventTypeUnknown is the unknown event type. - EventTypeUnknown = 0 + EventTypeUnknown EventType = 0 // EventTypeOnline is the online event type. - EventTypeOnline = 1 + EventTypeOnline EventType = 1 // EventTypeOffline is the offline event type. - EventTypeOffline = 2 + EventTypeOffline EventType = 2 // EventTypeUpdate is the balance update event type. - EventTypeUpdate = 3 + EventTypeUpdate EventType = 3 ) // String returns the string representation of the event type. From 34d84ff9370892d4e4d0668bb085b645d75f94ce Mon Sep 17 00:00:00 2001 From: bitromortac Date: Thu, 2 Apr 2026 12:21:38 +0200 Subject: [PATCH 2/8] chanevents: add event comparison test helper Introduce requireEqualEvent to reduce boilerplate in store tests, comparing all user-set fields while ignoring the auto-assigned ID. --- chanevents/store_test.go | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/chanevents/store_test.go b/chanevents/store_test.go index c9bbaf2..6101053 100644 --- a/chanevents/store_test.go +++ b/chanevents/store_test.go @@ -22,6 +22,22 @@ var ( testTime = time.Unix(1, 0) ) +// requireEqualEvent asserts that a retrieved event matches the expected values, +// comparing only the fields that are set before insertion (ignoring the +// auto-assigned ID). +func requireEqualEvent(t *testing.T, expected *ChannelEvent, + expectedTime time.Time, actual *ChannelEvent) { + + t.Helper() + + require.Equal(t, expected.ChannelID, actual.ChannelID) + require.Equal(t, expected.EventType, actual.EventType) + require.Equal(t, expectedTime.Unix(), actual.Timestamp.Unix()) + require.Equal(t, expected.LocalBalance, actual.LocalBalance) + require.Equal(t, expected.RemoteBalance, actual.RemoteBalance) + require.Equal(t, expected.IsSync, actual.IsSync) +} + // TestStore tests the chanevents store. func TestStore(t *testing.T) { t.Parallel() @@ -114,15 +130,8 @@ func TestStore(t *testing.T) { require.NoError(t, err) require.Len(t, events, 2) - require.Equal(t, onlineEvent.EventType, events[0].EventType) - require.Equal(t, testTime.Unix(), events[0].Timestamp.Unix()) - require.True(t, events[0].LocalBalance.IsNone()) - require.True(t, events[0].RemoteBalance.IsNone()) - - require.Equal(t, updateEvent.EventType, events[1].EventType) - require.Equal( - t, testTime.Add(time.Second).Unix(), events[1].Timestamp.Unix(), + requireEqualEvent(t, onlineEvent, testTime, events[0]) + requireEqualEvent( + t, updateEvent, testTime.Add(time.Second), events[1], ) - require.Equal(t, updateEvent.LocalBalance, events[1].LocalBalance) - require.Equal(t, updateEvent.RemoteBalance, events[1].RemoteBalance) } From 53148c5232edb583fdd71103f8417511196b365f Mon Sep 17 00:00:00 2001 From: bitromortac Date: Thu, 2 Apr 2026 12:23:47 +0200 Subject: [PATCH 3/8] chanevents+db: add sync row We want to know if an update came from an initial sync. This also helps us to identify data gaps and one can be sure it was not due to an actual event. Modify the migration as it's unreleased. --- db/sqlc/chanevents.sql.go | 10 +++++++--- db/sqlc/migrations/000001_chanevents.up.sql | 5 ++++- db/sqlc/models.go | 1 + db/sqlc/queries/chanevents.sql | 5 +++-- 4 files changed, 15 insertions(+), 6 deletions(-) diff --git a/db/sqlc/chanevents.sql.go b/db/sqlc/chanevents.sql.go index 9720995..5c7d5a5 100644 --- a/db/sqlc/chanevents.sql.go +++ b/db/sqlc/chanevents.sql.go @@ -44,7 +44,7 @@ func (q *Queries) GetChannelByShortChanID(ctx context.Context, shortChannelID in } const getChannelEvents = `-- name: GetChannelEvents :many -SELECT id, channel_id, event_type, timestamp, local_balance_sat, remote_balance_sat FROM channel_events +SELECT id, channel_id, event_type, timestamp, local_balance_sat, remote_balance_sat, is_sync FROM channel_events WHERE channel_id = $1 AND timestamp >= $2 AND timestamp < $3 ORDER BY timestamp ASC, id ASC ` @@ -71,6 +71,7 @@ func (q *Queries) GetChannelEvents(ctx context.Context, arg GetChannelEventsPara &i.Timestamp, &i.LocalBalanceSat, &i.RemoteBalanceSat, + &i.IsSync, ); err != nil { return nil, err } @@ -115,8 +116,9 @@ func (q *Queries) InsertChannel(ctx context.Context, arg InsertChannelParams) (i const insertChannelEvent = `-- name: InsertChannelEvent :exec INSERT INTO channel_events ( - channel_id, event_type, timestamp, local_balance_sat, remote_balance_sat -) VALUES ($1, $2, $3, $4, $5) + channel_id, event_type, timestamp, local_balance_sat, remote_balance_sat, + is_sync +) VALUES ($1, $2, $3, $4, $5, $6) ` type InsertChannelEventParams struct { @@ -125,6 +127,7 @@ type InsertChannelEventParams struct { Timestamp time.Time LocalBalanceSat sql.NullInt64 RemoteBalanceSat sql.NullInt64 + IsSync bool } func (q *Queries) InsertChannelEvent(ctx context.Context, arg InsertChannelEventParams) error { @@ -134,6 +137,7 @@ func (q *Queries) InsertChannelEvent(ctx context.Context, arg InsertChannelEvent arg.Timestamp, arg.LocalBalanceSat, arg.RemoteBalanceSat, + arg.IsSync, ) return err } diff --git a/db/sqlc/migrations/000001_chanevents.up.sql b/db/sqlc/migrations/000001_chanevents.up.sql index f733b59..c4cf35e 100644 --- a/db/sqlc/migrations/000001_chanevents.up.sql +++ b/db/sqlc/migrations/000001_chanevents.up.sql @@ -35,7 +35,10 @@ CREATE TABLE IF NOT EXISTS channel_events ( local_balance_sat BIGINT CHECK (local_balance_sat >= 0), -- The remote balance of the channel at the time of the event. -- This is only populated for balance update events. - remote_balance_sat BIGINT CHECK (remote_balance_sat >= 0) + remote_balance_sat BIGINT CHECK (remote_balance_sat >= 0), + -- Whether this event was recorded during an initial sync rather than + -- from a live subscription. + is_sync BOOLEAN NOT NULL DEFAULT FALSE ); -- This composite index is crucial for efficiently querying the event history diff --git a/db/sqlc/models.go b/db/sqlc/models.go index 1094533..fa20db0 100644 --- a/db/sqlc/models.go +++ b/db/sqlc/models.go @@ -23,6 +23,7 @@ type ChannelEvent struct { Timestamp time.Time LocalBalanceSat sql.NullInt64 RemoteBalanceSat sql.NullInt64 + IsSync bool } type Peer struct { diff --git a/db/sqlc/queries/chanevents.sql b/db/sqlc/queries/chanevents.sql index 5370e4e..c615421 100644 --- a/db/sqlc/queries/chanevents.sql +++ b/db/sqlc/queries/chanevents.sql @@ -15,8 +15,9 @@ SELECT * FROM channels WHERE short_channel_id = $1; -- name: InsertChannelEvent :exec INSERT INTO channel_events ( - channel_id, event_type, timestamp, local_balance_sat, remote_balance_sat -) VALUES ($1, $2, $3, $4, $5); + channel_id, event_type, timestamp, local_balance_sat, remote_balance_sat, + is_sync +) VALUES ($1, $2, $3, $4, $5, $6); -- name: GetChannelEvents :many SELECT * FROM channel_events From 14b4e01731226f100626768465df1eb496d9bd3b Mon Sep 17 00:00:00 2001 From: bitromortac Date: Thu, 2 Apr 2026 12:21:52 +0200 Subject: [PATCH 4/8] chanevents: add IsSync --- chanevents/chanevents.go | 4 ++++ chanevents/store.go | 2 ++ chanevents/store_test.go | 22 ++++++++++++++++++++++ 3 files changed, 28 insertions(+) diff --git a/chanevents/chanevents.go b/chanevents/chanevents.go index 41289bf..6359195 100644 --- a/chanevents/chanevents.go +++ b/chanevents/chanevents.go @@ -106,4 +106,8 @@ type ChannelEvent struct { // RemoteBalance is the remote balance of the channel at the time of the // event. This is only populated for balance update events. RemoteBalance fn.Option[btcutil.Amount] + + // IsSync indicates whether this event was recorded during an initial + // sync rather than from a live subscription. + IsSync bool } diff --git a/chanevents/store.go b/chanevents/store.go index a3d202d..36f96d8 100644 --- a/chanevents/store.go +++ b/chanevents/store.go @@ -213,6 +213,7 @@ func (s *Store) AddChannelEvent(ctx context.Context, Timestamp: timestamp, LocalBalanceSat: localBalance, RemoteBalanceSat: remoteBalance, + IsSync: event.IsSync, }, ) if err != nil { @@ -269,5 +270,6 @@ func marshalChannelEvent(dbEvent sqlc.ChannelEvent) *ChannelEvent { Timestamp: dbEvent.Timestamp.UTC(), LocalBalance: localBalance, RemoteBalance: remoteBalance, + IsSync: dbEvent.IsSync, } } diff --git a/chanevents/store_test.go b/chanevents/store_test.go index 6101053..3446c5f 100644 --- a/chanevents/store_test.go +++ b/chanevents/store_test.go @@ -134,4 +134,26 @@ func TestStore(t *testing.T) { requireEqualEvent( t, updateEvent, testTime.Add(time.Second), events[1], ) + + // Advance the clock and add a sync event to verify the IsSync flag + // round-trips correctly. + clock.SetTime(testTime.Add(2 * time.Second)) + + syncEvent := &ChannelEvent{ + ChannelID: channelID, + EventType: EventTypeOnline, + IsSync: true, + } + err = store.AddChannelEvent(ctx, syncEvent) + require.NoError(t, err) + + events, err = store.GetChannelEvents( + ctx, channelID, time.Unix(0, 0), time.Unix(4, 0), + ) + require.NoError(t, err) + require.Len(t, events, 3) + + requireEqualEvent( + t, syncEvent, testTime.Add(2*time.Second), events[2], + ) } From 188163c8dd82cb7be536ce69a19f565652abd300 Mon Sep 17 00:00:00 2001 From: bitromortac Date: Wed, 24 Sep 2025 13:20:46 +0200 Subject: [PATCH 5/8] faraday: initialize stores --- config.go | 126 +++++++++++++++++++++++++++++++++++++++++++++++++++++ faraday.go | 16 +++++++ 2 files changed, 142 insertions(+) diff --git a/config.go b/config.go index 0f35256..70713ee 100644 --- a/config.go +++ b/config.go @@ -11,11 +11,16 @@ import ( "github.com/btcsuite/btcd/btcutil" "github.com/lightninglabs/faraday/chain" + "github.com/lightninglabs/faraday/chanevents" + "github.com/lightninglabs/faraday/db" + "github.com/lightninglabs/faraday/db/sqlc" "github.com/lightninglabs/lndclient" "github.com/lightningnetwork/lnd/build" "github.com/lightningnetwork/lnd/cert" + "github.com/lightningnetwork/lnd/clock" "github.com/lightningnetwork/lnd/lncfg" "github.com/lightningnetwork/lnd/lnrpc" + "github.com/lightningnetwork/lnd/sqldb/v2" "google.golang.org/grpc/credentials" ) @@ -35,6 +40,16 @@ const ( // certificate. The value corresponds to 14 months // (14 months * 30 days * 24 hours). defaultTLSCertDuration = 14 * 30 * 24 * time.Hour + + // DatabaseBackendSqlite is the name of the SQLite database backend. + DatabaseBackendSqlite = "sqlite" + + // DatabaseBackendPostgres is the name of the Postgres database backend. + DatabaseBackendPostgres = "postgres" + + // defaultSqliteDatabaseFileName is the default name of the SQLite + // database file. + defaultSqliteDatabaseFileName = "faraday.db" ) var ( @@ -158,6 +173,17 @@ type Config struct { //nolint:maligned // Logging controls various aspects of pool logging. Logging *build.LogConfig `group:"logging" namespace:"logging"` + + // DatabaseBackend is the database backend we will use for storing all + // liveness data. + DatabaseBackend string `long:"databasebackend" description:"The database backend to use for storing all liveness data." choice:"sqlite" choice:"postgres"` + + // Sqlite holds the configuration options for a SQLite database + // backend. + Sqlite *db.SqliteConfig `group:"sqlite" namespace:"sqlite"` + + // Postgres holds the configuration options for a Postgres database + Postgres *sqldb.PostgresConfig `group:"postgres" namespace:"postgres"` } // DefaultConfig returns all default values for the Config struct. @@ -179,6 +205,10 @@ func DefaultConfig() Config { ChainConn: defaultChainConn, Bitcoin: chain.DefaultConfig, Logging: build.DefaultLogConfig(), + DatabaseBackend: DatabaseBackendSqlite, + Sqlite: &db.SqliteConfig{ + DatabaseFileName: defaultSqliteDatabaseFileName, + }, } } @@ -396,3 +426,99 @@ func loadCertWithCreate(cfg *Config) (tls.Certificate, *x509.Certificate, return cert.LoadCert(cfg.TLSCertPath, cfg.TLSKeyPath) } + +// stores holds a collection of the DB stores that are used by faraday. +type stores struct { + // ChanEventsStore is used to watch for channel events. + ChanEventsStore *chanevents.Store + + // closeFns holds various callbacks that can be used to close any open + // stores in the stores struct. + closeFns map[string]func() error +} + +// NewStores creates a new stores instance based on the chosen database backend. +func NewStores(cfg Config, clock clock.Clock) (*stores, error) { + var ( + stores = &stores{ + closeFns: make(map[string]func() error), + } + ) + + switch cfg.DatabaseBackend { + case DatabaseBackendSqlite: + dbPath := filepath.Join( + cfg.FaradayDir, cfg.Sqlite.DatabaseFileName, + ) + + sqlStore, err := sqldb.NewSqliteStore(&sqldb.SqliteConfig{ + SkipMigrations: cfg.Sqlite.SkipMigrations, + SkipMigrationDbBackup: cfg.Sqlite.SkipMigrationDbBackup, + }, dbPath) + if err != nil { + return stores, err + } + + if !cfg.Sqlite.SkipMigrations { + err = sqldb.ApplyAllMigrations( + sqlStore, db.FaradayMigrationSets, + ) + if err != nil { + return stores, fmt.Errorf("error applying "+ + "migrations to SQLite store: %w", err, + ) + } + } + + queries := sqlc.NewForType(sqlStore, sqlStore.BackendType) + + stores.ChanEventsStore = chanevents.NewStore( + sqlStore.BaseDB, queries, clock, + ) + + stores.closeFns["sqlite"] = sqlStore.Close + + case DatabaseBackendPostgres: + sqlStore, err := sqldb.NewPostgresStore(cfg.Postgres) + if err != nil { + return stores, err + } + + if !cfg.Postgres.SkipMigrations { + err = sqldb.ApplyAllMigrations( + sqlStore, db.FaradayMigrationSets, + ) + if err != nil { + return stores, fmt.Errorf("error applying "+ + "migrations to Postgres store: %w", err, + ) + } + } + + queries := sqlc.NewForType(sqlStore, sqlStore.BackendType) + + stores.ChanEventsStore = chanevents.NewStore( + sqlStore.BaseDB, queries, clock, + ) + + stores.closeFns["postgres"] = sqlStore.Close + + default: + return nil, fmt.Errorf("unsupported database backend: "+ + "%s", cfg.DatabaseBackend) + } + + return stores, nil +} + +// Close closes all the stores. +func (s *stores) Close() error { + for name, closeFn := range s.closeFns { + err := closeFn() + if err != nil { + return fmt.Errorf("error closing %s store: %v", name, err) + } + } + + return nil +} diff --git a/faraday.go b/faraday.go index e919bd5..b302475 100644 --- a/faraday.go +++ b/faraday.go @@ -23,6 +23,7 @@ import ( "github.com/lightninglabs/faraday/frdrpcserver/perms" "github.com/lightninglabs/lndclient" "github.com/lightningnetwork/lnd/build" + "github.com/lightningnetwork/lnd/clock" "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/lncfg" "github.com/lightningnetwork/lnd/lnrpc/verrpc" @@ -89,6 +90,9 @@ type Faraday struct { // reuse of the struct, since internal fields are not reset. stopped atomic.Bool + // stores contains all the stores used by faraday. + stores *stores + lnd *lndclient.GrpcLndServices // lndOwned indicates whether Faraday created the lnd connection @@ -442,6 +446,12 @@ func (f *Faraday) Stop() error { // can complete cleanly. f.wg.Wait() + if f.stores != nil { + if err := f.stores.Close(); err != nil { + log.Errorf("Error closing stores: %v", err) + } + } + var stopErr error if f.macaroonService != nil { err := f.macaroonService.Stop() @@ -536,6 +546,12 @@ func (f *Faraday) initialize(withMacaroonService bool) error { } } + // Create any relevant stores. + f.stores, err = NewStores(*f.cfg, clock.NewDefaultClock()) + if err != nil { + return fmt.Errorf("could not create stores: %v", err) + } + return nil } From 1bd6c5f669509f204cd168d476e201324dadc433 Mon Sep 17 00:00:00 2001 From: bitromortac Date: Thu, 2 Apr 2026 10:46:01 +0200 Subject: [PATCH 6/8] chanevents: add channel event monitor --- chanevents/log.go | 25 +++ chanevents/monitor.go | 508 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 533 insertions(+) create mode 100644 chanevents/log.go create mode 100644 chanevents/monitor.go diff --git a/chanevents/log.go b/chanevents/log.go new file mode 100644 index 0000000..492d337 --- /dev/null +++ b/chanevents/log.go @@ -0,0 +1,25 @@ +package chanevents + +import ( + "github.com/btcsuite/btclog/v2" + "github.com/lightningnetwork/lnd/build" +) + +const Subsystem = "CHEV" + +// log is a logger that is initialized with no output filters. This +// means the package will not perform any logging by default until the caller +// requests it. +var log btclog.Logger + +// The default amount of logging is none. +func init() { + UseLogger(build.NewSubLogger(Subsystem, nil)) +} + +// UseLogger uses a specified Logger to output package logging info. +// This should be used in preference to SetLogWriter if the caller is also +// using btclog. +func UseLogger(logger btclog.Logger) { + log = logger +} diff --git a/chanevents/monitor.go b/chanevents/monitor.go new file mode 100644 index 0000000..6ff52f0 --- /dev/null +++ b/chanevents/monitor.go @@ -0,0 +1,508 @@ +package chanevents + +import ( + "context" + "errors" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/btcsuite/btcd/btcutil" + "github.com/lightninglabs/lndclient" + "github.com/lightningnetwork/lnd/fn/v2" + "github.com/lightningnetwork/lnd/routing/route" +) + +const ( + // retryInterval is the time to wait before retrying after a + // transient error or while waiting for lnd to become ready. + retryInterval = 5 * time.Second +) + +var ( + // errMonitorAlreadyStarted is returned when the monitor is already + // started. + errMonitorAlreadyStarted = errors.New("monitor already started") + + // errMonitorNotStarted is returned when the monitor is not started. + errMonitorNotStarted = errors.New("monitor not started") +) + +// Monitor is an active component that listens to LND channel events and records +// them in the database. +type Monitor struct { + started atomic.Bool + + // lnd is the lnd client that the monitor will use to subscribe to + // channel events. + lnd lndclient.LightningClient + + // store is the channel events store that the monitor will use to record + // channel events. + store *Store + + wg sync.WaitGroup + quit chan struct{} +} + +// NewMonitor creates a new channel events monitor. +func NewMonitor(lnd lndclient.LightningClient, store *Store) *Monitor { + return &Monitor{ + lnd: lnd, + store: store, + quit: make(chan struct{}), + } +} + +// Start starts the channel events monitor. +func (m *Monitor) Start(ctx context.Context) error { + if !m.started.CompareAndSwap(false, true) { + return errMonitorAlreadyStarted + } + + log.Info("Starting channel events monitor") + + m.quit = make(chan struct{}) + + m.wg.Add(1) + go m.monitorLoop(ctx) + + return nil +} + +// Stop stops the channel events monitor. +func (m *Monitor) Stop() error { + if !m.started.CompareAndSwap(true, false) { + return errMonitorNotStarted + } + + log.Info("Stopping channel events monitor") + + close(m.quit) + m.wg.Wait() + + return nil +} + +// monitorLoop is the main loop of the channel events monitor. It waits for lnd +// to be fully synced, performs an initial state sync, and then subscribes to +// channel events. If the subscription fails or the stream breaks, it retries +// from the beginning. +func (m *Monitor) monitorLoop(ctx context.Context) { + defer m.wg.Done() + + log.Info("Channel events monitor starting") + + for { + // Wait for lnd to be synced to chain, retrying on RPC errors. + if !m.waitForReady(ctx) { + return + } + + // Initial state sync. + if err := m.initialSync(ctx); err != nil { + log.Errorf("Error during initial sync: %v", err) + } + + // Subscribe and consume events until the stream breaks or an + // error occurs. + if !m.subscribe(ctx) { + return + } + + // Stream broke, wait before reconnecting. + log.Infof("Reconnecting channel event subscription...") + + select { + case <-time.After(retryInterval): + case <-m.quit: + return + case <-ctx.Done(): + return + } + } +} + +// waitForReady polls lnd's GetInfo until it reports SyncedToChain. It retries +// on transient RPC errors. It returns true when lnd is ready, or false if the +// monitor is shutting down. +func (m *Monitor) waitForReady(ctx context.Context) bool { + for { + info, err := m.lnd.GetInfo(ctx) + if err != nil { + log.Warnf("Error getting lnd info, retrying: %v", err) + } else if info.SyncedToChain { + return true + } else { + log.Infof("Waiting for lnd to sync to chain...") + } + + select { + case <-time.After(retryInterval): + case <-m.quit: + return false + case <-ctx.Done(): + return false + } + } +} + +// subscribe subscribes to lnd channel events and processes them until the +// stream breaks or an error occurs. It returns true on transient failures +// (caller should retry) or false if the monitor is shutting down. +func (m *Monitor) subscribe(ctx context.Context) bool { + eventChan, errChan, err := m.lnd.SubscribeChannelEvents(ctx) + if err != nil { + log.Errorf("Error subscribing to channel events: %v", err) + + // Return true to signal the caller to retry. + return true + } + + for { + select { + case event, ok := <-eventChan: + if !ok { + log.Warn("Channel event stream closed") + return true + } + if err := m.handleChannelEvent(ctx, event); err != nil { + log.Errorf("Error handling channel event: %v", + err) + } + + case err, ok := <-errChan: + if !ok { + log.Warn("Channel event error stream " + + "closed") + + return true + } + log.Errorf("Error from channel event "+ + "subscription: %v", err) + + return true + + case <-m.quit: + log.Info("Channel events monitor stopping") + return false + + case <-ctx.Done(): + log.Info("Channel events monitor stopping") + return false + } + } +} + +// initialSync performs an initial sync of the channel state. It queries lnd for +// all known channels (open and closed) and records their current state in the +// database. This ensures that any channel events that occurred while faraday +// was offline are accounted for: even though individual events are lost, the +// latest state is captured as a baseline. Events recorded during initial sync +// are marked with IsSync=true to distinguish them from real-time events +// received via the subscription. +func (m *Monitor) initialSync(ctx context.Context) error { + log.Info("Performing initial sync of channel state") + + closedChannels, err := m.lnd.ClosedChannels(ctx) + if err != nil { + return fmt.Errorf("error listing closed channels: %w", err) + } + + for _, channel := range closedChannels { + // Abort if the context has been cancelled. + if ctx.Err() != nil { + return ctx.Err() + } + + // Channels that didn't confirm onchain will be present here, + // but don't have a channel ID. We skip those. + if channel.ChannelID == 0 { + log.Debugf("Skipping closed channel with no "+ + "channel ID: %s", channel.ChannelPoint) + + continue + } + + err := m.addChannel( + ctx, channel.PubKeyBytes, channel.ChannelPoint, + channel.ChannelID, + ) + + if err != nil { + log.Errorf("error adding closed channel %s: %v", + channel.ChannelPoint, err) + + continue + } + + dbChan, err := m.store.GetChannel(ctx, channel.ChannelPoint) + if err != nil { + log.Errorf("error getting closed channel %s from db: %v", + channel.ChannelPoint, err) + + continue + } + + if err := m.store.AddChannelEvent(ctx, &ChannelEvent{ + ChannelID: dbChan.ID, + EventType: EventTypeOffline, + IsSync: true, + }); err != nil { + log.Errorf("error adding offline event for closed "+ + "channel %s: %v", channel.ChannelPoint, err) + } + } + + channels, err := m.lnd.ListChannels(ctx, false, false) + if err != nil { + return fmt.Errorf("error listing channels: %w", err) + } + + for _, channel := range channels { + // Abort if the context has been cancelled. + if ctx.Err() != nil { + return ctx.Err() + } + + // We make sure the channel exists in the store. + err := m.addChannel( + ctx, channel.PubKeyBytes, channel.ChannelPoint, + channel.ChannelID, + ) + if err != nil { + log.Errorf("error adding channel %s: %v", + channel.ChannelPoint, err) + + continue + } + + dbChan, err := m.store.GetChannel(ctx, channel.ChannelPoint) + if err != nil { + log.Errorf("error getting channel %s from db: %v", + channel.ChannelPoint, err) + + continue + } + + eventType := EventTypeOffline + if channel.Active { + eventType = EventTypeOnline + } + + if err := m.store.AddChannelEvent(ctx, &ChannelEvent{ + ChannelID: dbChan.ID, + EventType: eventType, + IsSync: true, + }); err != nil { + log.Errorf("error adding event for channel %s: %v", + channel.ChannelPoint, err) + } + + // We add the update event separately from the online/offline + // event above, because each event type serves a different + // purpose: the online/offline event tracks channel + // availability, while the update event captures a balance + // snapshot. Keeping them as distinct records allows querying + // availability and balance history independently. + if err := m.store.AddChannelEvent(ctx, &ChannelEvent{ + ChannelID: dbChan.ID, + EventType: EventTypeUpdate, + LocalBalance: fn.Some(channel.LocalBalance), + RemoteBalance: fn.Some(channel.RemoteBalance), + IsSync: true, + }); err != nil { + log.Errorf("error adding event for channel %s: %v", + channel.ChannelPoint, err) + } + } + + return nil +} + +// addChannel adds a channel and its peer to the store. +func (m *Monitor) addChannel(ctx context.Context, pubKeyBytes route.Vertex, + channelPoint string, channelID uint64) error { + + // Check if the channel already exists. + channel, err := m.store.GetChannel(ctx, channelPoint) + if err != nil && !errors.Is(err, errUnknownChannel) { + return fmt.Errorf("error getting channel %s: %w", + channelPoint, err) + } + if channel != nil { + // Channel already exists, nothing to do. + return nil + } + + // Check if peer already exists. + peer, err := m.store.GetPeer(ctx, pubKeyBytes.String()) + if err != nil && !errors.Is(err, errUnknownPeer) { + return fmt.Errorf("error getting peer %s: %w", + pubKeyBytes, err) + } + + var peerID int64 + if peer != nil { + peerID = peer.ID + } else { + peerID, err = m.store.AddPeer( + ctx, pubKeyBytes.String(), + ) + if err != nil { + return fmt.Errorf("error adding peer %s: %w", + pubKeyBytes, err) + } + } + + _, err = m.store.AddChannel(ctx, channelPoint, channelID, peerID) + if err != nil { + return fmt.Errorf("error adding channel %s: %w", + channelPoint, err) + } + + log.Infof("Added channel %s to db", channelPoint) + + return nil +} + +// handleChannelEvent handles a single channel event. +func (m *Monitor) handleChannelEvent(ctx context.Context, + event *lndclient.ChannelEventUpdate) error { + + switch event.UpdateType { + case lndclient.OpenChannelUpdate: + openChannel := event.OpenedChannelInfo + if openChannel == nil { + return fmt.Errorf("open_channel event is nil") + } + + log.Debugf("Handling open channel event: %+v", openChannel) + + // We add the new channel to the store. + if err := m.addChannel( + ctx, openChannel.PubKeyBytes, openChannel.ChannelPoint, + openChannel.ChannelID, + ); err != nil { + return err + } + + // Now add the online and update events. + dbChan, err := m.store.GetChannel(ctx, openChannel.ChannelPoint) + if err != nil { + return err + } + + if err := m.store.AddChannelEvent(ctx, &ChannelEvent{ + ChannelID: dbChan.ID, + EventType: EventTypeOnline, + }); err != nil { + return err + } + + return m.addUpdateEvent(ctx, openChannel) + + case lndclient.ClosedChannelUpdate: + if event.ClosedChannelInfo == nil { + return fmt.Errorf("closed_channel event is nil") + } + + log.Debugf("Handling offline channel event: %+v", + event.ClosedChannelInfo) + + return m.addOfflineEvent(ctx, + event.ClosedChannelInfo.ChannelPoint) + + case lndclient.ActiveChannelUpdate: + log.Debugf("Handling active channel event: %v", + event.ChannelPoint) + + return m.addOnlineEvent(ctx, event.ChannelPoint.String()) + + case lndclient.InactiveChannelUpdate: + log.Debugf("Handling offline channel event: %v", + event.ChannelPoint) + + return m.addOfflineEvent(ctx, event.ChannelPoint.String()) + + case lndclient.PendingOpenChannelUpdate: + log.Debugf("Ignoring pending channel event: %v", + event.ChannelPoint) + + return nil + + case lndclient.StateChannelUpdate: + if event.UpdatedChannelInfo == nil { + return fmt.Errorf("state_update event is nil") + } + + log.Debugf("Handling channel update event: %+v", + event.UpdatedChannelInfo) + + return m.addUpdateEvent(ctx, event.UpdatedChannelInfo) + } + + return nil +} + +// addOnlineEvent adds an online event for a channel. +func (m *Monitor) addOnlineEvent(ctx context.Context, + channelPoint string) error { + + channel, err := m.store.GetChannel(ctx, channelPoint) + if err != nil { + return fmt.Errorf("error getting channel %s: %w", channelPoint, + err) + } + + log.Infof("Adding online event for channel %s", channelPoint) + + return m.store.AddChannelEvent(ctx, &ChannelEvent{ + ChannelID: channel.ID, + EventType: EventTypeOnline, + }) +} + +// addOfflineEvent adds an offline event for a channel. +func (m *Monitor) addOfflineEvent(ctx context.Context, + channelPoint string) error { + + channel, err := m.store.GetChannel(ctx, channelPoint) + if err != nil { + return fmt.Errorf("error getting channel %s: %w", channelPoint, + err) + } + + log.Infof("Adding offline event for channel %s", channelPoint) + + return m.store.AddChannelEvent(ctx, &ChannelEvent{ + ChannelID: channel.ID, + EventType: EventTypeOffline, + }) +} + +// addUpdateEvent adds an update event for a channel. +func (m *Monitor) addUpdateEvent(ctx context.Context, + channelInfo *lndclient.ChannelInfo) error { + + channel, err := m.store.GetChannel(ctx, channelInfo.ChannelPoint) + if err != nil { + return fmt.Errorf("error getting channel %s: %w", + channelInfo.ChannelPoint, err) + } + + log.Tracef("Adding update event for channel %s", + channelInfo.ChannelPoint) + + return m.store.AddChannelEvent(ctx, &ChannelEvent{ + ChannelID: channel.ID, + EventType: EventTypeUpdate, + LocalBalance: fn.Some( + btcutil.Amount(channelInfo.LocalBalance), + ), + RemoteBalance: fn.Some( + btcutil.Amount(channelInfo.RemoteBalance), + ), + }) +} From 14d664e93da0eb6eb0a6dae7139ed7ed66d343ba Mon Sep 17 00:00:00 2001 From: bitromortac Date: Thu, 2 Apr 2026 10:46:06 +0200 Subject: [PATCH 7/8] faraday: register channel events logger Also refactor the root logger. --- log.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/log.go b/log.go index 9edd23d..1821875 100644 --- a/log.go +++ b/log.go @@ -3,6 +3,7 @@ package faraday import ( "github.com/btcsuite/btclog/v2" "github.com/lightninglabs/faraday/accounting" + "github.com/lightninglabs/faraday/chanevents" "github.com/lightninglabs/faraday/dataset" "github.com/lightninglabs/faraday/fiat" "github.com/lightninglabs/faraday/frdrpcserver" @@ -37,6 +38,7 @@ func SetupLoggers(root *build.SubLoggerManager, intercept signal.Interceptor) { addSubLogger(root, revenue.Subsystem, intercept, revenue.UseLogger) addSubLogger(root, fiat.Subsystem, intercept, fiat.UseLogger) addSubLogger(root, accounting.Subsystem, intercept, accounting.UseLogger) + addSubLogger(root, chanevents.Subsystem, intercept, chanevents.UseLogger) } // UseLogger uses a specified Logger to output package logging info. From f3a3de9ac4687a1768ffc1604e9409a75977c8a6 Mon Sep 17 00:00:00 2001 From: bitromortac Date: Fri, 26 Sep 2025 09:39:05 +0200 Subject: [PATCH 8/8] faraday: start chan events monitor --- chanevents/monitor.go | 12 +++++++++--- faraday.go | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 3 deletions(-) diff --git a/chanevents/monitor.go b/chanevents/monitor.go index 6ff52f0..c83a65c 100644 --- a/chanevents/monitor.go +++ b/chanevents/monitor.go @@ -94,15 +94,21 @@ func (m *Monitor) monitorLoop(ctx context.Context) { log.Info("Channel events monitor starting") + var synced bool + for { // Wait for lnd to be synced to chain, retrying on RPC errors. if !m.waitForReady(ctx) { return } - // Initial state sync. - if err := m.initialSync(ctx); err != nil { - log.Errorf("Error during initial sync: %v", err) + // Initial state sync, only performed once. + if !synced { + if err := m.initialSync(ctx); err != nil { + log.Errorf("Error during initial sync: %v", err) + } else { + synced = true + } } // Subscribe and consume events until the stream breaks or an diff --git a/faraday.go b/faraday.go index b302475..0f15d38 100644 --- a/faraday.go +++ b/faraday.go @@ -18,6 +18,7 @@ import ( proxy "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" "github.com/jessevdk/go-flags" "github.com/lightninglabs/faraday/chain" + "github.com/lightninglabs/faraday/chanevents" "github.com/lightninglabs/faraday/frdrpc" "github.com/lightninglabs/faraday/frdrpcserver" "github.com/lightninglabs/faraday/frdrpcserver/perms" @@ -90,9 +91,15 @@ type Faraday struct { // reuse of the struct, since internal fields are not reset. stopped atomic.Bool + // monitor is the channel events monitor. + monitor *chanevents.Monitor + // stores contains all the stores used by faraday. stores *stores + // ctxCancel is a function that can be used to cancel the main context. + ctxCancel context.CancelFunc + lnd *lndclient.GrpcLndServices // lndOwned indicates whether Faraday created the lnd connection @@ -446,6 +453,17 @@ func (f *Faraday) Stop() error { // can complete cleanly. f.wg.Wait() + if f.ctxCancel != nil { + f.ctxCancel() + } + + if f.monitor != nil { + if err := f.monitor.Stop(); err != nil { + log.Errorf("Error stopping channel event monitor: %v", + err) + } + } + if f.stores != nil { if err := f.stores.Close(); err != nil { log.Errorf("Error closing stores: %v", err) @@ -552,6 +570,21 @@ func (f *Faraday) initialize(withMacaroonService bool) error { return fmt.Errorf("could not create stores: %v", err) } + // Create the channel event monitor. + f.monitor = chanevents.NewMonitor( + f.lnd.Client, f.stores.ChanEventsStore, + ) + + ctx, cancel := context.WithCancel(context.Background()) + f.ctxCancel = cancel + + if err := f.monitor.Start(ctx); err != nil { + cancel() + + return fmt.Errorf("could not start channel event "+ + "monitor: %v", err) + } + return nil }