From 61137d2bcd2da350f5ddafc0213799e154b02224 Mon Sep 17 00:00:00 2001 From: Michael Buntarman Date: Wed, 18 Mar 2026 18:11:30 +0700 Subject: [PATCH 1/4] feat: store local stream records --- extensions/tn_local/create_stream_test.go | 194 ++++++++++++++++++ extensions/tn_local/db_ops.go | 46 +++++ extensions/tn_local/handlers.go | 40 +++- extensions/tn_local/insert_records_test.go | 216 +++++++++++++++++++++ extensions/tn_local/tn_local_test.go | 183 ----------------- 5 files changed, 494 insertions(+), 185 deletions(-) create mode 100644 extensions/tn_local/create_stream_test.go create mode 100644 extensions/tn_local/insert_records_test.go diff --git a/extensions/tn_local/create_stream_test.go b/extensions/tn_local/create_stream_test.go new file mode 100644 index 00000000..58336407 --- /dev/null +++ b/extensions/tn_local/create_stream_test.go @@ -0,0 +1,194 @@ +package tn_local + +import ( + "context" + "fmt" + "testing" + + "github.com/jackc/pgx/v5/pgconn" + "github.com/stretchr/testify/require" + jsonrpc "github.com/trufnetwork/kwil-db/core/rpc/json" + kwilsql "github.com/trufnetwork/kwil-db/node/types/sql" + "github.com/trufnetwork/node/tests/utils" +) + +func TestCreateStream_NilRequest(t *testing.T) { + ext := newTestExtension(&utils.MockDB{}) + + resp, rpcErr := ext.CreateStream(context.Background(), nil) + require.Nil(t, resp) + require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) + require.Contains(t, rpcErr.Message, "missing request") +} + +func TestCreateStream_Success(t *testing.T) { + var capturedStmt string + var capturedArgs []any + mockDB := &utils.MockDB{ + ExecuteFn: func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { + capturedStmt = stmt + capturedArgs = args + return &kwilsql.ResultSet{}, nil + }, + } + ext := newTestExtension(mockDB) + + resp, rpcErr := ext.CreateStream(context.Background(), &CreateStreamRequest{ + DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", + StreamID: "st00000000000000000000000000test", + StreamType: "primitive", + }) + + require.Nil(t, rpcErr, "expected no error") + require.NotNil(t, resp) + require.Contains(t, capturedStmt, "INSERT INTO "+SchemaName+".streams") + require.Len(t, capturedArgs, 4, "INSERT should have 4 parameters") + // data_provider should be lowercased (matching consensus behavior) + require.Equal(t, "0xec36224a679218ae28fcece8d3c68595b87dd832", capturedArgs[0]) + require.Equal(t, "st00000000000000000000000000test", capturedArgs[1]) + require.Equal(t, "primitive", capturedArgs[2]) + // created_at should be a non-zero unix timestamp + createdAt, ok := capturedArgs[3].(int64) + require.True(t, ok, "created_at should be int64") + require.NotZero(t, createdAt, "created_at should be non-zero") +} + +func TestCreateStream_ComposedType(t *testing.T) { + mockDB := &utils.MockDB{ + ExecuteFn: func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { + return &kwilsql.ResultSet{}, nil + }, + } + ext := newTestExtension(mockDB) + + resp, rpcErr := ext.CreateStream(context.Background(), &CreateStreamRequest{ + DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", + StreamID: "st00000000000000000000000000test", + StreamType: "composed", + }) + + require.Nil(t, rpcErr) + require.NotNil(t, resp) +} + +func TestCreateStream_InvalidStreamID(t *testing.T) { + ext := newTestExtension(&utils.MockDB{}) + + tests := []struct { + name string + streamID string + wantMsg string + }{ + {"too short", "st00", "must be exactly 32 characters"}, + {"too long", "st000000000000000000000000000test1", "must be exactly 32 characters"}, + {"wrong prefix", "xx00000000000000000000000000test", "must start with 'st'"}, + {"empty", "", "must be exactly 32 characters"}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, rpcErr := ext.CreateStream(context.Background(), &CreateStreamRequest{ + DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", + StreamID: tt.streamID, + StreamType: "primitive", + }) + require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) + require.Contains(t, rpcErr.Message, tt.wantMsg) + }) + } +} + +func TestCreateStream_InvalidStreamType(t *testing.T) { + ext := newTestExtension(&utils.MockDB{}) + + _, rpcErr := ext.CreateStream(context.Background(), &CreateStreamRequest{ + DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", + StreamID: "st00000000000000000000000000test", + StreamType: "invalid", + }) + require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) + require.Contains(t, rpcErr.Message, "must be 'primitive' or 'composed'") +} + +func TestCreateStream_InvalidDataProvider(t *testing.T) { + ext := newTestExtension(&utils.MockDB{}) + + tests := []struct { + name string + dataProvider string + }{ + {"no 0x prefix", "EC36224A679218Ae28FCeCe8d3c68595B87Dd832"}, + {"too short", "0xEC36224A679218Ae28"}, + {"too long", "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832FF"}, + {"invalid chars", "0xGG36224A679218Ae28FCeCe8d3c68595B87Dd832"}, + {"empty", ""}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, rpcErr := ext.CreateStream(context.Background(), &CreateStreamRequest{ + DataProvider: tt.dataProvider, + StreamID: "st00000000000000000000000000test", + StreamType: "primitive", + }) + require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) + require.Contains(t, rpcErr.Message, "data_provider must be a valid Ethereum address") + }) + } +} + +func TestCreateStream_DuplicateStream(t *testing.T) { + mockDB := &utils.MockDB{ + ExecuteFn: func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { + return nil, fmt.Errorf("duplicate key value violates unique constraint") + }, + } + ext := newTestExtension(mockDB) + + _, rpcErr := ext.CreateStream(context.Background(), &CreateStreamRequest{ + DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", + StreamID: "st00000000000000000000000000test", + StreamType: "primitive", + }) + require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) + require.Contains(t, rpcErr.Message, "stream already exists") +} + +func TestCreateStream_DuplicateStream_PgError(t *testing.T) { + mockDB := &utils.MockDB{ + ExecuteFn: func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { + return nil, &pgconn.PgError{Code: pgUniqueViolation, Message: "unique_violation"} + }, + } + ext := newTestExtension(mockDB) + + _, rpcErr := ext.CreateStream(context.Background(), &CreateStreamRequest{ + DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", + StreamID: "st00000000000000000000000000test", + StreamType: "primitive", + }) + require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) + require.Contains(t, rpcErr.Message, "stream already exists") +} + +func TestCreateStream_DBError(t *testing.T) { + mockDB := &utils.MockDB{ + ExecuteFn: func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { + return nil, fmt.Errorf("connection refused") + }, + } + ext := newTestExtension(mockDB) + + _, rpcErr := ext.CreateStream(context.Background(), &CreateStreamRequest{ + DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", + StreamID: "st00000000000000000000000000test", + StreamType: "primitive", + }) + require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInternal), rpcErr.Code) + require.Contains(t, rpcErr.Message, "failed to create stream") +} diff --git a/extensions/tn_local/db_ops.go b/extensions/tn_local/db_ops.go index e173e330..b16508fd 100644 --- a/extensions/tn_local/db_ops.go +++ b/extensions/tn_local/db_ops.go @@ -29,6 +29,52 @@ func (ext *Extension) dbCreateStream(ctx context.Context, dataProvider, streamID return err } +// dbLookupStreamRef looks up a stream by data_provider and stream_id. +// Returns (id, stream_type, nil) if found, or (0, "", nil) if not found. +func (ext *Extension) dbLookupStreamRef(ctx context.Context, dataProvider, streamID string) (int64, string, error) { + rs, err := ext.db.Execute(ctx, fmt.Sprintf( + `SELECT id, stream_type FROM %s.streams WHERE data_provider = $1 AND stream_id = $2`, SchemaName), + dataProvider, streamID) + if err != nil { + return 0, "", err + } + if len(rs.Rows) == 0 { + return 0, "", nil + } + id, ok := rs.Rows[0][0].(int64) + if !ok { + return 0, "", fmt.Errorf("unexpected id type: %T", rs.Rows[0][0]) + } + streamType, ok := rs.Rows[0][1].(string) + if !ok { + return 0, "", fmt.Errorf("unexpected stream_type type: %T", rs.Rows[0][1]) + } + return id, streamType, nil +} + +// dbInsertRecords batch-inserts records into ext_tn_local.primitive_events within a transaction. +func (ext *Extension) dbInsertRecords(ctx context.Context, streamRef int64, records []RecordInput) error { + createdAt := time.Now().Unix() + + tx, err := ext.db.BeginTx(ctx) + if err != nil { + return fmt.Errorf("begin transaction: %w", err) + } + defer func() { _ = tx.Rollback(ctx) }() + + for _, r := range records { + _, err := tx.Execute(ctx, fmt.Sprintf( + `INSERT INTO %s.primitive_events (stream_ref, event_time, value, created_at) + VALUES ($1, $2, $3, $4)`, SchemaName), + streamRef, r.EventTime, r.Value, createdAt) + if err != nil { + return err + } + } + + return tx.Commit(ctx) +} + // SetupSchema creates the ext_tn_local schema and all tables within a single transaction. func (l *LocalDB) SetupSchema(ctx context.Context) error { l.logger.Info("setting up local storage schema") diff --git a/extensions/tn_local/handlers.go b/extensions/tn_local/handlers.go index 28c6f919..02e5b3fc 100644 --- a/extensions/tn_local/handlers.go +++ b/extensions/tn_local/handlers.go @@ -88,9 +88,45 @@ func (ext *Extension) CreateStream(ctx context.Context, req *CreateStreamRequest return &CreateStreamResponse{}, nil } -// InsertRecords inserts records into a local primitive stream. (Task 4) +// InsertRecords inserts records into a local primitive stream. func (ext *Extension) InsertRecords(ctx context.Context, req *InsertRecordsRequest) (*InsertRecordsResponse, *jsonrpc.Error) { - return nil, jsonrpc.NewError(jsonrpc.ErrorInternal, "not implemented", nil) + if req == nil { + return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, "missing request", nil) + } + + dataProvider := strings.ToLower(req.DataProvider) + + if err := validateDataProvider(dataProvider); err != nil { + return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, err.Error(), nil) + } + if err := validateStreamID(req.StreamID); err != nil { + return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, err.Error(), nil) + } + if len(req.Records) == 0 { + return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, "records must not be empty", nil) + } + + streamRef, streamType, err := ext.dbLookupStreamRef(ctx, dataProvider, req.StreamID) + if err != nil { + ext.logger.Error("failed to look up stream", "error", err) + return nil, jsonrpc.NewError(jsonrpc.ErrorInternal, "failed to look up stream", nil) + } + if streamRef == 0 { + return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, fmt.Sprintf("stream not found: %s/%s", dataProvider, req.StreamID), nil) + } + if streamType != "primitive" { + return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, fmt.Sprintf("stream %s/%s is not a primitive stream", dataProvider, req.StreamID), nil) + } + + if err := ext.dbInsertRecords(ctx, streamRef, req.Records); err != nil { + if isDuplicateKeyError(err) { + return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, "duplicate record: same event_time already exists", nil) + } + ext.logger.Error("failed to insert records", "error", err) + return nil, jsonrpc.NewError(jsonrpc.ErrorInternal, "failed to insert records", nil) + } + + return &InsertRecordsResponse{Count: len(req.Records)}, nil } // InsertTaxonomy adds a taxonomy entry to a local composed stream. (Task 5) diff --git a/extensions/tn_local/insert_records_test.go b/extensions/tn_local/insert_records_test.go new file mode 100644 index 00000000..6c07f904 --- /dev/null +++ b/extensions/tn_local/insert_records_test.go @@ -0,0 +1,216 @@ +package tn_local + +import ( + "context" + "fmt" + "strings" + "testing" + + "github.com/jackc/pgx/v5/pgconn" + "github.com/stretchr/testify/require" + jsonrpc "github.com/trufnetwork/kwil-db/core/rpc/json" + kwilsql "github.com/trufnetwork/kwil-db/node/types/sql" + "github.com/trufnetwork/node/tests/utils" +) + +// mockDBWithStream returns a MockDB that simulates a stream lookup returning the given +// streamRef and streamType, and captures INSERT statements via executeFn. +func mockDBWithStream(streamRef int64, streamType string, executeFn func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error)) *utils.MockDB { + return &utils.MockDB{ + ExecuteFn: func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { + if strings.Contains(stmt, "SELECT") { + if streamRef == 0 { + return &kwilsql.ResultSet{Rows: [][]any{}}, nil + } + return &kwilsql.ResultSet{ + Columns: []string{"id", "stream_type"}, + Rows: [][]any{{streamRef, streamType}}, + }, nil + } + if executeFn != nil { + return executeFn(ctx, stmt, args...) + } + return &kwilsql.ResultSet{}, nil + }, + BeginTxFn: func(ctx context.Context) (kwilsql.Tx, error) { + return &utils.MockTx{ + ExecuteFn: func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { + if executeFn != nil { + return executeFn(ctx, stmt, args...) + } + return &kwilsql.ResultSet{}, nil + }, + }, nil + }, + } +} + +func TestInsertRecords_NilRequest(t *testing.T) { + ext := newTestExtension(&utils.MockDB{}) + + resp, rpcErr := ext.InsertRecords(context.Background(), nil) + require.Nil(t, resp) + require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) + require.Contains(t, rpcErr.Message, "missing request") +} + +func TestInsertRecords_EmptyRecords(t *testing.T) { + ext := newTestExtension(&utils.MockDB{}) + + _, rpcErr := ext.InsertRecords(context.Background(), &InsertRecordsRequest{ + DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", + StreamID: "st00000000000000000000000000test", + Records: []RecordInput{}, + }) + require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) + require.Contains(t, rpcErr.Message, "records must not be empty") +} + +func TestInsertRecords_Success(t *testing.T) { + var capturedStmts []string + var capturedArgs [][]any + mockDB := mockDBWithStream(42, "primitive", func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { + capturedStmts = append(capturedStmts, stmt) + capturedArgs = append(capturedArgs, args) + return &kwilsql.ResultSet{}, nil + }) + ext := newTestExtension(mockDB) + + resp, rpcErr := ext.InsertRecords(context.Background(), &InsertRecordsRequest{ + DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", + StreamID: "st00000000000000000000000000test", + Records: []RecordInput{ + {EventTime: 1000, Value: "123.456"}, + {EventTime: 2000, Value: "789.012"}, + }, + }) + + require.Nil(t, rpcErr, "expected no error") + require.NotNil(t, resp) + require.Equal(t, 2, resp.Count) + + // Two INSERT statements (one per record) + require.Len(t, capturedStmts, 2) + for _, stmt := range capturedStmts { + require.Contains(t, stmt, "INSERT INTO "+SchemaName+".primitive_events") + } + + // Each INSERT has 4 args: stream_ref, event_time, value, created_at + require.Len(t, capturedArgs[0], 4) + require.Equal(t, int64(42), capturedArgs[0][0]) // stream_ref + require.Equal(t, int64(1000), capturedArgs[0][1]) + require.Equal(t, "123.456", capturedArgs[0][2]) + createdAt, ok := capturedArgs[0][3].(int64) + require.True(t, ok, "created_at should be int64") + require.NotZero(t, createdAt) + + require.Equal(t, int64(42), capturedArgs[1][0]) + require.Equal(t, int64(2000), capturedArgs[1][1]) + require.Equal(t, "789.012", capturedArgs[1][2]) +} + +func TestInsertRecords_StreamNotFound(t *testing.T) { + mockDB := mockDBWithStream(0, "", nil) + ext := newTestExtension(mockDB) + + _, rpcErr := ext.InsertRecords(context.Background(), &InsertRecordsRequest{ + DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", + StreamID: "st00000000000000000000000000test", + Records: []RecordInput{{EventTime: 1000, Value: "1.0"}}, + }) + require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) + require.Contains(t, rpcErr.Message, "stream not found") +} + +func TestInsertRecords_ComposedStreamRejected(t *testing.T) { + mockDB := mockDBWithStream(42, "composed", nil) + ext := newTestExtension(mockDB) + + _, rpcErr := ext.InsertRecords(context.Background(), &InsertRecordsRequest{ + DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", + StreamID: "st00000000000000000000000000test", + Records: []RecordInput{{EventTime: 1000, Value: "1.0"}}, + }) + require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) + require.Contains(t, rpcErr.Message, "is not a primitive stream") +} + +func TestInsertRecords_DuplicateRecord(t *testing.T) { + mockDB := mockDBWithStream(42, "primitive", func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { + return nil, &pgconn.PgError{Code: pgUniqueViolation, Message: "unique_violation"} + }) + ext := newTestExtension(mockDB) + + _, rpcErr := ext.InsertRecords(context.Background(), &InsertRecordsRequest{ + DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", + StreamID: "st00000000000000000000000000test", + Records: []RecordInput{{EventTime: 1000, Value: "1.0"}}, + }) + require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) + require.Contains(t, rpcErr.Message, "duplicate record") +} + +func TestInsertRecords_DBError(t *testing.T) { + mockDB := mockDBWithStream(42, "primitive", func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { + return nil, fmt.Errorf("connection refused") + }) + ext := newTestExtension(mockDB) + + _, rpcErr := ext.InsertRecords(context.Background(), &InsertRecordsRequest{ + DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", + StreamID: "st00000000000000000000000000test", + Records: []RecordInput{{EventTime: 1000, Value: "1.0"}}, + }) + require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInternal), rpcErr.Code) + require.Contains(t, rpcErr.Message, "failed to insert records") +} + +func TestInsertRecords_InvalidDataProvider(t *testing.T) { + ext := newTestExtension(&utils.MockDB{}) + + _, rpcErr := ext.InsertRecords(context.Background(), &InsertRecordsRequest{ + DataProvider: "invalid", + StreamID: "st00000000000000000000000000test", + Records: []RecordInput{{EventTime: 1000, Value: "1.0"}}, + }) + require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) + require.Contains(t, rpcErr.Message, "data_provider must be a valid Ethereum address") +} + +func TestInsertRecords_InvalidStreamID(t *testing.T) { + ext := newTestExtension(&utils.MockDB{}) + + _, rpcErr := ext.InsertRecords(context.Background(), &InsertRecordsRequest{ + DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", + StreamID: "bad", + Records: []RecordInput{{EventTime: 1000, Value: "1.0"}}, + }) + require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) + require.Contains(t, rpcErr.Message, "stream_id must be exactly 32 characters") +} + +func TestInsertRecords_LookupDBError(t *testing.T) { + mockDB := &utils.MockDB{ + ExecuteFn: func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { + return nil, fmt.Errorf("connection refused") + }, + } + ext := newTestExtension(mockDB) + + _, rpcErr := ext.InsertRecords(context.Background(), &InsertRecordsRequest{ + DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", + StreamID: "st00000000000000000000000000test", + Records: []RecordInput{{EventTime: 1000, Value: "1.0"}}, + }) + require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInternal), rpcErr.Code) + require.Contains(t, rpcErr.Message, "failed to look up stream") +} diff --git a/extensions/tn_local/tn_local_test.go b/extensions/tn_local/tn_local_test.go index ffd5eb4e..2f784afd 100644 --- a/extensions/tn_local/tn_local_test.go +++ b/extensions/tn_local/tn_local_test.go @@ -2,13 +2,11 @@ package tn_local import ( "context" - "fmt" "io" "strings" "sync" "testing" - "github.com/jackc/pgx/v5/pgconn" "github.com/stretchr/testify/require" "github.com/trufnetwork/kwil-db/core/log" jsonrpc "github.com/trufnetwork/kwil-db/core/rpc/json" @@ -146,187 +144,6 @@ func newTestExtension(db kwilsql.DB) *Extension { return ext } -func TestCreateStream_NilRequest(t *testing.T) { - ext := newTestExtension(&utils.MockDB{}) - - resp, rpcErr := ext.CreateStream(context.Background(), nil) - require.Nil(t, resp) - require.NotNil(t, rpcErr) - require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) - require.Contains(t, rpcErr.Message, "missing request") -} - -func TestCreateStream_Success(t *testing.T) { - var capturedStmt string - var capturedArgs []any - mockDB := &utils.MockDB{ - ExecuteFn: func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { - capturedStmt = stmt - capturedArgs = args - return &kwilsql.ResultSet{}, nil - }, - } - ext := newTestExtension(mockDB) - - resp, rpcErr := ext.CreateStream(context.Background(), &CreateStreamRequest{ - DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", - StreamID: "st00000000000000000000000000test", - StreamType: "primitive", - }) - - require.Nil(t, rpcErr, "expected no error") - require.NotNil(t, resp) - require.Contains(t, capturedStmt, "INSERT INTO "+SchemaName+".streams") - require.Len(t, capturedArgs, 4, "INSERT should have 4 parameters") - // data_provider should be lowercased (matching consensus behavior) - require.Equal(t, "0xec36224a679218ae28fcece8d3c68595b87dd832", capturedArgs[0]) - require.Equal(t, "st00000000000000000000000000test", capturedArgs[1]) - require.Equal(t, "primitive", capturedArgs[2]) - // created_at should be a non-zero unix timestamp - createdAt, ok := capturedArgs[3].(int64) - require.True(t, ok, "created_at should be int64") - require.NotZero(t, createdAt, "created_at should be non-zero") -} - -func TestCreateStream_ComposedType(t *testing.T) { - mockDB := &utils.MockDB{ - ExecuteFn: func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { - return &kwilsql.ResultSet{}, nil - }, - } - ext := newTestExtension(mockDB) - - resp, rpcErr := ext.CreateStream(context.Background(), &CreateStreamRequest{ - DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", - StreamID: "st00000000000000000000000000test", - StreamType: "composed", - }) - - require.Nil(t, rpcErr) - require.NotNil(t, resp) -} - -func TestCreateStream_InvalidStreamID(t *testing.T) { - ext := newTestExtension(&utils.MockDB{}) - - tests := []struct { - name string - streamID string - wantMsg string - }{ - {"too short", "st00", "must be exactly 32 characters"}, - {"too long", "st000000000000000000000000000test1", "must be exactly 32 characters"}, - {"wrong prefix", "xx00000000000000000000000000test", "must start with 'st'"}, - {"empty", "", "must be exactly 32 characters"}, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - _, rpcErr := ext.CreateStream(context.Background(), &CreateStreamRequest{ - DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", - StreamID: tt.streamID, - StreamType: "primitive", - }) - require.NotNil(t, rpcErr) - require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) - require.Contains(t, rpcErr.Message, tt.wantMsg) - }) - } -} - -func TestCreateStream_InvalidStreamType(t *testing.T) { - ext := newTestExtension(&utils.MockDB{}) - - _, rpcErr := ext.CreateStream(context.Background(), &CreateStreamRequest{ - DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", - StreamID: "st00000000000000000000000000test", - StreamType: "invalid", - }) - require.NotNil(t, rpcErr) - require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) - require.Contains(t, rpcErr.Message, "must be 'primitive' or 'composed'") -} - -func TestCreateStream_InvalidDataProvider(t *testing.T) { - ext := newTestExtension(&utils.MockDB{}) - - tests := []struct { - name string - dataProvider string - }{ - {"no 0x prefix", "EC36224A679218Ae28FCeCe8d3c68595B87Dd832"}, - {"too short", "0xEC36224A679218Ae28"}, - {"too long", "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832FF"}, - {"invalid chars", "0xGG36224A679218Ae28FCeCe8d3c68595B87Dd832"}, - {"empty", ""}, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - _, rpcErr := ext.CreateStream(context.Background(), &CreateStreamRequest{ - DataProvider: tt.dataProvider, - StreamID: "st00000000000000000000000000test", - StreamType: "primitive", - }) - require.NotNil(t, rpcErr) - require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) - require.Contains(t, rpcErr.Message, "data_provider must be a valid Ethereum address") - }) - } -} - -func TestCreateStream_DuplicateStream(t *testing.T) { - mockDB := &utils.MockDB{ - ExecuteFn: func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { - return nil, fmt.Errorf("duplicate key value violates unique constraint") - }, - } - ext := newTestExtension(mockDB) - - _, rpcErr := ext.CreateStream(context.Background(), &CreateStreamRequest{ - DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", - StreamID: "st00000000000000000000000000test", - StreamType: "primitive", - }) - require.NotNil(t, rpcErr) - require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) - require.Contains(t, rpcErr.Message, "stream already exists") -} - -func TestCreateStream_DuplicateStream_PgError(t *testing.T) { - mockDB := &utils.MockDB{ - ExecuteFn: func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { - return nil, &pgconn.PgError{Code: pgUniqueViolation, Message: "unique_violation"} - }, - } - ext := newTestExtension(mockDB) - - _, rpcErr := ext.CreateStream(context.Background(), &CreateStreamRequest{ - DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", - StreamID: "st00000000000000000000000000test", - StreamType: "primitive", - }) - require.NotNil(t, rpcErr) - require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) - require.Contains(t, rpcErr.Message, "stream already exists") -} - -func TestCreateStream_DBError(t *testing.T) { - mockDB := &utils.MockDB{ - ExecuteFn: func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { - return nil, fmt.Errorf("connection refused") - }, - } - ext := newTestExtension(mockDB) - - _, rpcErr := ext.CreateStream(context.Background(), &CreateStreamRequest{ - DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", - StreamID: "st00000000000000000000000000test", - StreamType: "primitive", - }) - require.NotNil(t, rpcErr) - require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInternal), rpcErr.Code) - require.Contains(t, rpcErr.Message, "failed to create stream") -} - func containsSQL(statements []string, substr string) bool { for _, s := range statements { if strings.Contains(s, substr) { From 093a7a8cf192a779e20416c77ed90c70e0449013 Mon Sep 17 00:00:00 2001 From: Michael Buntarman Date: Wed, 18 Mar 2026 19:01:40 +0700 Subject: [PATCH 2/4] chore: apply suggestion --- extensions/tn_local/db_ops.go | 3 +- extensions/tn_local/handlers.go | 12 ++++++ extensions/tn_local/insert_records_test.go | 46 ++++++++++++++++++++++ extensions/tn_local/schema.go | 2 +- 4 files changed, 61 insertions(+), 2 deletions(-) diff --git a/extensions/tn_local/db_ops.go b/extensions/tn_local/db_ops.go index b16508fd..c363da26 100644 --- a/extensions/tn_local/db_ops.go +++ b/extensions/tn_local/db_ops.go @@ -65,7 +65,8 @@ func (ext *Extension) dbInsertRecords(ctx context.Context, streamRef int64, reco for _, r := range records { _, err := tx.Execute(ctx, fmt.Sprintf( `INSERT INTO %s.primitive_events (stream_ref, event_time, value, created_at) - VALUES ($1, $2, $3, $4)`, SchemaName), + VALUES ($1, $2, $3, $4) + ON CONFLICT (stream_ref, event_time) DO NOTHING`, SchemaName), streamRef, r.EventTime, r.Value, createdAt) if err != nil { return err diff --git a/extensions/tn_local/handlers.go b/extensions/tn_local/handlers.go index 02e5b3fc..bc70a9bf 100644 --- a/extensions/tn_local/handlers.go +++ b/extensions/tn_local/handlers.go @@ -4,7 +4,9 @@ import ( "context" "errors" "fmt" + "math" "regexp" + "strconv" "strings" "github.com/jackc/pgx/v5/pgconn" @@ -118,6 +120,16 @@ func (ext *Extension) InsertRecords(ctx context.Context, req *InsertRecordsReque return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, fmt.Sprintf("stream %s/%s is not a primitive stream", dataProvider, req.StreamID), nil) } + for i, r := range req.Records { + f, parseErr := strconv.ParseFloat(r.Value, 64) + if parseErr != nil { + return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, fmt.Sprintf("invalid record value at index %d: %v", i, parseErr), nil) + } + if math.IsNaN(f) || math.IsInf(f, 0) { + return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, fmt.Sprintf("invalid record value at index %d: must be a finite number", i), nil) + } + } + if err := ext.dbInsertRecords(ctx, streamRef, req.Records); err != nil { if isDuplicateKeyError(err) { return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, "duplicate record: same event_time already exists", nil) diff --git a/extensions/tn_local/insert_records_test.go b/extensions/tn_local/insert_records_test.go index 6c07f904..210955e0 100644 --- a/extensions/tn_local/insert_records_test.go +++ b/extensions/tn_local/insert_records_test.go @@ -197,6 +197,52 @@ func TestInsertRecords_InvalidStreamID(t *testing.T) { require.Contains(t, rpcErr.Message, "stream_id must be exactly 32 characters") } +func TestInsertRecords_InvalidValue(t *testing.T) { + mockDB := mockDBWithStream(42, "primitive", nil) + ext := newTestExtension(mockDB) + + tests := []struct { + name string + value string + wantMsg string + }{ + {"non-numeric", "hello", "invalid record value at index 0"}, + {"empty", "", "invalid record value at index 0"}, + {"NaN", "NaN", "must be a finite number"}, + {"Inf", "Inf", "must be a finite number"}, + {"-Inf", "-Inf", "must be a finite number"}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, rpcErr := ext.InsertRecords(context.Background(), &InsertRecordsRequest{ + DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", + StreamID: "st00000000000000000000000000test", + Records: []RecordInput{{EventTime: 1000, Value: tt.value}}, + }) + require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) + require.Contains(t, rpcErr.Message, tt.wantMsg) + }) + } +} + +func TestInsertRecords_InvalidValueAtIndex(t *testing.T) { + mockDB := mockDBWithStream(42, "primitive", nil) + ext := newTestExtension(mockDB) + + _, rpcErr := ext.InsertRecords(context.Background(), &InsertRecordsRequest{ + DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", + StreamID: "st00000000000000000000000000test", + Records: []RecordInput{ + {EventTime: 1000, Value: "1.0"}, + {EventTime: 2000, Value: "bad"}, + }, + }) + require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) + require.Contains(t, rpcErr.Message, "invalid record value at index 1") +} + func TestInsertRecords_LookupDBError(t *testing.T) { mockDB := &utils.MockDB{ ExecuteFn: func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { diff --git a/extensions/tn_local/schema.go b/extensions/tn_local/schema.go index d043a277..8c84ef30 100644 --- a/extensions/tn_local/schema.go +++ b/extensions/tn_local/schema.go @@ -66,7 +66,7 @@ func ensurePrimitiveEventsTable(ctx context.Context, tx sql.Tx) error { } createIndex := fmt.Sprintf(` - CREATE INDEX IF NOT EXISTS local_pe_stream_time_idx + CREATE UNIQUE INDEX IF NOT EXISTS local_pe_stream_time_idx ON %s.primitive_events (stream_ref, event_time)`, SchemaName) if _, err := tx.Execute(ctx, createIndex); err != nil { From 38c5513bc133e193d702115d6cfccf8c3460a591 Mon Sep 17 00:00:00 2001 From: Michael Buntarman Date: Wed, 18 Mar 2026 19:37:11 +0700 Subject: [PATCH 3/4] chore: apply suggestion --- extensions/tn_local/db_ops.go | 3 +- extensions/tn_local/schema.go | 52 +++++++++++++++++++++ extensions/tn_local/tn_local_test.go | 68 ++++++++++++++++++++++++++++ 3 files changed, 121 insertions(+), 2 deletions(-) diff --git a/extensions/tn_local/db_ops.go b/extensions/tn_local/db_ops.go index c363da26..b16508fd 100644 --- a/extensions/tn_local/db_ops.go +++ b/extensions/tn_local/db_ops.go @@ -65,8 +65,7 @@ func (ext *Extension) dbInsertRecords(ctx context.Context, streamRef int64, reco for _, r := range records { _, err := tx.Execute(ctx, fmt.Sprintf( `INSERT INTO %s.primitive_events (stream_ref, event_time, value, created_at) - VALUES ($1, $2, $3, $4) - ON CONFLICT (stream_ref, event_time) DO NOTHING`, SchemaName), + VALUES ($1, $2, $3, $4)`, SchemaName), streamRef, r.EventTime, r.Value, createdAt) if err != nil { return err diff --git a/extensions/tn_local/schema.go b/extensions/tn_local/schema.go index 8c84ef30..caf69fb2 100644 --- a/extensions/tn_local/schema.go +++ b/extensions/tn_local/schema.go @@ -65,6 +65,11 @@ func ensurePrimitiveEventsTable(ctx context.Context, tx sql.Tx) error { return fmt.Errorf("create primitive_events table: %w", err) } + // Migrate existing non-unique index to unique if needed. + if err := migrateToUniqueEventIndex(ctx, tx); err != nil { + return err + } + createIndex := fmt.Sprintf(` CREATE UNIQUE INDEX IF NOT EXISTS local_pe_stream_time_idx ON %s.primitive_events (stream_ref, event_time)`, SchemaName) @@ -75,6 +80,53 @@ func ensurePrimitiveEventsTable(ctx context.Context, tx sql.Tx) error { return nil } +// migrateToUniqueEventIndex detects a pre-existing non-unique local_pe_stream_time_idx, +// deduplicates rows that would violate uniqueness on (stream_ref, event_time), +// drops the old index, and lets the caller recreate it as UNIQUE. +func migrateToUniqueEventIndex(ctx context.Context, tx sql.Tx) error { + // Step 1: check if the index exists and is non-unique. + rs, err := tx.Execute(ctx, ` + SELECT NOT i.indisunique + FROM pg_index i + JOIN pg_class c ON c.oid = i.indexrelid + WHERE c.relname = 'local_pe_stream_time_idx'`) + if err != nil { + return fmt.Errorf("check index uniqueness: %w", err) + } + + // Index does not exist yet (fresh install) — nothing to migrate. + if len(rs.Rows) == 0 { + return nil + } + + isNonUnique, ok := rs.Rows[0][0].(bool) + if !ok || !isNonUnique { + // Already unique — nothing to do. + return nil + } + + // Step 2: deduplicate — keep the row with the latest created_at per (stream_ref, event_time). + if _, err := tx.Execute(ctx, fmt.Sprintf(` + DELETE FROM %s.primitive_events pe1 + WHERE EXISTS ( + SELECT 1 FROM %s.primitive_events pe2 + WHERE pe2.stream_ref = pe1.stream_ref + AND pe2.event_time = pe1.event_time + AND pe2.created_at > pe1.created_at + )`, SchemaName, SchemaName)); err != nil { + return fmt.Errorf("deduplicate primitive_events: %w", err) + } + + // Step 3: drop the non-unique index. + if _, err := tx.Execute(ctx, fmt.Sprintf( + `DROP INDEX IF EXISTS %s.local_pe_stream_time_idx`, SchemaName)); err != nil { + return fmt.Errorf("drop non-unique index: %w", err) + } + + // Step 4: caller recreates as CREATE UNIQUE INDEX. + return nil +} + // ensureTaxonomiesTable creates the taxonomies table. // Mirrors: consensus taxonomies after 017-normalize-tables.sql // Local composed streams can ONLY reference other local streams. diff --git a/extensions/tn_local/tn_local_test.go b/extensions/tn_local/tn_local_test.go index 2f784afd..6deef197 100644 --- a/extensions/tn_local/tn_local_test.go +++ b/extensions/tn_local/tn_local_test.go @@ -63,6 +63,74 @@ func TestSetupSchema(t *testing.T) { "taxonomies should have UUID primary key") } +func TestMigrateToUniqueEventIndex_NonUnique(t *testing.T) { + var statements []string + mockTx := &utils.MockTx{ + ExecuteFn: func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { + statements = append(statements, stmt) + // Simulate pg_index query returning non-unique (true) + if strings.Contains(stmt, "pg_index") { + return &kwilsql.ResultSet{ + Columns: []string{"?column?"}, + Rows: [][]any{{true}}, + }, nil + } + return &kwilsql.ResultSet{}, nil + }, + } + + err := migrateToUniqueEventIndex(context.Background(), mockTx) + require.NoError(t, err) + + // Should have run: pg_index check, DELETE dedup, DROP INDEX + require.True(t, containsSQL(statements, "pg_index"), "should check pg_index") + require.True(t, containsSQL(statements, "DELETE FROM"), "should deduplicate rows") + require.True(t, containsSQL(statements, "DROP INDEX"), "should drop non-unique index") +} + +func TestMigrateToUniqueEventIndex_AlreadyUnique(t *testing.T) { + var statements []string + mockTx := &utils.MockTx{ + ExecuteFn: func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { + statements = append(statements, stmt) + // Simulate pg_index query returning already unique (false) + if strings.Contains(stmt, "pg_index") { + return &kwilsql.ResultSet{ + Columns: []string{"?column?"}, + Rows: [][]any{{false}}, + }, nil + } + return &kwilsql.ResultSet{}, nil + }, + } + + err := migrateToUniqueEventIndex(context.Background(), mockTx) + require.NoError(t, err) + + // Should only have the pg_index check, no dedup or drop + require.Len(t, statements, 1, "should only check pg_index, no further action") +} + +func TestMigrateToUniqueEventIndex_FreshInstall(t *testing.T) { + var statements []string + mockTx := &utils.MockTx{ + ExecuteFn: func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { + statements = append(statements, stmt) + // Simulate pg_index query returning no rows (index doesn't exist) + if strings.Contains(stmt, "pg_index") { + return &kwilsql.ResultSet{Rows: [][]any{}}, nil + } + return &kwilsql.ResultSet{}, nil + }, + } + + err := migrateToUniqueEventIndex(context.Background(), mockTx) + require.NoError(t, err) + + // Should only have the pg_index check, no dedup or drop + require.Len(t, statements, 1, "should only check pg_index, no further action") +} + func TestSetupSchema_RollbackOnError(t *testing.T) { rolledBack := false mockTx := &utils.MockTx{ From 847cea0bc51c2409c7b66863b89d3b65ac8cbe49 Mon Sep 17 00:00:00 2001 From: Michael Buntarman Date: Wed, 18 Mar 2026 20:17:16 +0700 Subject: [PATCH 4/4] chore: mirror consensus pattern --- extensions/tn_local/db_ops.go | 9 +- extensions/tn_local/handlers.go | 93 ++++++++---- extensions/tn_local/insert_records_test.go | 159 ++++++++++++++------- extensions/tn_local/schema.go | 56 +------- extensions/tn_local/tn_local_test.go | 68 --------- extensions/tn_local/types.go | 20 ++- 6 files changed, 185 insertions(+), 220 deletions(-) diff --git a/extensions/tn_local/db_ops.go b/extensions/tn_local/db_ops.go index b16508fd..1fae2ae0 100644 --- a/extensions/tn_local/db_ops.go +++ b/extensions/tn_local/db_ops.go @@ -52,8 +52,9 @@ func (ext *Extension) dbLookupStreamRef(ctx context.Context, dataProvider, strea return id, streamType, nil } -// dbInsertRecords batch-inserts records into ext_tn_local.primitive_events within a transaction. -func (ext *Extension) dbInsertRecords(ctx context.Context, streamRef int64, records []RecordInput) error { +// dbInsertRecords batch-inserts resolved records into ext_tn_local.primitive_events +// within a transaction. Mirrors the consensus INSERT in 003-primitive-insertion.sql. +func (ext *Extension) dbInsertRecords(ctx context.Context, streamRefs []int64, eventTimes []int64, values []string) error { createdAt := time.Now().Unix() tx, err := ext.db.BeginTx(ctx) @@ -62,11 +63,11 @@ func (ext *Extension) dbInsertRecords(ctx context.Context, streamRef int64, reco } defer func() { _ = tx.Rollback(ctx) }() - for _, r := range records { + for i := range streamRefs { _, err := tx.Execute(ctx, fmt.Sprintf( `INSERT INTO %s.primitive_events (stream_ref, event_time, value, created_at) VALUES ($1, $2, $3, $4)`, SchemaName), - streamRef, r.EventTime, r.Value, createdAt) + streamRefs[i], eventTimes[i], values[i], createdAt) if err != nil { return err } diff --git a/extensions/tn_local/handlers.go b/extensions/tn_local/handlers.go index bc70a9bf..57130623 100644 --- a/extensions/tn_local/handlers.go +++ b/extensions/tn_local/handlers.go @@ -90,38 +90,39 @@ func (ext *Extension) CreateStream(ctx context.Context, req *CreateStreamRequest return &CreateStreamResponse{}, nil } -// InsertRecords inserts records into a local primitive stream. +// InsertRecords inserts records into local primitive streams. +// Mirrors the consensus insert_records action (003-primitive-insertion.sql): +// - Parallel arrays: data_provider[], stream_id[], event_time[], value[] +// - Zero values are silently filtered (WHERE value != 0) +// - Multiple rows per (stream_ref, event_time) allowed (created_at versioning) +// - Returns empty response (consensus returns nothing) func (ext *Extension) InsertRecords(ctx context.Context, req *InsertRecordsRequest) (*InsertRecordsResponse, *jsonrpc.Error) { if req == nil { return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, "missing request", nil) } - dataProvider := strings.ToLower(req.DataProvider) - - if err := validateDataProvider(dataProvider); err != nil { - return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, err.Error(), nil) - } - if err := validateStreamID(req.StreamID); err != nil { - return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, err.Error(), nil) - } - if len(req.Records) == 0 { + n := len(req.DataProvider) + if n == 0 { return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, "records must not be empty", nil) } - - streamRef, streamType, err := ext.dbLookupStreamRef(ctx, dataProvider, req.StreamID) - if err != nil { - ext.logger.Error("failed to look up stream", "error", err) - return nil, jsonrpc.NewError(jsonrpc.ErrorInternal, "failed to look up stream", nil) - } - if streamRef == 0 { - return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, fmt.Sprintf("stream not found: %s/%s", dataProvider, req.StreamID), nil) + if n != len(req.StreamID) || n != len(req.EventTime) || n != len(req.Value) { + return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, "array lengths mismatch", nil) } - if streamType != "primitive" { - return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, fmt.Sprintf("stream %s/%s is not a primitive stream", dataProvider, req.StreamID), nil) + + // Normalize data_providers to lowercase (consensus uses LOWER() in 001-common-actions.sql). + for i := range req.DataProvider { + req.DataProvider[i] = strings.ToLower(req.DataProvider[i]) } - for i, r := range req.Records { - f, parseErr := strconv.ParseFloat(r.Value, 64) + // Validate all inputs upfront. + for i := 0; i < n; i++ { + if err := validateDataProvider(req.DataProvider[i]); err != nil { + return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, fmt.Sprintf("record %d: %v", i, err), nil) + } + if err := validateStreamID(req.StreamID[i]); err != nil { + return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, fmt.Sprintf("record %d: %v", i, err), nil) + } + f, parseErr := strconv.ParseFloat(req.Value[i], 64) if parseErr != nil { return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, fmt.Sprintf("invalid record value at index %d: %v", i, parseErr), nil) } @@ -130,15 +131,51 @@ func (ext *Extension) InsertRecords(ctx context.Context, req *InsertRecordsReque } } - if err := ext.dbInsertRecords(ctx, streamRef, req.Records); err != nil { - if isDuplicateKeyError(err) { - return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, "duplicate record: same event_time already exists", nil) + // Resolve stream refs for unique (data_provider, stream_id) pairs. + type streamKey struct{ dp, sid string } + streamRefMap := make(map[streamKey]int64) + for i := 0; i < n; i++ { + key := streamKey{req.DataProvider[i], req.StreamID[i]} + if _, ok := streamRefMap[key]; ok { + continue + } + ref, stype, err := ext.dbLookupStreamRef(ctx, key.dp, key.sid) + if err != nil { + ext.logger.Error("failed to look up stream", "error", err) + return nil, jsonrpc.NewError(jsonrpc.ErrorInternal, "failed to look up stream", nil) + } + if ref == 0 { + return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, fmt.Sprintf("stream not found: %s/%s", key.dp, key.sid), nil) + } + if stype != "primitive" { + return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, fmt.Sprintf("stream %s/%s is not a primitive stream", key.dp, key.sid), nil) + } + streamRefMap[key] = ref + } + + // Build resolved records, filtering zero values (mirrors consensus WHERE value != 0). + streamRefs := make([]int64, 0, n) + eventTimes := make([]int64, 0, n) + values := make([]string, 0, n) + for i := 0; i < n; i++ { + f, _ := strconv.ParseFloat(req.Value[i], 64) + if f == 0 { + continue + } + key := streamKey{req.DataProvider[i], req.StreamID[i]} + streamRefs = append(streamRefs, streamRefMap[key]) + eventTimes = append(eventTimes, req.EventTime[i]) + values = append(values, req.Value[i]) + } + + if len(streamRefs) > 0 { + if err := ext.dbInsertRecords(ctx, streamRefs, eventTimes, values); err != nil { + ext.logger.Error("failed to insert records", "error", err) + return nil, jsonrpc.NewError(jsonrpc.ErrorInternal, "failed to insert records", nil) } - ext.logger.Error("failed to insert records", "error", err) - return nil, jsonrpc.NewError(jsonrpc.ErrorInternal, "failed to insert records", nil) } - return &InsertRecordsResponse{Count: len(req.Records)}, nil + return &InsertRecordsResponse{}, nil } // InsertTaxonomy adds a taxonomy entry to a local composed stream. (Task 5) diff --git a/extensions/tn_local/insert_records_test.go b/extensions/tn_local/insert_records_test.go index 210955e0..78c2de09 100644 --- a/extensions/tn_local/insert_records_test.go +++ b/extensions/tn_local/insert_records_test.go @@ -6,7 +6,6 @@ import ( "strings" "testing" - "github.com/jackc/pgx/v5/pgconn" "github.com/stretchr/testify/require" jsonrpc "github.com/trufnetwork/kwil-db/core/rpc/json" kwilsql "github.com/trufnetwork/kwil-db/node/types/sql" @@ -45,6 +44,9 @@ func mockDBWithStream(streamRef int64, streamType string, executeFn func(ctx con } } +const testDP = "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832" +const testSID = "st00000000000000000000000000test" + func TestInsertRecords_NilRequest(t *testing.T) { ext := newTestExtension(&utils.MockDB{}) @@ -55,19 +57,34 @@ func TestInsertRecords_NilRequest(t *testing.T) { require.Contains(t, rpcErr.Message, "missing request") } -func TestInsertRecords_EmptyRecords(t *testing.T) { +func TestInsertRecords_EmptyArrays(t *testing.T) { ext := newTestExtension(&utils.MockDB{}) _, rpcErr := ext.InsertRecords(context.Background(), &InsertRecordsRequest{ - DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", - StreamID: "st00000000000000000000000000test", - Records: []RecordInput{}, + DataProvider: []string{}, + StreamID: []string{}, + EventTime: []int64{}, + Value: []string{}, }) require.NotNil(t, rpcErr) require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) require.Contains(t, rpcErr.Message, "records must not be empty") } +func TestInsertRecords_ArrayLengthMismatch(t *testing.T) { + ext := newTestExtension(&utils.MockDB{}) + + _, rpcErr := ext.InsertRecords(context.Background(), &InsertRecordsRequest{ + DataProvider: []string{testDP, testDP}, + StreamID: []string{testSID}, + EventTime: []int64{1000}, + Value: []string{"1.0"}, + }) + require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) + require.Contains(t, rpcErr.Message, "array lengths mismatch") +} + func TestInsertRecords_Success(t *testing.T) { var capturedStmts []string var capturedArgs [][]any @@ -79,17 +96,14 @@ func TestInsertRecords_Success(t *testing.T) { ext := newTestExtension(mockDB) resp, rpcErr := ext.InsertRecords(context.Background(), &InsertRecordsRequest{ - DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", - StreamID: "st00000000000000000000000000test", - Records: []RecordInput{ - {EventTime: 1000, Value: "123.456"}, - {EventTime: 2000, Value: "789.012"}, - }, + DataProvider: []string{testDP, testDP}, + StreamID: []string{testSID, testSID}, + EventTime: []int64{1000, 2000}, + Value: []string{"123.456", "789.012"}, }) require.Nil(t, rpcErr, "expected no error") require.NotNil(t, resp) - require.Equal(t, 2, resp.Count) // Two INSERT statements (one per record) require.Len(t, capturedStmts, 2) @@ -111,14 +125,61 @@ func TestInsertRecords_Success(t *testing.T) { require.Equal(t, "789.012", capturedArgs[1][2]) } +func TestInsertRecords_ZeroValuesFiltered(t *testing.T) { + var capturedArgs [][]any + mockDB := mockDBWithStream(42, "primitive", func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { + capturedArgs = append(capturedArgs, args) + return &kwilsql.ResultSet{}, nil + }) + ext := newTestExtension(mockDB) + + resp, rpcErr := ext.InsertRecords(context.Background(), &InsertRecordsRequest{ + DataProvider: []string{testDP, testDP, testDP}, + StreamID: []string{testSID, testSID, testSID}, + EventTime: []int64{1000, 2000, 3000}, + Value: []string{"1.5", "0", "2.5"}, + }) + + require.Nil(t, rpcErr) + require.NotNil(t, resp) + + // Only 2 inserts — the "0" record is filtered like consensus + require.Len(t, capturedArgs, 2) + require.Equal(t, int64(1000), capturedArgs[0][1]) + require.Equal(t, "1.5", capturedArgs[0][2]) + require.Equal(t, int64(3000), capturedArgs[1][1]) + require.Equal(t, "2.5", capturedArgs[1][2]) +} + +func TestInsertRecords_AllZerosNoInsert(t *testing.T) { + insertCalled := false + mockDB := mockDBWithStream(42, "primitive", func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { + insertCalled = true + return &kwilsql.ResultSet{}, nil + }) + ext := newTestExtension(mockDB) + + resp, rpcErr := ext.InsertRecords(context.Background(), &InsertRecordsRequest{ + DataProvider: []string{testDP}, + StreamID: []string{testSID}, + EventTime: []int64{1000}, + Value: []string{"0"}, + }) + + require.Nil(t, rpcErr) + require.NotNil(t, resp) + require.False(t, insertCalled, "should not call dbInsertRecords when all values are zero") +} + func TestInsertRecords_StreamNotFound(t *testing.T) { mockDB := mockDBWithStream(0, "", nil) ext := newTestExtension(mockDB) _, rpcErr := ext.InsertRecords(context.Background(), &InsertRecordsRequest{ - DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", - StreamID: "st00000000000000000000000000test", - Records: []RecordInput{{EventTime: 1000, Value: "1.0"}}, + DataProvider: []string{testDP}, + StreamID: []string{testSID}, + EventTime: []int64{1000}, + Value: []string{"1.0"}, }) require.NotNil(t, rpcErr) require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) @@ -130,31 +191,16 @@ func TestInsertRecords_ComposedStreamRejected(t *testing.T) { ext := newTestExtension(mockDB) _, rpcErr := ext.InsertRecords(context.Background(), &InsertRecordsRequest{ - DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", - StreamID: "st00000000000000000000000000test", - Records: []RecordInput{{EventTime: 1000, Value: "1.0"}}, + DataProvider: []string{testDP}, + StreamID: []string{testSID}, + EventTime: []int64{1000}, + Value: []string{"1.0"}, }) require.NotNil(t, rpcErr) require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) require.Contains(t, rpcErr.Message, "is not a primitive stream") } -func TestInsertRecords_DuplicateRecord(t *testing.T) { - mockDB := mockDBWithStream(42, "primitive", func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { - return nil, &pgconn.PgError{Code: pgUniqueViolation, Message: "unique_violation"} - }) - ext := newTestExtension(mockDB) - - _, rpcErr := ext.InsertRecords(context.Background(), &InsertRecordsRequest{ - DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", - StreamID: "st00000000000000000000000000test", - Records: []RecordInput{{EventTime: 1000, Value: "1.0"}}, - }) - require.NotNil(t, rpcErr) - require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) - require.Contains(t, rpcErr.Message, "duplicate record") -} - func TestInsertRecords_DBError(t *testing.T) { mockDB := mockDBWithStream(42, "primitive", func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { return nil, fmt.Errorf("connection refused") @@ -162,9 +208,10 @@ func TestInsertRecords_DBError(t *testing.T) { ext := newTestExtension(mockDB) _, rpcErr := ext.InsertRecords(context.Background(), &InsertRecordsRequest{ - DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", - StreamID: "st00000000000000000000000000test", - Records: []RecordInput{{EventTime: 1000, Value: "1.0"}}, + DataProvider: []string{testDP}, + StreamID: []string{testSID}, + EventTime: []int64{1000}, + Value: []string{"1.0"}, }) require.NotNil(t, rpcErr) require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInternal), rpcErr.Code) @@ -175,9 +222,10 @@ func TestInsertRecords_InvalidDataProvider(t *testing.T) { ext := newTestExtension(&utils.MockDB{}) _, rpcErr := ext.InsertRecords(context.Background(), &InsertRecordsRequest{ - DataProvider: "invalid", - StreamID: "st00000000000000000000000000test", - Records: []RecordInput{{EventTime: 1000, Value: "1.0"}}, + DataProvider: []string{"invalid"}, + StreamID: []string{testSID}, + EventTime: []int64{1000}, + Value: []string{"1.0"}, }) require.NotNil(t, rpcErr) require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) @@ -188,9 +236,10 @@ func TestInsertRecords_InvalidStreamID(t *testing.T) { ext := newTestExtension(&utils.MockDB{}) _, rpcErr := ext.InsertRecords(context.Background(), &InsertRecordsRequest{ - DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", - StreamID: "bad", - Records: []RecordInput{{EventTime: 1000, Value: "1.0"}}, + DataProvider: []string{testDP}, + StreamID: []string{"bad"}, + EventTime: []int64{1000}, + Value: []string{"1.0"}, }) require.NotNil(t, rpcErr) require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) @@ -215,9 +264,10 @@ func TestInsertRecords_InvalidValue(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { _, rpcErr := ext.InsertRecords(context.Background(), &InsertRecordsRequest{ - DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", - StreamID: "st00000000000000000000000000test", - Records: []RecordInput{{EventTime: 1000, Value: tt.value}}, + DataProvider: []string{testDP}, + StreamID: []string{testSID}, + EventTime: []int64{1000}, + Value: []string{tt.value}, }) require.NotNil(t, rpcErr) require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) @@ -231,12 +281,10 @@ func TestInsertRecords_InvalidValueAtIndex(t *testing.T) { ext := newTestExtension(mockDB) _, rpcErr := ext.InsertRecords(context.Background(), &InsertRecordsRequest{ - DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", - StreamID: "st00000000000000000000000000test", - Records: []RecordInput{ - {EventTime: 1000, Value: "1.0"}, - {EventTime: 2000, Value: "bad"}, - }, + DataProvider: []string{testDP, testDP}, + StreamID: []string{testSID, testSID}, + EventTime: []int64{1000, 2000}, + Value: []string{"1.0", "bad"}, }) require.NotNil(t, rpcErr) require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) @@ -252,9 +300,10 @@ func TestInsertRecords_LookupDBError(t *testing.T) { ext := newTestExtension(mockDB) _, rpcErr := ext.InsertRecords(context.Background(), &InsertRecordsRequest{ - DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", - StreamID: "st00000000000000000000000000test", - Records: []RecordInput{{EventTime: 1000, Value: "1.0"}}, + DataProvider: []string{testDP}, + StreamID: []string{testSID}, + EventTime: []int64{1000}, + Value: []string{"1.0"}, }) require.NotNil(t, rpcErr) require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInternal), rpcErr.Code) diff --git a/extensions/tn_local/schema.go b/extensions/tn_local/schema.go index caf69fb2..03d27ae9 100644 --- a/extensions/tn_local/schema.go +++ b/extensions/tn_local/schema.go @@ -65,13 +65,10 @@ func ensurePrimitiveEventsTable(ctx context.Context, tx sql.Tx) error { return fmt.Errorf("create primitive_events table: %w", err) } - // Migrate existing non-unique index to unique if needed. - if err := migrateToUniqueEventIndex(ctx, tx); err != nil { - return err - } - + // Non-unique index mirrors consensus primitive_events_query_idx (017-normalize-tables.sql:105). + // Multiple rows per (stream_ref, event_time) are allowed — created_at provides versioning. createIndex := fmt.Sprintf(` - CREATE UNIQUE INDEX IF NOT EXISTS local_pe_stream_time_idx + CREATE INDEX IF NOT EXISTS local_pe_stream_time_idx ON %s.primitive_events (stream_ref, event_time)`, SchemaName) if _, err := tx.Execute(ctx, createIndex); err != nil { @@ -80,53 +77,6 @@ func ensurePrimitiveEventsTable(ctx context.Context, tx sql.Tx) error { return nil } -// migrateToUniqueEventIndex detects a pre-existing non-unique local_pe_stream_time_idx, -// deduplicates rows that would violate uniqueness on (stream_ref, event_time), -// drops the old index, and lets the caller recreate it as UNIQUE. -func migrateToUniqueEventIndex(ctx context.Context, tx sql.Tx) error { - // Step 1: check if the index exists and is non-unique. - rs, err := tx.Execute(ctx, ` - SELECT NOT i.indisunique - FROM pg_index i - JOIN pg_class c ON c.oid = i.indexrelid - WHERE c.relname = 'local_pe_stream_time_idx'`) - if err != nil { - return fmt.Errorf("check index uniqueness: %w", err) - } - - // Index does not exist yet (fresh install) — nothing to migrate. - if len(rs.Rows) == 0 { - return nil - } - - isNonUnique, ok := rs.Rows[0][0].(bool) - if !ok || !isNonUnique { - // Already unique — nothing to do. - return nil - } - - // Step 2: deduplicate — keep the row with the latest created_at per (stream_ref, event_time). - if _, err := tx.Execute(ctx, fmt.Sprintf(` - DELETE FROM %s.primitive_events pe1 - WHERE EXISTS ( - SELECT 1 FROM %s.primitive_events pe2 - WHERE pe2.stream_ref = pe1.stream_ref - AND pe2.event_time = pe1.event_time - AND pe2.created_at > pe1.created_at - )`, SchemaName, SchemaName)); err != nil { - return fmt.Errorf("deduplicate primitive_events: %w", err) - } - - // Step 3: drop the non-unique index. - if _, err := tx.Execute(ctx, fmt.Sprintf( - `DROP INDEX IF EXISTS %s.local_pe_stream_time_idx`, SchemaName)); err != nil { - return fmt.Errorf("drop non-unique index: %w", err) - } - - // Step 4: caller recreates as CREATE UNIQUE INDEX. - return nil -} - // ensureTaxonomiesTable creates the taxonomies table. // Mirrors: consensus taxonomies after 017-normalize-tables.sql // Local composed streams can ONLY reference other local streams. diff --git a/extensions/tn_local/tn_local_test.go b/extensions/tn_local/tn_local_test.go index 6deef197..2f784afd 100644 --- a/extensions/tn_local/tn_local_test.go +++ b/extensions/tn_local/tn_local_test.go @@ -63,74 +63,6 @@ func TestSetupSchema(t *testing.T) { "taxonomies should have UUID primary key") } -func TestMigrateToUniqueEventIndex_NonUnique(t *testing.T) { - var statements []string - mockTx := &utils.MockTx{ - ExecuteFn: func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { - statements = append(statements, stmt) - // Simulate pg_index query returning non-unique (true) - if strings.Contains(stmt, "pg_index") { - return &kwilsql.ResultSet{ - Columns: []string{"?column?"}, - Rows: [][]any{{true}}, - }, nil - } - return &kwilsql.ResultSet{}, nil - }, - } - - err := migrateToUniqueEventIndex(context.Background(), mockTx) - require.NoError(t, err) - - // Should have run: pg_index check, DELETE dedup, DROP INDEX - require.True(t, containsSQL(statements, "pg_index"), "should check pg_index") - require.True(t, containsSQL(statements, "DELETE FROM"), "should deduplicate rows") - require.True(t, containsSQL(statements, "DROP INDEX"), "should drop non-unique index") -} - -func TestMigrateToUniqueEventIndex_AlreadyUnique(t *testing.T) { - var statements []string - mockTx := &utils.MockTx{ - ExecuteFn: func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { - statements = append(statements, stmt) - // Simulate pg_index query returning already unique (false) - if strings.Contains(stmt, "pg_index") { - return &kwilsql.ResultSet{ - Columns: []string{"?column?"}, - Rows: [][]any{{false}}, - }, nil - } - return &kwilsql.ResultSet{}, nil - }, - } - - err := migrateToUniqueEventIndex(context.Background(), mockTx) - require.NoError(t, err) - - // Should only have the pg_index check, no dedup or drop - require.Len(t, statements, 1, "should only check pg_index, no further action") -} - -func TestMigrateToUniqueEventIndex_FreshInstall(t *testing.T) { - var statements []string - mockTx := &utils.MockTx{ - ExecuteFn: func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { - statements = append(statements, stmt) - // Simulate pg_index query returning no rows (index doesn't exist) - if strings.Contains(stmt, "pg_index") { - return &kwilsql.ResultSet{Rows: [][]any{}}, nil - } - return &kwilsql.ResultSet{}, nil - }, - } - - err := migrateToUniqueEventIndex(context.Background(), mockTx) - require.NoError(t, err) - - // Should only have the pg_index check, no dedup or drop - require.Len(t, statements, 1, "should only check pg_index, no further action") -} - func TestSetupSchema_RollbackOnError(t *testing.T) { rolledBack := false mockTx := &utils.MockTx{ diff --git a/extensions/tn_local/types.go b/extensions/tn_local/types.go index 6d10be55..d0695510 100644 --- a/extensions/tn_local/types.go +++ b/extensions/tn_local/types.go @@ -11,22 +11,18 @@ type CreateStreamRequest struct { type CreateStreamResponse struct{} // InsertRecordsRequest is the JSON-RPC request for local.insert_records. +// Mirrors the consensus insert_records($data_provider TEXT[], $stream_id TEXT[], +// $event_time INT8[], $value NUMERIC(36,18)[]) signature — parallel arrays. type InsertRecordsRequest struct { - DataProvider string `json:"data_provider"` - StreamID string `json:"stream_id"` - Records []RecordInput `json:"records"` -} - -// RecordInput is a single record to insert. -type RecordInput struct { - EventTime int64 `json:"event_time"` - Value string `json:"value"` + DataProvider []string `json:"data_provider"` + StreamID []string `json:"stream_id"` + EventTime []int64 `json:"event_time"` + Value []string `json:"value"` } // InsertRecordsResponse is the JSON-RPC response for local.insert_records. -type InsertRecordsResponse struct { - Count int `json:"count"` -} +// Mirrors consensus which returns nothing. +type InsertRecordsResponse struct{} // InsertTaxonomyRequest is the JSON-RPC request for local.insert_taxonomy. type InsertTaxonomyRequest struct {