Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions extensions/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"github.com/trufnetwork/node/extensions/tn_attestation"
"github.com/trufnetwork/node/extensions/tn_cache"
"github.com/trufnetwork/node/extensions/tn_digest"
"github.com/trufnetwork/node/extensions/tn_local"
"github.com/trufnetwork/node/extensions/tn_lp_rewards"
"github.com/trufnetwork/node/extensions/tn_settlement"
"github.com/trufnetwork/node/extensions/tn_vacuum"
Expand All @@ -19,6 +20,7 @@
tn_digest.InitializeExtension()
tn_settlement.InitializeExtension()
tn_lp_rewards.InitializeExtension()
tn_local.InitializeExtension()

Check failure on line 23 in extensions/register.go

View workflow job for this annotation

GitHub Actions / lint

undefined: tn_local.InitializeExtension (typecheck)

Check failure on line 23 in extensions/register.go

View workflow job for this annotation

GitHub Actions / acceptance-test

undefined: tn_local.InitializeExtension
tn_vacuum.InitializeExtension()
tn_attestation.InitializeExtension()
database_size.InitializeExtension()
Expand Down
10 changes: 10 additions & 0 deletions extensions/tn_local/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package tn_local

const (
// SchemaName is the PostgreSQL schema for local storage tables.
// Not prefixed with "ds_" so it's excluded from consensus block hashing.
SchemaName = "ext_tn_local"

// ServiceName is the JSON-RPC service name registered on the admin server.
ServiceName = "local"
)
42 changes: 42 additions & 0 deletions extensions/tn_local/db_ops.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package tn_local

import (
"context"
"fmt"

"github.com/trufnetwork/kwil-db/core/log"
"github.com/trufnetwork/kwil-db/node/types/sql"
)

// LocalDB wraps a sql.DB and provides operations against the ext_tn_local schema.
type LocalDB struct {
db sql.DB
logger log.Logger
}

// NewLocalDB creates a new LocalDB.
func NewLocalDB(db sql.DB, logger log.Logger) *LocalDB {
return &LocalDB{db: db, logger: logger}
}

// SetupSchema creates the ext_tn_local schema and all tables within a single transaction.
func (l *LocalDB) SetupSchema(ctx context.Context) error {
l.logger.Info("setting up local storage schema")

tx, err := l.db.BeginTx(ctx)
if err != nil {
return fmt.Errorf("begin transaction: %w", err)
}
defer func() { _ = tx.Rollback(ctx) }()

if err := setupLocalSchema(ctx, tx); err != nil {
return err
}

if err := tx.Commit(ctx); err != nil {
return fmt.Errorf("commit transaction: %w", err)
}

l.logger.Info("local storage schema setup complete")
return nil
}
55 changes: 55 additions & 0 deletions extensions/tn_local/extension.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package tn_local

import (
"sync"

"github.com/trufnetwork/kwil-db/core/log"
"github.com/trufnetwork/kwil-db/node/types/sql"
)

// Extension holds all state for the tn_local extension.
type Extension struct {
logger log.Logger
db sql.DB
localDB *LocalDB
isEnabled bool
}

var (
extensionInstance *Extension
once sync.Once
)

// GetExtension returns the singleton Extension instance.
// The returned pointer is stable — adminServerHook registers it as a Svc,
// so it must never be replaced. Use configure() to update fields in-place.
func GetExtension() *Extension {
once.Do(func() {
extensionInstance = &Extension{
logger: log.New(log.WithLevel(log.LevelInfo)),
isEnabled: false,
}
})
return extensionInstance
}

// configure updates the extension's internal state in-place.
// This preserves the pointer identity so that the admin server's registered
// Svc reference remains valid. Called during sequential startup before the
// server accepts requests.
func (e *Extension) configure(logger log.Logger, db sql.DB, localDB *LocalDB) {
e.logger = logger
e.db = db
e.localDB = localDB
e.isEnabled = true
}

// Close closes the extension's connection pool.
func (e *Extension) Close() {
if e.db != nil {
if wrapper, ok := e.db.(*PoolDBWrapper); ok {
wrapper.Close()
e.logger.Info("closed local storage connection pool")
}
}
}
160 changes: 160 additions & 0 deletions extensions/tn_local/pool_wrapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package tn_local

