-
Notifications
You must be signed in to change notification settings - Fork 3
chore: add tn_local extension with local storage schema #1339
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,10 @@ | ||
| package tn_local | ||
|
|
||
| const ( | ||
| // SchemaName is the PostgreSQL schema for local storage tables. | ||
| // Not prefixed with "ds_" so it's excluded from consensus block hashing. | ||
| SchemaName = "ext_tn_local" | ||
|
|
||
| // ServiceName is the JSON-RPC service name registered on the admin server. | ||
| ServiceName = "local" | ||
| ) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,42 @@ | ||
| package tn_local | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
|
|
||
| "github.com/trufnetwork/kwil-db/core/log" | ||
| "github.com/trufnetwork/kwil-db/node/types/sql" | ||
| ) | ||
|
|
||
| // LocalDB wraps a sql.DB and provides operations against the ext_tn_local schema. | ||
| type LocalDB struct { | ||
| db sql.DB | ||
| logger log.Logger | ||
| } | ||
|
|
||
| // NewLocalDB creates a new LocalDB. | ||
| func NewLocalDB(db sql.DB, logger log.Logger) *LocalDB { | ||
| return &LocalDB{db: db, logger: logger} | ||
| } | ||
|
|
||
| // SetupSchema creates the ext_tn_local schema and all tables within a single transaction. | ||
| func (l *LocalDB) SetupSchema(ctx context.Context) error { | ||
| l.logger.Info("setting up local storage schema") | ||
|
|
||
| tx, err := l.db.BeginTx(ctx) | ||
| if err != nil { | ||
| return fmt.Errorf("begin transaction: %w", err) | ||
| } | ||
| defer func() { _ = tx.Rollback(ctx) }() | ||
|
|
||
| if err := setupLocalSchema(ctx, tx); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| if err := tx.Commit(ctx); err != nil { | ||
| return fmt.Errorf("commit transaction: %w", err) | ||
| } | ||
|
|
||
| l.logger.Info("local storage schema setup complete") | ||
| return nil | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,55 @@ | ||
| package tn_local | ||
|
|
||
| import ( | ||
| "sync" | ||
|
|
||
| "github.com/trufnetwork/kwil-db/core/log" | ||
| "github.com/trufnetwork/kwil-db/node/types/sql" | ||
| ) | ||
|
|
||
| // Extension holds all state for the tn_local extension. | ||
| type Extension struct { | ||
| logger log.Logger | ||
| db sql.DB | ||
| localDB *LocalDB | ||
| isEnabled bool | ||
| } | ||
|
|
||
| var ( | ||
| extensionInstance *Extension | ||
| once sync.Once | ||
| ) | ||
|
|
||
| // GetExtension returns the singleton Extension instance. | ||
| // The returned pointer is stable — adminServerHook registers it as a Svc, | ||
| // so it must never be replaced. Use configure() to update fields in-place. | ||
| func GetExtension() *Extension { | ||
| once.Do(func() { | ||
| extensionInstance = &Extension{ | ||
| logger: log.New(log.WithLevel(log.LevelInfo)), | ||
| isEnabled: false, | ||
| } | ||
| }) | ||
| return extensionInstance | ||
| } | ||
|
|
||
| // configure updates the extension's internal state in-place. | ||
| // This preserves the pointer identity so that the admin server's registered | ||
| // Svc reference remains valid. Called during sequential startup before the | ||
| // server accepts requests. | ||
| func (e *Extension) configure(logger log.Logger, db sql.DB, localDB *LocalDB) { | ||
| e.logger = logger | ||
| e.db = db | ||
| e.localDB = localDB | ||
| e.isEnabled = true | ||
| } | ||
|
|
||
| // Close closes the extension's connection pool. | ||
| func (e *Extension) Close() { | ||
| if e.db != nil { | ||
| if wrapper, ok := e.db.(*PoolDBWrapper); ok { | ||
| wrapper.Close() | ||
| e.logger.Info("closed local storage connection pool") | ||
| } | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,160 @@ | ||
| package tn_local | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
|
|
||
| "github.com/jackc/pgx/v5" | ||
| "github.com/jackc/pgx/v5/pgxpool" | ||
| "github.com/trufnetwork/kwil-db/node/pg" | ||
| "github.com/trufnetwork/kwil-db/node/types/sql" | ||
| ) | ||
|
|
||
| // PoolDBWrapper wraps a pgxpool.Pool to implement the sql.DB interface | ||
| // with ReadWrite access for local storage operations. | ||
| type PoolDBWrapper struct { | ||
| Pool *pgxpool.Pool | ||
| } | ||
|
|
||
| var _ sql.DB = &PoolDBWrapper{} | ||
|
|
||
| // NewPoolDBWrapper creates a new wrapper around the pgxpool. | ||
| func NewPoolDBWrapper(pool *pgxpool.Pool) *PoolDBWrapper { | ||
| return &PoolDBWrapper{Pool: pool} | ||
| } | ||
|
|
||
| // Execute implements sql.Executor. | ||
| func (w *PoolDBWrapper) Execute(ctx context.Context, stmt string, args ...any) (*sql.ResultSet, error) { | ||
| conn, err := w.Pool.Acquire(ctx) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("acquire connection: %w", err) | ||
| } | ||
| defer conn.Release() | ||
|
|
||
| rows, err := conn.Query(ctx, stmt, args...) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("execute query: %w", err) | ||
| } | ||
| defer rows.Close() | ||
|
|
||
| oidToDataType := pg.OidTypesMap(conn.Conn().TypeMap()) | ||
|
|
||
| fields := rows.FieldDescriptions() | ||
| columns := make([]string, len(fields)) | ||
| oids := make([]uint32, len(fields)) | ||
| for i, field := range fields { | ||
| columns[i] = string(field.Name) | ||
| oids[i] = field.DataTypeOID | ||
| } | ||
|
|
||
| var resultRows [][]any | ||
| for rows.Next() { | ||
| values, err := rows.Values() | ||
| if err != nil { | ||
| return nil, fmt.Errorf("scan row values: %w", err) | ||
| } | ||
| decodedValues, err := pg.DecodeFromPG(values, oids, oidToDataType) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("decode values: %w", err) | ||
| } | ||
| resultRows = append(resultRows, decodedValues) | ||
| } | ||
|
|
||
| if err := rows.Err(); err != nil { | ||
| return nil, fmt.Errorf("rows iteration: %w", err) | ||
| } | ||
|
|
||
| return &sql.ResultSet{ | ||
| Columns: columns, | ||
| Rows: resultRows, | ||
| Status: sql.CommandTag{ | ||
| Text: rows.CommandTag().String(), | ||
| RowsAffected: rows.CommandTag().RowsAffected(), | ||
| }, | ||
| }, nil | ||
| } | ||
|
|
||
| // BeginTx implements sql.TxMaker. | ||
| func (w *PoolDBWrapper) BeginTx(ctx context.Context) (sql.Tx, error) { | ||
| pgxTx, err := w.Pool.Begin(ctx) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("begin transaction: %w", err) | ||
| } | ||
| return &poolTxWrapper{tx: pgxTx}, nil | ||
| } | ||
|
|
||
| // Close closes the underlying connection pool. | ||
| func (w *PoolDBWrapper) Close() { | ||
| w.Pool.Close() | ||
| } | ||
|
|
||
| // poolTxWrapper wraps a pgx.Tx to implement the sql.Tx interface. | ||
| type poolTxWrapper struct { | ||
| tx pgx.Tx | ||
| } | ||
|
|
||
| var _ sql.Tx = &poolTxWrapper{} | ||
|
|
||
| // Execute implements sql.Executor for transactions. | ||
| func (t *poolTxWrapper) Execute(ctx context.Context, stmt string, args ...any) (*sql.ResultSet, error) { | ||
| rows, err := t.tx.Query(ctx, stmt, args...) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("execute query in tx: %w", err) | ||
| } | ||
| defer rows.Close() | ||
|
|
||
| oidToDataType := pg.OidTypesMap(t.tx.Conn().TypeMap()) | ||
|
|
||
| fields := rows.FieldDescriptions() | ||
| columns := make([]string, len(fields)) | ||
| oids := make([]uint32, len(fields)) | ||
| for i, field := range fields { | ||
| columns[i] = string(field.Name) | ||
| oids[i] = field.DataTypeOID | ||
| } | ||
|
|
||
| var resultRows [][]any | ||
| for rows.Next() { | ||
| values, err := rows.Values() | ||
| if err != nil { | ||
| return nil, fmt.Errorf("scan row values: %w", err) | ||
| } | ||
| decodedValues, err := pg.DecodeFromPG(values, oids, oidToDataType) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("decode values: %w", err) | ||
| } | ||
| resultRows = append(resultRows, decodedValues) | ||
| } | ||
|
|
||
| if err := rows.Err(); err != nil { | ||
| return nil, fmt.Errorf("rows iteration: %w", err) | ||
| } | ||
|
|
||
| return &sql.ResultSet{ | ||
| Columns: columns, | ||
| Rows: resultRows, | ||
| Status: sql.CommandTag{ | ||
| Text: rows.CommandTag().String(), | ||
| RowsAffected: rows.CommandTag().RowsAffected(), | ||
| }, | ||
| }, nil | ||
| } | ||
|
|
||
| // BeginTx implements sql.TxMaker for nested transactions (savepoints). | ||
| func (t *poolTxWrapper) BeginTx(ctx context.Context) (sql.Tx, error) { | ||
| nestedTx, err := t.tx.Begin(ctx) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("begin nested transaction: %w", err) | ||
| } | ||
| return &poolTxWrapper{tx: nestedTx}, nil | ||
| } | ||
|
|
||
| // Rollback implements sql.Tx. | ||
| func (t *poolTxWrapper) Rollback(ctx context.Context) error { | ||
| return t.tx.Rollback(ctx) | ||
| } | ||
|
|
||
| // Commit implements sql.Tx. | ||
| func (t *poolTxWrapper) Commit(ctx context.Context) error { | ||
| return t.tx.Commit(ctx) | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,101 @@ | ||
| package tn_local | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
|
|
||
| "github.com/trufnetwork/kwil-db/node/types/sql" | ||
| ) | ||
|
|
||
| // setupLocalSchema creates the ext_tn_local schema and all tables. | ||
| // Tables mirror the normalized consensus schema (post migration 017). | ||
| func setupLocalSchema(ctx context.Context, tx sql.Tx) error { | ||
| if _, err := tx.Execute(ctx, `CREATE SCHEMA IF NOT EXISTS `+SchemaName); err != nil { | ||
| return fmt.Errorf("create schema: %w", err) | ||
| } | ||
|
|
||
| if err := ensureStreamsTable(ctx, tx); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| if err := ensurePrimitiveEventsTable(ctx, tx); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| if err := ensureTaxonomiesTable(ctx, tx); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| // ensureStreamsTable creates the streams table. | ||
| // Mirrors: consensus streams table after 017-normalize-tables.sql | ||
| func ensureStreamsTable(ctx context.Context, tx sql.Tx) error { | ||
| createTable := fmt.Sprintf(` | ||
| CREATE TABLE IF NOT EXISTS %s.streams ( | ||
| id SERIAL PRIMARY KEY, | ||
| data_provider TEXT NOT NULL, | ||
| stream_id TEXT NOT NULL, | ||
| stream_type TEXT NOT NULL, | ||
| created_at INT8 NOT NULL, | ||
| UNIQUE(data_provider, stream_id), | ||
| CHECK (stream_type IN ('primitive', 'composed')) | ||
| )`, SchemaName) | ||
|
|
||
| if _, err := tx.Execute(ctx, createTable); err != nil { | ||
| return fmt.Errorf("create streams table: %w", err) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // ensurePrimitiveEventsTable creates the primitive_events table with an index. | ||
| // Mirrors: consensus primitive_events after 017-normalize-tables.sql | ||
| func ensurePrimitiveEventsTable(ctx context.Context, tx sql.Tx) error { | ||
| createTable := fmt.Sprintf(` | ||
| CREATE TABLE IF NOT EXISTS %s.primitive_events ( | ||
| stream_ref INT NOT NULL REFERENCES %s.streams(id) ON DELETE CASCADE, | ||
| event_time INT8 NOT NULL, | ||
| value NUMERIC(36,18) NOT NULL, | ||
| created_at INT8 NOT NULL DEFAULT 0, | ||
| PRIMARY KEY (stream_ref, event_time, created_at) | ||
| )`, SchemaName, SchemaName) | ||
|
|
||
| if _, err := tx.Execute(ctx, createTable); err != nil { | ||
| return fmt.Errorf("create primitive_events table: %w", err) | ||
| } | ||
|
|
||
| createIndex := fmt.Sprintf(` | ||
| CREATE INDEX IF NOT EXISTS local_pe_stream_time_idx | ||
| ON %s.primitive_events (stream_ref, event_time)`, SchemaName) | ||
|
|
||
| if _, err := tx.Execute(ctx, createIndex); err != nil { | ||
| return fmt.Errorf("create primitive_events index: %w", err) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // ensureTaxonomiesTable creates the taxonomies table. | ||
| // Mirrors: consensus taxonomies after 017-normalize-tables.sql | ||
| // Local composed streams can ONLY reference other local streams. | ||
| func ensureTaxonomiesTable(ctx context.Context, tx sql.Tx) error { | ||
| createTable := fmt.Sprintf(` | ||
| CREATE TABLE IF NOT EXISTS %s.taxonomies ( | ||
| taxonomy_id UUID PRIMARY KEY, | ||
| stream_ref INT NOT NULL REFERENCES %s.streams(id) ON DELETE CASCADE, | ||
| child_stream_ref INT NOT NULL REFERENCES %s.streams(id) ON DELETE CASCADE, | ||
| weight NUMERIC(36,18) NOT NULL, | ||
| start_time INT8 NOT NULL, | ||
| group_sequence INT NOT NULL DEFAULT 0, | ||
| created_at INT8 NOT NULL, | ||
| disabled_at INT8, | ||
| CHECK (weight >= 0), | ||
| CHECK (group_sequence >= 0), | ||
| CHECK (start_time >= 0) | ||
| )`, SchemaName, SchemaName, SchemaName) | ||
|
|
||
| if _, err := tx.Execute(ctx, createTable); err != nil { | ||
| return fmt.Errorf("create taxonomies table: %w", err) | ||
| } | ||
| return nil | ||
| } |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.