diff --git a/chanevents/chanevents.go b/chanevents/chanevents.go index 70ca21f..6359195 100644 --- a/chanevents/chanevents.go +++ b/chanevents/chanevents.go @@ -14,16 +14,16 @@ type EventType int16 const ( // EventTypeUnknown is the unknown event type. - EventTypeUnknown = 0 + EventTypeUnknown EventType = 0 // EventTypeOnline is the online event type. - EventTypeOnline = 1 + EventTypeOnline EventType = 1 // EventTypeOffline is the offline event type. - EventTypeOffline = 2 + EventTypeOffline EventType = 2 // EventTypeUpdate is the balance update event type. - EventTypeUpdate = 3 + EventTypeUpdate EventType = 3 ) // String returns the string representation of the event type. @@ -106,4 +106,8 @@ type ChannelEvent struct { // RemoteBalance is the remote balance of the channel at the time of the // event. This is only populated for balance update events. RemoteBalance fn.Option[btcutil.Amount] + + // IsSync indicates whether this event was recorded during an initial + // sync rather than from a live subscription. + IsSync bool } diff --git a/chanevents/log.go b/chanevents/log.go new file mode 100644 index 0000000..492d337 --- /dev/null +++ b/chanevents/log.go @@ -0,0 +1,25 @@ +package chanevents + +import ( + "github.com/btcsuite/btclog/v2" + "github.com/lightningnetwork/lnd/build" +) + +const Subsystem = "CHEV" + +// log is a logger that is initialized with no output filters. This +// means the package will not perform any logging by default until the caller +// requests it. +var log btclog.Logger + +// The default amount of logging is none. +func init() { + UseLogger(build.NewSubLogger(Subsystem, nil)) +} + +// UseLogger uses a specified Logger to output package logging info. +// This should be used in preference to SetLogWriter if the caller is also +// using btclog. +func UseLogger(logger btclog.Logger) { + log = logger +} diff --git a/chanevents/monitor.go b/chanevents/monitor.go new file mode 100644 index 0000000..c83a65c --- /dev/null +++ b/chanevents/monitor.go @@ -0,0 +1,514 @@ +package chanevents + +import ( + "context" + "errors" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/btcsuite/btcd/btcutil" + "github.com/lightninglabs/lndclient" + "github.com/lightningnetwork/lnd/fn/v2" + "github.com/lightningnetwork/lnd/routing/route" +) + +const ( + // retryInterval is the time to wait before retrying after a + // transient error or while waiting for lnd to become ready. + retryInterval = 5 * time.Second +) + +var ( + // errMonitorAlreadyStarted is returned when the monitor is already + // started. + errMonitorAlreadyStarted = errors.New("monitor already started") + + // errMonitorNotStarted is returned when the monitor is not started. + errMonitorNotStarted = errors.New("monitor not started") +) + +// Monitor is an active component that listens to LND channel events and records +// them in the database. +type Monitor struct { + started atomic.Bool + + // lnd is the lnd client that the monitor will use to subscribe to + // channel events. + lnd lndclient.LightningClient + + // store is the channel events store that the monitor will use to record + // channel events. + store *Store + + wg sync.WaitGroup + quit chan struct{} +} + +// NewMonitor creates a new channel events monitor. +func NewMonitor(lnd lndclient.LightningClient, store *Store) *Monitor { + return &Monitor{ + lnd: lnd, + store: store, + quit: make(chan struct{}), + } +} + +// Start starts the channel events monitor. +func (m *Monitor) Start(ctx context.Context) error { + if !m.started.CompareAndSwap(false, true) { + return errMonitorAlreadyStarted + } + + log.Info("Starting channel events monitor") + + m.quit = make(chan struct{}) + + m.wg.Add(1) + go m.monitorLoop(ctx) + + return nil +} + +// Stop stops the channel events monitor. +func (m *Monitor) Stop() error { + if !m.started.CompareAndSwap(true, false) { + return errMonitorNotStarted + } + + log.Info("Stopping channel events monitor") + + close(m.quit) + m.wg.Wait() + + return nil +} + +// monitorLoop is the main loop of the channel events monitor. It waits for lnd +// to be fully synced, performs an initial state sync, and then subscribes to +// channel events. If the subscription fails or the stream breaks, it retries +// from the beginning. +func (m *Monitor) monitorLoop(ctx context.Context) { + defer m.wg.Done() + + log.Info("Channel events monitor starting") + + var synced bool + + for { + // Wait for lnd to be synced to chain, retrying on RPC errors. + if !m.waitForReady(ctx) { + return + } + + // Initial state sync, only performed once. + if !synced { + if err := m.initialSync(ctx); err != nil { + log.Errorf("Error during initial sync: %v", err) + } else { + synced = true + } + } + + // Subscribe and consume events until the stream breaks or an + // error occurs. + if !m.subscribe(ctx) { + return + } + + // Stream broke, wait before reconnecting. + log.Infof("Reconnecting channel event subscription...") + + select { + case <-time.After(retryInterval): + case <-m.quit: + return + case <-ctx.Done(): + return + } + } +} + +// waitForReady polls lnd's GetInfo until it reports SyncedToChain. It retries +// on transient RPC errors. It returns true when lnd is ready, or false if the +// monitor is shutting down. +func (m *Monitor) waitForReady(ctx context.Context) bool { + for { + info, err := m.lnd.GetInfo(ctx) + if err != nil { + log.Warnf("Error getting lnd info, retrying: %v", err) + } else if info.SyncedToChain { + return true + } else { + log.Infof("Waiting for lnd to sync to chain...") + } + + select { + case <-time.After(retryInterval): + case <-m.quit: + return false + case <-ctx.Done(): + return false + } + } +} + +// subscribe subscribes to lnd channel events and processes them until the +// stream breaks or an error occurs. It returns true on transient failures +// (caller should retry) or false if the monitor is shutting down. +func (m *Monitor) subscribe(ctx context.Context) bool { + eventChan, errChan, err := m.lnd.SubscribeChannelEvents(ctx) + if err != nil { + log.Errorf("Error subscribing to channel events: %v", err) + + // Return true to signal the caller to retry. + return true + } + + for { + select { + case event, ok := <-eventChan: + if !ok { + log.Warn("Channel event stream closed") + return true + } + if err := m.handleChannelEvent(ctx, event); err != nil { + log.Errorf("Error handling channel event: %v", + err) + } + + case err, ok := <-errChan: + if !ok { + log.Warn("Channel event error stream " + + "closed") + + return true + } + log.Errorf("Error from channel event "+ + "subscription: %v", err) + + return true + + case <-m.quit: + log.Info("Channel events monitor stopping") + return false + + case <-ctx.Done(): + log.Info("Channel events monitor stopping") + return false + } + } +} + +// initialSync performs an initial sync of the channel state. It queries lnd for +// all known channels (open and closed) and records their current state in the +// database. This ensures that any channel events that occurred while faraday +// was offline are accounted for: even though individual events are lost, the +// latest state is captured as a baseline. Events recorded during initial sync +// are marked with IsSync=true to distinguish them from real-time events +// received via the subscription. +func (m *Monitor) initialSync(ctx context.Context) error { + log.Info("Performing initial sync of channel state") + + closedChannels, err := m.lnd.ClosedChannels(ctx) + if err != nil { + return fmt.Errorf("error listing closed channels: %w", err) + } + + for _, channel := range closedChannels { + // Abort if the context has been cancelled. + if ctx.Err() != nil { + return ctx.Err() + } + + // Channels that didn't confirm onchain will be present here, + // but don't have a channel ID. We skip those. + if channel.ChannelID == 0 { + log.Debugf("Skipping closed channel with no "+ + "channel ID: %s", channel.ChannelPoint) + + continue + } + + err := m.addChannel( + ctx, channel.PubKeyBytes, channel.ChannelPoint, + channel.ChannelID, + ) + + if err != nil { + log.Errorf("error adding closed channel %s: %v", + channel.ChannelPoint, err) + + continue + } + + dbChan, err := m.store.GetChannel(ctx, channel.ChannelPoint) + if err != nil { + log.Errorf("error getting closed channel %s from db: %v", + channel.ChannelPoint, err) + + continue + } + + if err := m.store.AddChannelEvent(ctx, &ChannelEvent{ + ChannelID: dbChan.ID, + EventType: EventTypeOffline, + IsSync: true, + }); err != nil { + log.Errorf("error adding offline event for closed "+ + "channel %s: %v", channel.ChannelPoint, err) + } + } + + channels, err := m.lnd.ListChannels(ctx, false, false) + if err != nil { + return fmt.Errorf("error listing channels: %w", err) + } + + for _, channel := range channels { + // Abort if the context has been cancelled. + if ctx.Err() != nil { + return ctx.Err() + } + + // We make sure the channel exists in the store. + err := m.addChannel( + ctx, channel.PubKeyBytes, channel.ChannelPoint, + channel.ChannelID, + ) + if err != nil { + log.Errorf("error adding channel %s: %v", + channel.ChannelPoint, err) + + continue + } + + dbChan, err := m.store.GetChannel(ctx, channel.ChannelPoint) + if err != nil { + log.Errorf("error getting channel %s from db: %v", + channel.ChannelPoint, err) + + continue + } + + eventType := EventTypeOffline + if channel.Active { + eventType = EventTypeOnline + } + + if err := m.store.AddChannelEvent(ctx, &ChannelEvent{ + ChannelID: dbChan.ID, + EventType: eventType, + IsSync: true, + }); err != nil { + log.Errorf("error adding event for channel %s: %v", + channel.ChannelPoint, err) + } + + // We add the update event separately from the online/offline + // event above, because each event type serves a different + // purpose: the online/offline event tracks channel + // availability, while the update event captures a balance + // snapshot. Keeping them as distinct records allows querying + // availability and balance history independently. + if err := m.store.AddChannelEvent(ctx, &ChannelEvent{ + ChannelID: dbChan.ID, + EventType: EventTypeUpdate, + LocalBalance: fn.Some(channel.LocalBalance), + RemoteBalance: fn.Some(channel.RemoteBalance), + IsSync: true, + }); err != nil { + log.Errorf("error adding event for channel %s: %v", + channel.ChannelPoint, err) + } + } + + return nil +} + +// addChannel adds a channel and its peer to the store. +func (m *Monitor) addChannel(ctx context.Context, pubKeyBytes route.Vertex, + channelPoint string, channelID uint64) error { + + // Check if the channel already exists. + channel, err := m.store.GetChannel(ctx, channelPoint) + if err != nil && !errors.Is(err, errUnknownChannel) { + return fmt.Errorf("error getting channel %s: %w", + channelPoint, err) + } + if channel != nil { + // Channel already exists, nothing to do. + return nil + } + + // Check if peer already exists. + peer, err := m.store.GetPeer(ctx, pubKeyBytes.String()) + if err != nil && !errors.Is(err, errUnknownPeer) { + return fmt.Errorf("error getting peer %s: %w", + pubKeyBytes, err) + } + + var peerID int64 + if peer != nil { + peerID = peer.ID + } else { + peerID, err = m.store.AddPeer( + ctx, pubKeyBytes.String(), + ) + if err != nil { + return fmt.Errorf("error adding peer %s: %w", + pubKeyBytes, err) + } + } + + _, err = m.store.AddChannel(ctx, channelPoint, channelID, peerID) + if err != nil { + return fmt.Errorf("error adding channel %s: %w", + channelPoint, err) + } + + log.Infof("Added channel %s to db", channelPoint) + + return nil +} + +// handleChannelEvent handles a single channel event. +func (m *Monitor) handleChannelEvent(ctx context.Context, + event *lndclient.ChannelEventUpdate) error { + + switch event.UpdateType { + case lndclient.OpenChannelUpdate: + openChannel := event.OpenedChannelInfo + if openChannel == nil { + return fmt.Errorf("open_channel event is nil") + } + + log.Debugf("Handling open channel event: %+v", openChannel) + + // We add the new channel to the store. + if err := m.addChannel( + ctx, openChannel.PubKeyBytes, openChannel.ChannelPoint, + openChannel.ChannelID, + ); err != nil { + return err + } + + // Now add the online and update events. + dbChan, err := m.store.GetChannel(ctx, openChannel.ChannelPoint) + if err != nil { + return err + } + + if err := m.store.AddChannelEvent(ctx, &ChannelEvent{ + ChannelID: dbChan.ID, + EventType: EventTypeOnline, + }); err != nil { + return err + } + + return m.addUpdateEvent(ctx, openChannel) + + case lndclient.ClosedChannelUpdate: + if event.ClosedChannelInfo == nil { + return fmt.Errorf("closed_channel event is nil") + } + + log.Debugf("Handling offline channel event: %+v", + event.ClosedChannelInfo) + + return m.addOfflineEvent(ctx, + event.ClosedChannelInfo.ChannelPoint) + + case lndclient.ActiveChannelUpdate: + log.Debugf("Handling active channel event: %v", + event.ChannelPoint) + + return m.addOnlineEvent(ctx, event.ChannelPoint.String()) + + case lndclient.InactiveChannelUpdate: + log.Debugf("Handling offline channel event: %v", + event.ChannelPoint) + + return m.addOfflineEvent(ctx, event.ChannelPoint.String()) + + case lndclient.PendingOpenChannelUpdate: + log.Debugf("Ignoring pending channel event: %v", + event.ChannelPoint) + + return nil + + case lndclient.StateChannelUpdate: + if event.UpdatedChannelInfo == nil { + return fmt.Errorf("state_update event is nil") + } + + log.Debugf("Handling channel update event: %+v", + event.UpdatedChannelInfo) + + return m.addUpdateEvent(ctx, event.UpdatedChannelInfo) + } + + return nil +} + +// addOnlineEvent adds an online event for a channel. +func (m *Monitor) addOnlineEvent(ctx context.Context, + channelPoint string) error { + + channel, err := m.store.GetChannel(ctx, channelPoint) + if err != nil { + return fmt.Errorf("error getting channel %s: %w", channelPoint, + err) + } + + log.Infof("Adding online event for channel %s", channelPoint) + + return m.store.AddChannelEvent(ctx, &ChannelEvent{ + ChannelID: channel.ID, + EventType: EventTypeOnline, + }) +} + +// addOfflineEvent adds an offline event for a channel. +func (m *Monitor) addOfflineEvent(ctx context.Context, + channelPoint string) error { + + channel, err := m.store.GetChannel(ctx, channelPoint) + if err != nil { + return fmt.Errorf("error getting channel %s: %w", channelPoint, + err) + } + + log.Infof("Adding offline event for channel %s", channelPoint) + + return m.store.AddChannelEvent(ctx, &ChannelEvent{ + ChannelID: channel.ID, + EventType: EventTypeOffline, + }) +} + +// addUpdateEvent adds an update event for a channel. +func (m *Monitor) addUpdateEvent(ctx context.Context, + channelInfo *lndclient.ChannelInfo) error { + + channel, err := m.store.GetChannel(ctx, channelInfo.ChannelPoint) + if err != nil { + return fmt.Errorf("error getting channel %s: %w", + channelInfo.ChannelPoint, err) + } + + log.Tracef("Adding update event for channel %s", + channelInfo.ChannelPoint) + + return m.store.AddChannelEvent(ctx, &ChannelEvent{ + ChannelID: channel.ID, + EventType: EventTypeUpdate, + LocalBalance: fn.Some( + btcutil.Amount(channelInfo.LocalBalance), + ), + RemoteBalance: fn.Some( + btcutil.Amount(channelInfo.RemoteBalance), + ), + }) +} diff --git a/chanevents/store.go b/chanevents/store.go index a3d202d..36f96d8 100644 --- a/chanevents/store.go +++ b/chanevents/store.go @@ -213,6 +213,7 @@ func (s *Store) AddChannelEvent(ctx context.Context, Timestamp: timestamp, LocalBalanceSat: localBalance, RemoteBalanceSat: remoteBalance, + IsSync: event.IsSync, }, ) if err != nil { @@ -269,5 +270,6 @@ func marshalChannelEvent(dbEvent sqlc.ChannelEvent) *ChannelEvent { Timestamp: dbEvent.Timestamp.UTC(), LocalBalance: localBalance, RemoteBalance: remoteBalance, + IsSync: dbEvent.IsSync, } } diff --git a/chanevents/store_test.go b/chanevents/store_test.go index c9bbaf2..3446c5f 100644 --- a/chanevents/store_test.go +++ b/chanevents/store_test.go @@ -22,6 +22,22 @@ var ( testTime = time.Unix(1, 0) ) +// requireEqualEvent asserts that a retrieved event matches the expected values, +// comparing only the fields that are set before insertion (ignoring the +// auto-assigned ID). +func requireEqualEvent(t *testing.T, expected *ChannelEvent, + expectedTime time.Time, actual *ChannelEvent) { + + t.Helper() + + require.Equal(t, expected.ChannelID, actual.ChannelID) + require.Equal(t, expected.EventType, actual.EventType) + require.Equal(t, expectedTime.Unix(), actual.Timestamp.Unix()) + require.Equal(t, expected.LocalBalance, actual.LocalBalance) + require.Equal(t, expected.RemoteBalance, actual.RemoteBalance) + require.Equal(t, expected.IsSync, actual.IsSync) +} + // TestStore tests the chanevents store. func TestStore(t *testing.T) { t.Parallel() @@ -114,15 +130,30 @@ func TestStore(t *testing.T) { require.NoError(t, err) require.Len(t, events, 2) - require.Equal(t, onlineEvent.EventType, events[0].EventType) - require.Equal(t, testTime.Unix(), events[0].Timestamp.Unix()) - require.True(t, events[0].LocalBalance.IsNone()) - require.True(t, events[0].RemoteBalance.IsNone()) + requireEqualEvent(t, onlineEvent, testTime, events[0]) + requireEqualEvent( + t, updateEvent, testTime.Add(time.Second), events[1], + ) + + // Advance the clock and add a sync event to verify the IsSync flag + // round-trips correctly. + clock.SetTime(testTime.Add(2 * time.Second)) + + syncEvent := &ChannelEvent{ + ChannelID: channelID, + EventType: EventTypeOnline, + IsSync: true, + } + err = store.AddChannelEvent(ctx, syncEvent) + require.NoError(t, err) + + events, err = store.GetChannelEvents( + ctx, channelID, time.Unix(0, 0), time.Unix(4, 0), + ) + require.NoError(t, err) + require.Len(t, events, 3) - require.Equal(t, updateEvent.EventType, events[1].EventType) - require.Equal( - t, testTime.Add(time.Second).Unix(), events[1].Timestamp.Unix(), + requireEqualEvent( + t, syncEvent, testTime.Add(2*time.Second), events[2], ) - require.Equal(t, updateEvent.LocalBalance, events[1].LocalBalance) - require.Equal(t, updateEvent.RemoteBalance, events[1].RemoteBalance) } diff --git a/config.go b/config.go index 0f35256..70713ee 100644 --- a/config.go +++ b/config.go @@ -11,11 +11,16 @@ import ( "github.com/btcsuite/btcd/btcutil" "github.com/lightninglabs/faraday/chain" + "github.com/lightninglabs/faraday/chanevents" + "github.com/lightninglabs/faraday/db" + "github.com/lightninglabs/faraday/db/sqlc" "github.com/lightninglabs/lndclient" "github.com/lightningnetwork/lnd/build" "github.com/lightningnetwork/lnd/cert" + "github.com/lightningnetwork/lnd/clock" "github.com/lightningnetwork/lnd/lncfg" "github.com/lightningnetwork/lnd/lnrpc" + "github.com/lightningnetwork/lnd/sqldb/v2" "google.golang.org/grpc/credentials" ) @@ -35,6 +40,16 @@ const ( // certificate. The value corresponds to 14 months // (14 months * 30 days * 24 hours). defaultTLSCertDuration = 14 * 30 * 24 * time.Hour + + // DatabaseBackendSqlite is the name of the SQLite database backend. + DatabaseBackendSqlite = "sqlite" + + // DatabaseBackendPostgres is the name of the Postgres database backend. + DatabaseBackendPostgres = "postgres" + + // defaultSqliteDatabaseFileName is the default name of the SQLite + // database file. + defaultSqliteDatabaseFileName = "faraday.db" ) var ( @@ -158,6 +173,17 @@ type Config struct { //nolint:maligned // Logging controls various aspects of pool logging. Logging *build.LogConfig `group:"logging" namespace:"logging"` + + // DatabaseBackend is the database backend we will use for storing all + // liveness data. + DatabaseBackend string `long:"databasebackend" description:"The database backend to use for storing all liveness data." choice:"sqlite" choice:"postgres"` + + // Sqlite holds the configuration options for a SQLite database + // backend. + Sqlite *db.SqliteConfig `group:"sqlite" namespace:"sqlite"` + + // Postgres holds the configuration options for a Postgres database + Postgres *sqldb.PostgresConfig `group:"postgres" namespace:"postgres"` } // DefaultConfig returns all default values for the Config struct. @@ -179,6 +205,10 @@ func DefaultConfig() Config { ChainConn: defaultChainConn, Bitcoin: chain.DefaultConfig, Logging: build.DefaultLogConfig(), + DatabaseBackend: DatabaseBackendSqlite, + Sqlite: &db.SqliteConfig{ + DatabaseFileName: defaultSqliteDatabaseFileName, + }, } } @@ -396,3 +426,99 @@ func loadCertWithCreate(cfg *Config) (tls.Certificate, *x509.Certificate, return cert.LoadCert(cfg.TLSCertPath, cfg.TLSKeyPath) } + +// stores holds a collection of the DB stores that are used by faraday. +type stores struct { + // ChanEventsStore is used to watch for channel events. + ChanEventsStore *chanevents.Store + + // closeFns holds various callbacks that can be used to close any open + // stores in the stores struct. + closeFns map[string]func() error +} + +// NewStores creates a new stores instance based on the chosen database backend. +func NewStores(cfg Config, clock clock.Clock) (*stores, error) { + var ( + stores = &stores{ + closeFns: make(map[string]func() error), + } + ) + + switch cfg.DatabaseBackend { + case DatabaseBackendSqlite: + dbPath := filepath.Join( + cfg.FaradayDir, cfg.Sqlite.DatabaseFileName, + ) + + sqlStore, err := sqldb.NewSqliteStore(&sqldb.SqliteConfig{ + SkipMigrations: cfg.Sqlite.SkipMigrations, + SkipMigrationDbBackup: cfg.Sqlite.SkipMigrationDbBackup, + }, dbPath) + if err != nil { + return stores, err + } + + if !cfg.Sqlite.SkipMigrations { + err = sqldb.ApplyAllMigrations( + sqlStore, db.FaradayMigrationSets, + ) + if err != nil { + return stores, fmt.Errorf("error applying "+ + "migrations to SQLite store: %w", err, + ) + } + } + + queries := sqlc.NewForType(sqlStore, sqlStore.BackendType) + + stores.ChanEventsStore = chanevents.NewStore( + sqlStore.BaseDB, queries, clock, + ) + + stores.closeFns["sqlite"] = sqlStore.Close + + case DatabaseBackendPostgres: + sqlStore, err := sqldb.NewPostgresStore(cfg.Postgres) + if err != nil { + return stores, err + } + + if !cfg.Postgres.SkipMigrations { + err = sqldb.ApplyAllMigrations( + sqlStore, db.FaradayMigrationSets, + ) + if err != nil { + return stores, fmt.Errorf("error applying "+ + "migrations to Postgres store: %w", err, + ) + } + } + + queries := sqlc.NewForType(sqlStore, sqlStore.BackendType) + + stores.ChanEventsStore = chanevents.NewStore( + sqlStore.BaseDB, queries, clock, + ) + + stores.closeFns["postgres"] = sqlStore.Close + + default: + return nil, fmt.Errorf("unsupported database backend: "+ + "%s", cfg.DatabaseBackend) + } + + return stores, nil +} + +// Close closes all the stores. +func (s *stores) Close() error { + for name, closeFn := range s.closeFns { + err := closeFn() + if err != nil { + return fmt.Errorf("error closing %s store: %v", name, err) + } + } + + return nil +} diff --git a/db/sqlc/chanevents.sql.go b/db/sqlc/chanevents.sql.go index 9720995..5c7d5a5 100644 --- a/db/sqlc/chanevents.sql.go +++ b/db/sqlc/chanevents.sql.go @@ -44,7 +44,7 @@ func (q *Queries) GetChannelByShortChanID(ctx context.Context, shortChannelID in } const getChannelEvents = `-- name: GetChannelEvents :many -SELECT id, channel_id, event_type, timestamp, local_balance_sat, remote_balance_sat FROM channel_events +SELECT id, channel_id, event_type, timestamp, local_balance_sat, remote_balance_sat, is_sync FROM channel_events WHERE channel_id = $1 AND timestamp >= $2 AND timestamp < $3 ORDER BY timestamp ASC, id ASC ` @@ -71,6 +71,7 @@ func (q *Queries) GetChannelEvents(ctx context.Context, arg GetChannelEventsPara &i.Timestamp, &i.LocalBalanceSat, &i.RemoteBalanceSat, + &i.IsSync, ); err != nil { return nil, err } @@ -115,8 +116,9 @@ func (q *Queries) InsertChannel(ctx context.Context, arg InsertChannelParams) (i const insertChannelEvent = `-- name: InsertChannelEvent :exec INSERT INTO channel_events ( - channel_id, event_type, timestamp, local_balance_sat, remote_balance_sat -) VALUES ($1, $2, $3, $4, $5) + channel_id, event_type, timestamp, local_balance_sat, remote_balance_sat, + is_sync +) VALUES ($1, $2, $3, $4, $5, $6) ` type InsertChannelEventParams struct { @@ -125,6 +127,7 @@ type InsertChannelEventParams struct { Timestamp time.Time LocalBalanceSat sql.NullInt64 RemoteBalanceSat sql.NullInt64 + IsSync bool } func (q *Queries) InsertChannelEvent(ctx context.Context, arg InsertChannelEventParams) error { @@ -134,6 +137,7 @@ func (q *Queries) InsertChannelEvent(ctx context.Context, arg InsertChannelEvent arg.Timestamp, arg.LocalBalanceSat, arg.RemoteBalanceSat, + arg.IsSync, ) return err } diff --git a/db/sqlc/migrations/000001_chanevents.up.sql b/db/sqlc/migrations/000001_chanevents.up.sql index f733b59..c4cf35e 100644 --- a/db/sqlc/migrations/000001_chanevents.up.sql +++ b/db/sqlc/migrations/000001_chanevents.up.sql @@ -35,7 +35,10 @@ CREATE TABLE IF NOT EXISTS channel_events ( local_balance_sat BIGINT CHECK (local_balance_sat >= 0), -- The remote balance of the channel at the time of the event. -- This is only populated for balance update events. - remote_balance_sat BIGINT CHECK (remote_balance_sat >= 0) + remote_balance_sat BIGINT CHECK (remote_balance_sat >= 0), + -- Whether this event was recorded during an initial sync rather than + -- from a live subscription. + is_sync BOOLEAN NOT NULL DEFAULT FALSE ); -- This composite index is crucial for efficiently querying the event history diff --git a/db/sqlc/models.go b/db/sqlc/models.go index 1094533..fa20db0 100644 --- a/db/sqlc/models.go +++ b/db/sqlc/models.go @@ -23,6 +23,7 @@ type ChannelEvent struct { Timestamp time.Time LocalBalanceSat sql.NullInt64 RemoteBalanceSat sql.NullInt64 + IsSync bool } type Peer struct { diff --git a/db/sqlc/queries/chanevents.sql b/db/sqlc/queries/chanevents.sql index 5370e4e..c615421 100644 --- a/db/sqlc/queries/chanevents.sql +++ b/db/sqlc/queries/chanevents.sql @@ -15,8 +15,9 @@ SELECT * FROM channels WHERE short_channel_id = $1; -- name: InsertChannelEvent :exec INSERT INTO channel_events ( - channel_id, event_type, timestamp, local_balance_sat, remote_balance_sat -) VALUES ($1, $2, $3, $4, $5); + channel_id, event_type, timestamp, local_balance_sat, remote_balance_sat, + is_sync +) VALUES ($1, $2, $3, $4, $5, $6); -- name: GetChannelEvents :many SELECT * FROM channel_events diff --git a/faraday.go b/faraday.go index e919bd5..0f15d38 100644 --- a/faraday.go +++ b/faraday.go @@ -18,11 +18,13 @@ import ( proxy "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" "github.com/jessevdk/go-flags" "github.com/lightninglabs/faraday/chain" + "github.com/lightninglabs/faraday/chanevents" "github.com/lightninglabs/faraday/frdrpc" "github.com/lightninglabs/faraday/frdrpcserver" "github.com/lightninglabs/faraday/frdrpcserver/perms" "github.com/lightninglabs/lndclient" "github.com/lightningnetwork/lnd/build" + "github.com/lightningnetwork/lnd/clock" "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/lncfg" "github.com/lightningnetwork/lnd/lnrpc/verrpc" @@ -89,6 +91,15 @@ type Faraday struct { // reuse of the struct, since internal fields are not reset. stopped atomic.Bool + // monitor is the channel events monitor. + monitor *chanevents.Monitor + + // stores contains all the stores used by faraday. + stores *stores + + // ctxCancel is a function that can be used to cancel the main context. + ctxCancel context.CancelFunc + lnd *lndclient.GrpcLndServices // lndOwned indicates whether Faraday created the lnd connection @@ -442,6 +453,23 @@ func (f *Faraday) Stop() error { // can complete cleanly. f.wg.Wait() + if f.ctxCancel != nil { + f.ctxCancel() + } + + if f.monitor != nil { + if err := f.monitor.Stop(); err != nil { + log.Errorf("Error stopping channel event monitor: %v", + err) + } + } + + if f.stores != nil { + if err := f.stores.Close(); err != nil { + log.Errorf("Error closing stores: %v", err) + } + } + var stopErr error if f.macaroonService != nil { err := f.macaroonService.Stop() @@ -536,6 +564,27 @@ func (f *Faraday) initialize(withMacaroonService bool) error { } } + // Create any relevant stores. + f.stores, err = NewStores(*f.cfg, clock.NewDefaultClock()) + if err != nil { + return fmt.Errorf("could not create stores: %v", err) + } + + // Create the channel event monitor. + f.monitor = chanevents.NewMonitor( + f.lnd.Client, f.stores.ChanEventsStore, + ) + + ctx, cancel := context.WithCancel(context.Background()) + f.ctxCancel = cancel + + if err := f.monitor.Start(ctx); err != nil { + cancel() + + return fmt.Errorf("could not start channel event "+ + "monitor: %v", err) + } + return nil } diff --git a/log.go b/log.go index 9edd23d..1821875 100644 --- a/log.go +++ b/log.go @@ -3,6 +3,7 @@ package faraday import ( "github.com/btcsuite/btclog/v2" "github.com/lightninglabs/faraday/accounting" + "github.com/lightninglabs/faraday/chanevents" "github.com/lightninglabs/faraday/dataset" "github.com/lightninglabs/faraday/fiat" "github.com/lightninglabs/faraday/frdrpcserver" @@ -37,6 +38,7 @@ func SetupLoggers(root *build.SubLoggerManager, intercept signal.Interceptor) { addSubLogger(root, revenue.Subsystem, intercept, revenue.UseLogger) addSubLogger(root, fiat.Subsystem, intercept, fiat.UseLogger) addSubLogger(root, accounting.Subsystem, intercept, accounting.UseLogger) + addSubLogger(root, chanevents.Subsystem, intercept, chanevents.UseLogger) } // UseLogger uses a specified Logger to output package logging info.