import (
"context"
"fmt"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/trufnetwork/kwil-db/node/pg"
"github.com/trufnetwork/kwil-db/node/types/sql"
)

// PoolDBWrapper wraps a pgxpool.Pool to implement the sql.DB interface
// with ReadWrite access for local storage operations.
type PoolDBWrapper struct {
Pool *pgxpool.Pool
}

var _ sql.DB = &PoolDBWrapper{}

// NewPoolDBWrapper creates a new wrapper around the pgxpool.
func NewPoolDBWrapper(pool *pgxpool.Pool) *PoolDBWrapper {
return &PoolDBWrapper{Pool: pool}
}

// Execute implements sql.Executor.
func (w *PoolDBWrapper) Execute(ctx context.Context, stmt string, args ...any) (*sql.ResultSet, error) {
conn, err := w.Pool.Acquire(ctx)
if err != nil {
return nil, fmt.Errorf("acquire connection: %w", err)
}
defer conn.Release()

rows, err := conn.Query(ctx, stmt, args...)
if err != nil {
return nil, fmt.Errorf("execute query: %w", err)
}
defer rows.Close()

oidToDataType := pg.OidTypesMap(conn.Conn().TypeMap())

fields := rows.FieldDescriptions()
columns := make([]string, len(fields))
oids := make([]uint32, len(fields))
for i, field := range fields {
columns[i] = string(field.Name)
oids[i] = field.DataTypeOID
}

var resultRows [][]any
for rows.Next() {
values, err := rows.Values()
if err != nil {
return nil, fmt.Errorf("scan row values: %w", err)
}
decodedValues, err := pg.DecodeFromPG(values, oids, oidToDataType)
if err != nil {
return nil, fmt.Errorf("decode values: %w", err)
}
resultRows = append(resultRows, decodedValues)
}

if err := rows.Err(); err != nil {
return nil, fmt.Errorf("rows iteration: %w", err)
}

return &sql.ResultSet{
Columns: columns,
Rows: resultRows,
Status: sql.CommandTag{
Text: rows.CommandTag().String(),
RowsAffected: rows.CommandTag().RowsAffected(),
},
}, nil
}

// BeginTx implements sql.TxMaker.
func (w *PoolDBWrapper) BeginTx(ctx context.Context) (sql.Tx, error) {
pgxTx, err := w.Pool.Begin(ctx)
if err != nil {
return nil, fmt.Errorf("begin transaction: %w", err)
}
return &poolTxWrapper{tx: pgxTx}, nil
}

// Close closes the underlying connection pool.
func (w *PoolDBWrapper) Close() {
w.Pool.Close()
}

// poolTxWrapper wraps a pgx.Tx to implement the sql.Tx interface.
type poolTxWrapper struct {
tx pgx.Tx
}

var _ sql.Tx = &poolTxWrapper{}

// Execute implements sql.Executor for transactions.
func (t *poolTxWrapper) Execute(ctx context.Context, stmt string, args ...any) (*sql.ResultSet, error) {
rows, err := t.tx.Query(ctx, stmt, args...)
if err != nil {
return nil, fmt.Errorf("execute query in tx: %w", err)
}
defer rows.Close()

oidToDataType := pg.OidTypesMap(t.tx.Conn().TypeMap())

fields := rows.FieldDescriptions()
columns := make([]string, len(fields))
oids := make([]uint32, len(fields))
for i, field := range fields {
columns[i] = string(field.Name)
oids[i] = field.DataTypeOID
}

var resultRows [][]any
for rows.Next() {
values, err := rows.Values()
if err != nil {
return nil, fmt.Errorf("scan row values: %w", err)
}
decodedValues, err := pg.DecodeFromPG(values, oids, oidToDataType)
if err != nil {
return nil, fmt.Errorf("decode values: %w", err)
}
resultRows = append(resultRows, decodedValues)
}

if err := rows.Err(); err != nil {
return nil, fmt.Errorf("rows iteration: %w", err)
}

return &sql.ResultSet{
Columns: columns,
Rows: resultRows,
Status: sql.CommandTag{
Text: rows.CommandTag().String(),
RowsAffected: rows.CommandTag().RowsAffected(),
},
}, nil
}

