diff --git a/extensions/register.go b/extensions/register.go index b5a30c0d..be7cf4e1 100644 --- a/extensions/register.go +++ b/extensions/register.go @@ -6,6 +6,7 @@ import ( "github.com/trufnetwork/node/extensions/tn_attestation" "github.com/trufnetwork/node/extensions/tn_cache" "github.com/trufnetwork/node/extensions/tn_digest" + "github.com/trufnetwork/node/extensions/tn_local" "github.com/trufnetwork/node/extensions/tn_lp_rewards" "github.com/trufnetwork/node/extensions/tn_settlement" "github.com/trufnetwork/node/extensions/tn_vacuum" @@ -19,6 +20,7 @@ func init() { tn_digest.InitializeExtension() tn_settlement.InitializeExtension() tn_lp_rewards.InitializeExtension() + tn_local.InitializeExtension() tn_vacuum.InitializeExtension() tn_attestation.InitializeExtension() database_size.InitializeExtension() diff --git a/extensions/tn_local/constants.go b/extensions/tn_local/constants.go new file mode 100644 index 00000000..842f1934 --- /dev/null +++ b/extensions/tn_local/constants.go @@ -0,0 +1,10 @@ +package tn_local + +const ( + // SchemaName is the PostgreSQL schema for local storage tables. + // Not prefixed with "ds_" so it's excluded from consensus block hashing. + SchemaName = "ext_tn_local" + + // ServiceName is the JSON-RPC service name registered on the admin server. + ServiceName = "local" +) diff --git a/extensions/tn_local/db_ops.go b/extensions/tn_local/db_ops.go new file mode 100644 index 00000000..90455061 --- /dev/null +++ b/extensions/tn_local/db_ops.go @@ -0,0 +1,42 @@ +package tn_local + +import ( + "context" + "fmt" + + "github.com/trufnetwork/kwil-db/core/log" + "github.com/trufnetwork/kwil-db/node/types/sql" +) + +// LocalDB wraps a sql.DB and provides operations against the ext_tn_local schema. +type LocalDB struct { + db sql.DB + logger log.Logger +} + +// NewLocalDB creates a new LocalDB. +func NewLocalDB(db sql.DB, logger log.Logger) *LocalDB { + return &LocalDB{db: db, logger: logger} +} + +// SetupSchema creates the ext_tn_local schema and all tables within a single transaction. +func (l *LocalDB) SetupSchema(ctx context.Context) error { + l.logger.Info("setting up local storage schema") + + tx, err := l.db.BeginTx(ctx) + if err != nil { + return fmt.Errorf("begin transaction: %w", err) + } + defer func() { _ = tx.Rollback(ctx) }() + + if err := setupLocalSchema(ctx, tx); err != nil { + return err + } + + if err := tx.Commit(ctx); err != nil { + return fmt.Errorf("commit transaction: %w", err) + } + + l.logger.Info("local storage schema setup complete") + return nil +} diff --git a/extensions/tn_local/extension.go b/extensions/tn_local/extension.go new file mode 100644 index 00000000..97f425c8 --- /dev/null +++ b/extensions/tn_local/extension.go @@ -0,0 +1,55 @@ +package tn_local + +import ( + "sync" + + "github.com/trufnetwork/kwil-db/core/log" + "github.com/trufnetwork/kwil-db/node/types/sql" +) + +// Extension holds all state for the tn_local extension. +type Extension struct { + logger log.Logger + db sql.DB + localDB *LocalDB + isEnabled bool +} + +var ( + extensionInstance *Extension + once sync.Once +) + +// GetExtension returns the singleton Extension instance. +// The returned pointer is stable — adminServerHook registers it as a Svc, +// so it must never be replaced. Use configure() to update fields in-place. +func GetExtension() *Extension { + once.Do(func() { + extensionInstance = &Extension{ + logger: log.New(log.WithLevel(log.LevelInfo)), + isEnabled: false, + } + }) + return extensionInstance +} + +// configure updates the extension's internal state in-place. +// This preserves the pointer identity so that the admin server's registered +// Svc reference remains valid. Called during sequential startup before the +// server accepts requests. +func (e *Extension) configure(logger log.Logger, db sql.DB, localDB *LocalDB) { + e.logger = logger + e.db = db + e.localDB = localDB + e.isEnabled = true +} + +// Close closes the extension's connection pool. +func (e *Extension) Close() { + if e.db != nil { + if wrapper, ok := e.db.(*PoolDBWrapper); ok { + wrapper.Close() + e.logger.Info("closed local storage connection pool") + } + } +} diff --git a/extensions/tn_local/pool_wrapper.go b/extensions/tn_local/pool_wrapper.go new file mode 100644 index 00000000..362d5f9a --- /dev/null +++ b/extensions/tn_local/pool_wrapper.go @@ -0,0 +1,160 @@ +package tn_local + +import ( + "context" + "fmt" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/trufnetwork/kwil-db/node/pg" + "github.com/trufnetwork/kwil-db/node/types/sql" +) + +// PoolDBWrapper wraps a pgxpool.Pool to implement the sql.DB interface +// with ReadWrite access for local storage operations. +type PoolDBWrapper struct { + Pool *pgxpool.Pool +} + +var _ sql.DB = &PoolDBWrapper{} + +// NewPoolDBWrapper creates a new wrapper around the pgxpool. +func NewPoolDBWrapper(pool *pgxpool.Pool) *PoolDBWrapper { + return &PoolDBWrapper{Pool: pool} +} + +// Execute implements sql.Executor. +func (w *PoolDBWrapper) Execute(ctx context.Context, stmt string, args ...any) (*sql.ResultSet, error) { + conn, err := w.Pool.Acquire(ctx) + if err != nil { + return nil, fmt.Errorf("acquire connection: %w", err) + } + defer conn.Release() + + rows, err := conn.Query(ctx, stmt, args...) + if err != nil { + return nil, fmt.Errorf("execute query: %w", err) + } + defer rows.Close() + + oidToDataType := pg.OidTypesMap(conn.Conn().TypeMap()) + + fields := rows.FieldDescriptions() + columns := make([]string, len(fields)) + oids := make([]uint32, len(fields)) + for i, field := range fields { + columns[i] = string(field.Name) + oids[i] = field.DataTypeOID + } + + var resultRows [][]any + for rows.Next() { + values, err := rows.Values() + if err != nil { + return nil, fmt.Errorf("scan row values: %w", err) + } + decodedValues, err := pg.DecodeFromPG(values, oids, oidToDataType) + if err != nil { + return nil, fmt.Errorf("decode values: %w", err) + } + resultRows = append(resultRows, decodedValues) + } + + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("rows iteration: %w", err) + } + + return &sql.ResultSet{ + Columns: columns, + Rows: resultRows, + Status: sql.CommandTag{ + Text: rows.CommandTag().String(), + RowsAffected: rows.CommandTag().RowsAffected(), + }, + }, nil +} + +// BeginTx implements sql.TxMaker. +func (w *PoolDBWrapper) BeginTx(ctx context.Context) (sql.Tx, error) { + pgxTx, err := w.Pool.Begin(ctx) + if err != nil { + return nil, fmt.Errorf("begin transaction: %w", err) + } + return &poolTxWrapper{tx: pgxTx}, nil +} + +// Close closes the underlying connection pool. +func (w *PoolDBWrapper) Close() { + w.Pool.Close() +} + +// poolTxWrapper wraps a pgx.Tx to implement the sql.Tx interface. +type poolTxWrapper struct { + tx pgx.Tx +} + +var _ sql.Tx = &poolTxWrapper{} + +// Execute implements sql.Executor for transactions. +func (t *poolTxWrapper) Execute(ctx context.Context, stmt string, args ...any) (*sql.ResultSet, error) { + rows, err := t.tx.Query(ctx, stmt, args...) + if err != nil { + return nil, fmt.Errorf("execute query in tx: %w", err) + } + defer rows.Close() + + oidToDataType := pg.OidTypesMap(t.tx.Conn().TypeMap()) + + fields := rows.FieldDescriptions() + columns := make([]string, len(fields)) + oids := make([]uint32, len(fields)) + for i, field := range fields { + columns[i] = string(field.Name) + oids[i] = field.DataTypeOID + } + + var resultRows [][]any + for rows.Next() { + values, err := rows.Values() + if err != nil { + return nil, fmt.Errorf("scan row values: %w", err) + } + decodedValues, err := pg.DecodeFromPG(values, oids, oidToDataType) + if err != nil { + return nil, fmt.Errorf("decode values: %w", err) + } + resultRows = append(resultRows, decodedValues) + } + + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("rows iteration: %w", err) + } + + return &sql.ResultSet{ + Columns: columns, + Rows: resultRows, + Status: sql.CommandTag{ + Text: rows.CommandTag().String(), + RowsAffected: rows.CommandTag().RowsAffected(), + }, + }, nil +} + +// BeginTx implements sql.TxMaker for nested transactions (savepoints). +func (t *poolTxWrapper) BeginTx(ctx context.Context) (sql.Tx, error) { + nestedTx, err := t.tx.Begin(ctx) + if err != nil { + return nil, fmt.Errorf("begin nested transaction: %w", err) + } + return &poolTxWrapper{tx: nestedTx}, nil +} + +// Rollback implements sql.Tx. +func (t *poolTxWrapper) Rollback(ctx context.Context) error { + return t.tx.Rollback(ctx) +} + +// Commit implements sql.Tx. +func (t *poolTxWrapper) Commit(ctx context.Context) error { + return t.tx.Commit(ctx) +} diff --git a/extensions/tn_local/schema.go b/extensions/tn_local/schema.go new file mode 100644 index 00000000..d043a277 --- /dev/null +++ b/extensions/tn_local/schema.go @@ -0,0 +1,101 @@ +package tn_local + +import ( + "context" + "fmt" + + "github.com/trufnetwork/kwil-db/node/types/sql" +) + +// setupLocalSchema creates the ext_tn_local schema and all tables. +// Tables mirror the normalized consensus schema (post migration 017). +func setupLocalSchema(ctx context.Context, tx sql.Tx) error { + if _, err := tx.Execute(ctx, `CREATE SCHEMA IF NOT EXISTS `+SchemaName); err != nil { + return fmt.Errorf("create schema: %w", err) + } + + if err := ensureStreamsTable(ctx, tx); err != nil { + return err + } + + if err := ensurePrimitiveEventsTable(ctx, tx); err != nil { + return err + } + + if err := ensureTaxonomiesTable(ctx, tx); err != nil { + return err + } + + return nil +} + +// ensureStreamsTable creates the streams table. +// Mirrors: consensus streams table after 017-normalize-tables.sql +func ensureStreamsTable(ctx context.Context, tx sql.Tx) error { + createTable := fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s.streams ( + id SERIAL PRIMARY KEY, + data_provider TEXT NOT NULL, + stream_id TEXT NOT NULL, + stream_type TEXT NOT NULL, + created_at INT8 NOT NULL, + UNIQUE(data_provider, stream_id), + CHECK (stream_type IN ('primitive', 'composed')) + )`, SchemaName) + + if _, err := tx.Execute(ctx, createTable); err != nil { + return fmt.Errorf("create streams table: %w", err) + } + return nil +} + +// ensurePrimitiveEventsTable creates the primitive_events table with an index. +// Mirrors: consensus primitive_events after 017-normalize-tables.sql +func ensurePrimitiveEventsTable(ctx context.Context, tx sql.Tx) error { + createTable := fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s.primitive_events ( + stream_ref INT NOT NULL REFERENCES %s.streams(id) ON DELETE CASCADE, + event_time INT8 NOT NULL, + value NUMERIC(36,18) NOT NULL, + created_at INT8 NOT NULL DEFAULT 0, + PRIMARY KEY (stream_ref, event_time, created_at) + )`, SchemaName, SchemaName) + + if _, err := tx.Execute(ctx, createTable); err != nil { + return fmt.Errorf("create primitive_events table: %w", err) + } + + createIndex := fmt.Sprintf(` + CREATE INDEX IF NOT EXISTS local_pe_stream_time_idx + ON %s.primitive_events (stream_ref, event_time)`, SchemaName) + + if _, err := tx.Execute(ctx, createIndex); err != nil { + return fmt.Errorf("create primitive_events index: %w", err) + } + return nil +} + +// ensureTaxonomiesTable creates the taxonomies table. +// Mirrors: consensus taxonomies after 017-normalize-tables.sql +// Local composed streams can ONLY reference other local streams. +func ensureTaxonomiesTable(ctx context.Context, tx sql.Tx) error { + createTable := fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s.taxonomies ( + taxonomy_id UUID PRIMARY KEY, + stream_ref INT NOT NULL REFERENCES %s.streams(id) ON DELETE CASCADE, + child_stream_ref INT NOT NULL REFERENCES %s.streams(id) ON DELETE CASCADE, + weight NUMERIC(36,18) NOT NULL, + start_time INT8 NOT NULL, + group_sequence INT NOT NULL DEFAULT 0, + created_at INT8 NOT NULL, + disabled_at INT8, + CHECK (weight >= 0), + CHECK (group_sequence >= 0), + CHECK (start_time >= 0) + )`, SchemaName, SchemaName, SchemaName) + + if _, err := tx.Execute(ctx, createTable); err != nil { + return fmt.Errorf("create taxonomies table: %w", err) + } + return nil +} diff --git a/extensions/tn_local/test_init.go b/extensions/tn_local/test_init.go new file mode 100644 index 00000000..58813a1d --- /dev/null +++ b/extensions/tn_local/test_init.go @@ -0,0 +1,19 @@ +package tn_local + +import ( + "github.com/trufnetwork/kwil-db/node/types/sql" +) + +// testOverrides holds test-injected state. +var testOverrides struct { + sqlDB sql.DB +} + +// SetTestDB allows tests to inject a database connection. +func SetTestDB(db sql.DB) { + testOverrides.sqlDB = db +} + +func getTestDB() sql.DB { + return testOverrides.sqlDB +} diff --git a/go.mod b/go.mod index e6a14dc1..ffc3c310 100644 --- a/go.mod +++ b/go.mod @@ -17,8 +17,8 @@ require ( github.com/spf13/cobra v1.9.1 github.com/stretchr/testify v1.11.1 github.com/testcontainers/testcontainers-go v0.37.0 - github.com/trufnetwork/kwil-db v0.10.3-0.20260313180311-7a6e0f2b5210 - github.com/trufnetwork/kwil-db/core v0.4.3-0.20260313180311-7a6e0f2b5210 + github.com/trufnetwork/kwil-db v0.10.3-0.20260316174721-ee9a05e5fbc3 + github.com/trufnetwork/kwil-db/core v0.4.3-0.20260316174721-ee9a05e5fbc3 github.com/trufnetwork/sdk-go v0.6.4-0.20260224122406-a741343e2f37 go.uber.org/zap v1.27.0 golang.org/x/exp v0.0.0-20250218142911-aa4b98e5adaa diff --git a/go.sum b/go.sum index f60adb94..15313ec2 100644 --- a/go.sum +++ b/go.sum @@ -1258,6 +1258,10 @@ github.com/trufnetwork/kwil-db v0.10.3-0.20260313145346-c0e3bc503fed h1:heQTJ8YU github.com/trufnetwork/kwil-db v0.10.3-0.20260313145346-c0e3bc503fed/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo= github.com/trufnetwork/kwil-db v0.10.3-0.20260313180311-7a6e0f2b5210 h1:yQc84+PfvYeBymbLNwlxzb822YHQD6X2ApYrVqDGDWM= github.com/trufnetwork/kwil-db v0.10.3-0.20260313180311-7a6e0f2b5210/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo= +github.com/trufnetwork/kwil-db v0.10.3-0.20260316155040-d7d44362ca9e h1:k12pRfGvtBm0GIpZIe4ChzS4rTMBHaWVtRRms3ZkanE= +github.com/trufnetwork/kwil-db v0.10.3-0.20260316155040-d7d44362ca9e/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo= +github.com/trufnetwork/kwil-db v0.10.3-0.20260316174721-ee9a05e5fbc3 h1:rcuy+35bKVSrV9gqjs17XzJUbDGZekKAW085Poijv14= +github.com/trufnetwork/kwil-db v0.10.3-0.20260316174721-ee9a05e5fbc3/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo= github.com/trufnetwork/kwil-db/core v0.4.3-0.20260216231327-01b863886682 h1:iaxXr8D3dU79MBhmS/uCuBhnlc+gbLvCvV6GtAz3ukw= github.com/trufnetwork/kwil-db/core v0.4.3-0.20260216231327-01b863886682/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ= github.com/trufnetwork/kwil-db/core v0.4.3-0.20260303100144-0119418a1a7c h1:O5pyUJqZNNIi/l1vXc9fxycdEU9OxF5z8UQprTn4zZE= @@ -1276,6 +1280,10 @@ github.com/trufnetwork/kwil-db/core v0.4.3-0.20260313145346-c0e3bc503fed h1:DLWR github.com/trufnetwork/kwil-db/core v0.4.3-0.20260313145346-c0e3bc503fed/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ= github.com/trufnetwork/kwil-db/core v0.4.3-0.20260313180311-7a6e0f2b5210 h1:KHYby16InQCDzTACUQ53yeBWk03X87naIkvBAVVBE3Y= github.com/trufnetwork/kwil-db/core v0.4.3-0.20260313180311-7a6e0f2b5210/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ= +github.com/trufnetwork/kwil-db/core v0.4.3-0.20260316155040-d7d44362ca9e h1:GbYY+Q5S5DzV12Hss0SDNHwehdnNI5COlUHGXHdKhxU= +github.com/trufnetwork/kwil-db/core v0.4.3-0.20260316155040-d7d44362ca9e/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ= +github.com/trufnetwork/kwil-db/core v0.4.3-0.20260316174721-ee9a05e5fbc3 h1:6ryDcHZlqhmiHdhQul3YvxZWLfqqSTiTQrW5/kDsNsM= +github.com/trufnetwork/kwil-db/core v0.4.3-0.20260316174721-ee9a05e5fbc3/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ= github.com/trufnetwork/openzeppelin-merkle-tree-go v0.0.2 h1:DCq8MzbWH0wZmICNmMVsSzUHUPl+2vqRhluEABjxl88= github.com/trufnetwork/openzeppelin-merkle-tree-go v0.0.2/go.mod h1:Y0MJpPp9QXU5vC6Gpoilql2NkgmGNcbHm9HYC2v2N8s= github.com/trufnetwork/sdk-go v0.6.4-0.20260224122406-a741343e2f37 h1:VD/GWxLTshaXpLukEc1SXbG7QA9HrFzF8JvxJAJ/x7Q=