// BeginTx implements sql.TxMaker for nested transactions (savepoints).
func (t *poolTxWrapper) BeginTx(ctx context.Context) (sql.Tx, error) {
nestedTx, err := t.tx.Begin(ctx)
if err != nil {
return nil, fmt.Errorf("begin nested transaction: %w", err)
}
return &poolTxWrapper{tx: nestedTx}, nil
}

// Rollback implements sql.Tx.
func (t *poolTxWrapper) Rollback(ctx context.Context) error {
return t.tx.Rollback(ctx)
}

// Commit implements sql.Tx.
func (t *poolTxWrapper) Commit(ctx context.Context) error {
return t.tx.Commit(ctx)
}
101 changes: 101 additions & 0 deletions extensions/tn_local/schema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package tn_local

import (
"context"
"fmt"

"github.com/trufnetwork/kwil-db/node/types/sql"
)

// setupLocalSchema creates the ext_tn_local schema and all tables.
// Tables mirror the normalized consensus schema (post migration 017).
func setupLocalSchema(ctx context.Context, tx sql.Tx) error {
if _, err := tx.Execute(ctx, `CREATE SCHEMA IF NOT EXISTS `+SchemaName); err != nil {
return fmt.Errorf("create schema: %w", err)
}

if err := ensureStreamsTable(ctx, tx); err != nil {
return err
}

if err := ensurePrimitiveEventsTable(ctx, tx); err != nil {
return err
}

if err := ensureTaxonomiesTable(ctx, tx); err != nil {
return err
}

return nil
}

// ensureStreamsTable creates the streams table.
// Mirrors: consensus streams table after 017-normalize-tables.sql
func ensureStreamsTable(ctx context.Context, tx sql.Tx) error {
createTable := fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s.streams (
id SERIAL PRIMARY KEY,
data_provider TEXT NOT NULL,
stream_id TEXT NOT NULL,
stream_type TEXT NOT NULL,
created_at INT8 NOT NULL,
UNIQUE(data_provider, stream_id),
CHECK (stream_type IN ('primitive', 'composed'))
)`, SchemaName)

if _, err := tx.Execute(ctx, createTable); err != nil {
return fmt.Errorf("create streams table: %w", err)
}
return nil
}

// ensurePrimitiveEventsTable creates the primitive_events table with an index.
// Mirrors: consensus primitive_events after 017-normalize-tables.sql
func ensurePrimitiveEventsTable(ctx context.Context, tx sql.Tx) error {
createTable := fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s.primitive_events (
stream_ref INT NOT NULL REFERENCES %s.streams(id) ON DELETE CASCADE,
event_time INT8 NOT NULL,
value NUMERIC(36,18) NOT NULL,
created_at INT8 NOT NULL DEFAULT 0,
PRIMARY KEY (stream_ref, event_time, created_at)
)`, SchemaName, SchemaName)

if _, err := tx.Execute(ctx, createTable); err != nil {
return fmt.Errorf("create primitive_events table: %w", err)
}

createIndex := fmt.Sprintf(`
CREATE INDEX IF NOT EXISTS local_pe_stream_time_idx
ON %s.primitive_events (stream_ref, event_time)`, SchemaName)

if _, err := tx.Execute(ctx, createIndex); err != nil {
return fmt.Errorf("create primitive_events index: %w", err)
}
return nil
}

// ensureTaxonomiesTable creates the taxonomies table.
// Mirrors: consensus taxonomies after 017-normalize-tables.sql
// Local composed streams can ONLY reference other local streams.
func ensureTaxonomiesTable(ctx context.Context, tx sql.Tx) error {
createTable := fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s.taxonomies (
taxonomy_id UUID PRIMARY KEY,
stream_ref INT NOT NULL REFERENCES %s.streams(id) ON DELETE CASCADE,
child_stream_ref INT NOT NULL REFERENCES %s.streams(id) ON DELETE CASCADE,
weight NUMERIC(36,18) NOT NULL,
start_time INT8 NOT NULL,
group_sequence INT NOT NULL DEFAULT 0,
created_at INT8 NOT NULL,
disabled_at INT8,
CHECK (weight >= 0),
CHECK (group_sequence >= 0),
CHECK (start_time >= 0)
)`, SchemaName, SchemaName, SchemaName)

if _, err := tx.Execute(ctx, createTable); err != nil {
return fmt.Errorf("create taxonomies table: %w", err)
}
return nil
}
Loading
Loading