From 3072ce01e3a68fab0cebdcdbd1d5fa50bf10a480 Mon Sep 17 00:00:00 2001 From: Michael Buntarman Date: Wed, 18 Mar 2026 14:45:16 +0700 Subject: [PATCH 1/5] feat: create local streams --- extensions/tn_local/db_ops.go | 10 ++ extensions/tn_local/handlers.go | 54 +++++++++- extensions/tn_local/tn_local_test.go | 154 +++++++++++++++++++++++++++ 3 files changed, 215 insertions(+), 3 deletions(-) diff --git a/extensions/tn_local/db_ops.go b/extensions/tn_local/db_ops.go index 90455061..e173e330 100644 --- a/extensions/tn_local/db_ops.go +++ b/extensions/tn_local/db_ops.go @@ -3,6 +3,7 @@ package tn_local import ( "context" "fmt" + "time" "github.com/trufnetwork/kwil-db/core/log" "github.com/trufnetwork/kwil-db/node/types/sql" @@ -19,6 +20,15 @@ func NewLocalDB(db sql.DB, logger log.Logger) *LocalDB { return &LocalDB{db: db, logger: logger} } +// dbCreateStream inserts a new stream into ext_tn_local.streams. +func (ext *Extension) dbCreateStream(ctx context.Context, dataProvider, streamID, streamType string) error { + _, err := ext.db.Execute(ctx, fmt.Sprintf( + `INSERT INTO %s.streams (data_provider, stream_id, stream_type, created_at) + VALUES ($1, $2, $3, $4)`, SchemaName), + dataProvider, streamID, streamType, time.Now().Unix()) + return err +} + // SetupSchema creates the ext_tn_local schema and all tables within a single transaction. func (l *LocalDB) SetupSchema(ctx context.Context) error { l.logger.Info("setting up local storage schema") diff --git a/extensions/tn_local/handlers.go b/extensions/tn_local/handlers.go index aafd1e5f..b4de305a 100644 --- a/extensions/tn_local/handlers.go +++ b/extensions/tn_local/handlers.go @@ -2,15 +2,63 @@ package tn_local import ( "context" + "fmt" + "regexp" + "strings" jsonrpc "github.com/trufnetwork/kwil-db/core/rpc/json" ) -// Handler stubs — implementations will be added in Tasks 3-6. +var ethAddrRegex = regexp.MustCompile(`^0x[0-9a-fA-F]{40}$`) -// CreateStream creates a local stream. (Task 3) +// validateStreamID checks that stream_id is 32 chars and starts with "st". +func validateStreamID(streamID string) error { + if len(streamID) != 32 { + return fmt.Errorf("stream_id must be exactly 32 characters, got %d", len(streamID)) + } + if !strings.HasPrefix(streamID, "st") { + return fmt.Errorf("stream_id must start with 'st'") + } + return nil +} + +// validateStreamType checks that stream_type is "primitive" or "composed". +func validateStreamType(streamType string) error { + if streamType != "primitive" && streamType != "composed" { + return fmt.Errorf("stream_type must be 'primitive' or 'composed', got %q", streamType) + } + return nil +} + +// validateDataProvider checks that data_provider is a valid Ethereum address (0x + 40 hex). +func validateDataProvider(dataProvider string) error { + if !ethAddrRegex.MatchString(dataProvider) { + return fmt.Errorf("data_provider must be a valid Ethereum address (0x + 40 hex chars)") + } + return nil +} + +// CreateStream creates a local stream. func (ext *Extension) CreateStream(ctx context.Context, req *CreateStreamRequest) (*CreateStreamResponse, *jsonrpc.Error) { - return nil, jsonrpc.NewError(jsonrpc.ErrorInternal, "not implemented", nil) + if err := validateDataProvider(req.DataProvider); err != nil { + return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, err.Error(), nil) + } + if err := validateStreamID(req.StreamID); err != nil { + return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, err.Error(), nil) + } + if err := validateStreamType(req.StreamType); err != nil { + return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, err.Error(), nil) + } + + if err := ext.dbCreateStream(ctx, req.DataProvider, req.StreamID, req.StreamType); err != nil { + if strings.Contains(err.Error(), "duplicate key") || strings.Contains(err.Error(), "unique constraint") { + return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, fmt.Sprintf("stream already exists: %s/%s", req.DataProvider, req.StreamID), nil) + } + ext.logger.Error("failed to create local stream", "error", err) + return nil, jsonrpc.NewError(jsonrpc.ErrorInternal, "failed to create stream", nil) + } + + return &CreateStreamResponse{}, nil } // InsertRecords inserts records into a local primitive stream. (Task 4) diff --git a/extensions/tn_local/tn_local_test.go b/extensions/tn_local/tn_local_test.go index 6b6795ea..f3f0320b 100644 --- a/extensions/tn_local/tn_local_test.go +++ b/extensions/tn_local/tn_local_test.go @@ -2,6 +2,7 @@ package tn_local import ( "context" + "fmt" "io" "strings" "sync" @@ -134,6 +135,159 @@ func TestServiceInterface(t *testing.T) { require.NotNil(t, health) } +// newTestExtension creates an Extension with a mock DB for handler tests. +func newTestExtension(db kwilsql.DB) *Extension { + ext := &Extension{ + logger: log.New(log.WithWriter(io.Discard)), + db: db, + } + ext.isEnabled.Store(true) + return ext +} + +func TestCreateStream_Success(t *testing.T) { + var capturedStmt string + var capturedArgs []any + mockDB := &utils.MockDB{ + ExecuteFn: func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { + capturedStmt = stmt + capturedArgs = args + return &kwilsql.ResultSet{}, nil + }, + } + ext := newTestExtension(mockDB) + + resp, rpcErr := ext.CreateStream(context.Background(), &CreateStreamRequest{ + DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", + StreamID: "st00000000000000000000000000test", + StreamType: "primitive", + }) + + require.Nil(t, rpcErr, "expected no error") + require.NotNil(t, resp) + require.Contains(t, capturedStmt, "INSERT INTO "+SchemaName+".streams") + require.Equal(t, "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", capturedArgs[0]) + require.Equal(t, "st00000000000000000000000000test", capturedArgs[1]) + require.Equal(t, "primitive", capturedArgs[2]) +} + +func TestCreateStream_ComposedType(t *testing.T) { + mockDB := &utils.MockDB{ + ExecuteFn: func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { + return &kwilsql.ResultSet{}, nil + }, + } + ext := newTestExtension(mockDB) + + resp, rpcErr := ext.CreateStream(context.Background(), &CreateStreamRequest{ + DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", + StreamID: "st00000000000000000000000000test", + StreamType: "composed", + }) + + require.Nil(t, rpcErr) + require.NotNil(t, resp) +} + +func TestCreateStream_InvalidStreamID(t *testing.T) { + ext := newTestExtension(&utils.MockDB{}) + + tests := []struct { + name string + streamID string + wantMsg string + }{ + {"too short", "st00", "must be exactly 32 characters"}, + {"too long", "st000000000000000000000000000test1", "must be exactly 32 characters"}, + {"wrong prefix", "xx00000000000000000000000000test", "must start with 'st'"}, + {"empty", "", "must be exactly 32 characters"}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, rpcErr := ext.CreateStream(context.Background(), &CreateStreamRequest{ + DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", + StreamID: tt.streamID, + StreamType: "primitive", + }) + require.NotNil(t, rpcErr) + require.Contains(t, rpcErr.Message, tt.wantMsg) + }) + } +} + +func TestCreateStream_InvalidStreamType(t *testing.T) { + ext := newTestExtension(&utils.MockDB{}) + + _, rpcErr := ext.CreateStream(context.Background(), &CreateStreamRequest{ + DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", + StreamID: "st00000000000000000000000000test", + StreamType: "invalid", + }) + require.NotNil(t, rpcErr) + require.Contains(t, rpcErr.Message, "must be 'primitive' or 'composed'") +} + +func TestCreateStream_InvalidDataProvider(t *testing.T) { + ext := newTestExtension(&utils.MockDB{}) + + tests := []struct { + name string + dataProvider string + }{ + {"no 0x prefix", "EC36224A679218Ae28FCeCe8d3c68595B87Dd832"}, + {"too short", "0xEC36224A679218Ae28"}, + {"too long", "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832FF"}, + {"invalid chars", "0xGG36224A679218Ae28FCeCe8d3c68595B87Dd832"}, + {"empty", ""}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, rpcErr := ext.CreateStream(context.Background(), &CreateStreamRequest{ + DataProvider: tt.dataProvider, + StreamID: "st00000000000000000000000000test", + StreamType: "primitive", + }) + require.NotNil(t, rpcErr) + require.Contains(t, rpcErr.Message, "data_provider must be a valid Ethereum address") + }) + } +} + +func TestCreateStream_DuplicateStream(t *testing.T) { + mockDB := &utils.MockDB{ + ExecuteFn: func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { + return nil, fmt.Errorf("duplicate key value violates unique constraint") + }, + } + ext := newTestExtension(mockDB) + + _, rpcErr := ext.CreateStream(context.Background(), &CreateStreamRequest{ + DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", + StreamID: "st00000000000000000000000000test", + StreamType: "primitive", + }) + require.NotNil(t, rpcErr) + require.Contains(t, rpcErr.Message, "stream already exists") +} + +func TestCreateStream_DBError(t *testing.T) { + mockDB := &utils.MockDB{ + ExecuteFn: func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { + return nil, fmt.Errorf("connection refused") + }, + } + ext := newTestExtension(mockDB) + + _, rpcErr := ext.CreateStream(context.Background(), &CreateStreamRequest{ + DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", + StreamID: "st00000000000000000000000000test", + StreamType: "primitive", + }) + require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInternal), rpcErr.Code) + require.Contains(t, rpcErr.Message, "failed to create stream") +} + func containsSQL(statements []string, substr string) bool { for _, s := range statements { if strings.Contains(s, substr) { From 4b25cbccbdc6de682dc8bc2f1f4f43fa2cb330a4 Mon Sep 17 00:00:00 2001 From: Michael Buntarman Date: Wed, 18 Mar 2026 15:07:46 +0700 Subject: [PATCH 2/5] chore: apply suggestion --- extensions/tn_local/handlers.go | 21 ++++++++++++++++++++- extensions/tn_local/tn_local_test.go | 9 +++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/extensions/tn_local/handlers.go b/extensions/tn_local/handlers.go index b4de305a..bb2f73b7 100644 --- a/extensions/tn_local/handlers.go +++ b/extensions/tn_local/handlers.go @@ -2,13 +2,18 @@ package tn_local import ( "context" + "errors" "fmt" "regexp" "strings" + "github.com/jackc/pgx/v5/pgconn" jsonrpc "github.com/trufnetwork/kwil-db/core/rpc/json" ) +// pgUniqueViolation is the PostgreSQL error code for unique_violation. +const pgUniqueViolation = "23505" + var ethAddrRegex = regexp.MustCompile(`^0x[0-9a-fA-F]{40}$`) // validateStreamID checks that stream_id is 32 chars and starts with "st". @@ -38,8 +43,22 @@ func validateDataProvider(dataProvider string) error { return nil } +// isDuplicateKeyError checks if err is a PostgreSQL unique constraint violation (23505). +func isDuplicateKeyError(err error) bool { + var pgErr *pgconn.PgError + if errors.As(err, &pgErr) { + return pgErr.Code == pgUniqueViolation + } + // Fallback for non-pgx drivers (e.g. mocks): case-insensitive string match. + msg := strings.ToLower(err.Error()) + return strings.Contains(msg, "duplicate key") || strings.Contains(msg, "unique constraint") +} + // CreateStream creates a local stream. func (ext *Extension) CreateStream(ctx context.Context, req *CreateStreamRequest) (*CreateStreamResponse, *jsonrpc.Error) { + if req == nil { + return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, "missing request", nil) + } if err := validateDataProvider(req.DataProvider); err != nil { return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, err.Error(), nil) } @@ -51,7 +70,7 @@ func (ext *Extension) CreateStream(ctx context.Context, req *CreateStreamRequest } if err := ext.dbCreateStream(ctx, req.DataProvider, req.StreamID, req.StreamType); err != nil { - if strings.Contains(err.Error(), "duplicate key") || strings.Contains(err.Error(), "unique constraint") { + if isDuplicateKeyError(err) { return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, fmt.Sprintf("stream already exists: %s/%s", req.DataProvider, req.StreamID), nil) } ext.logger.Error("failed to create local stream", "error", err) diff --git a/extensions/tn_local/tn_local_test.go b/extensions/tn_local/tn_local_test.go index f3f0320b..fd305eee 100644 --- a/extensions/tn_local/tn_local_test.go +++ b/extensions/tn_local/tn_local_test.go @@ -145,6 +145,15 @@ func newTestExtension(db kwilsql.DB) *Extension { return ext } +func TestCreateStream_NilRequest(t *testing.T) { + ext := newTestExtension(&utils.MockDB{}) + + resp, rpcErr := ext.CreateStream(context.Background(), nil) + require.Nil(t, resp) + require.NotNil(t, rpcErr) + require.Contains(t, rpcErr.Message, "missing request") +} + func TestCreateStream_Success(t *testing.T) { var capturedStmt string var capturedArgs []any From 2e912ab54a8930f08e729ffd8dbec5eb897917f7 Mon Sep 17 00:00:00 2001 From: Michael Buntarman Date: Wed, 18 Mar 2026 15:21:06 +0700 Subject: [PATCH 3/5] chore: apply suggestion --- extensions/tn_local/handlers.go | 11 ++++++++--- extensions/tn_local/tn_local_test.go | 21 ++++++++++++++++++++- 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/extensions/tn_local/handlers.go b/extensions/tn_local/handlers.go index bb2f73b7..c13703fd 100644 --- a/extensions/tn_local/handlers.go +++ b/extensions/tn_local/handlers.go @@ -59,7 +59,12 @@ func (ext *Extension) CreateStream(ctx context.Context, req *CreateStreamRequest if req == nil { return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, "missing request", nil) } - if err := validateDataProvider(req.DataProvider); err != nil { + + // Normalize data_provider to lowercase to match consensus behavior + // (consensus uses LOWER() in 001-common-actions.sql before insertion). + dataProvider := strings.ToLower(req.DataProvider) + + if err := validateDataProvider(dataProvider); err != nil { return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, err.Error(), nil) } if err := validateStreamID(req.StreamID); err != nil { @@ -69,9 +74,9 @@ func (ext *Extension) CreateStream(ctx context.Context, req *CreateStreamRequest return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, err.Error(), nil) } - if err := ext.dbCreateStream(ctx, req.DataProvider, req.StreamID, req.StreamType); err != nil { + if err := ext.dbCreateStream(ctx, dataProvider, req.StreamID, req.StreamType); err != nil { if isDuplicateKeyError(err) { - return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, fmt.Sprintf("stream already exists: %s/%s", req.DataProvider, req.StreamID), nil) + return nil, jsonrpc.NewError(jsonrpc.ErrorInvalidParams, fmt.Sprintf("stream already exists: %s/%s", dataProvider, req.StreamID), nil) } ext.logger.Error("failed to create local stream", "error", err) return nil, jsonrpc.NewError(jsonrpc.ErrorInternal, "failed to create stream", nil) diff --git a/extensions/tn_local/tn_local_test.go b/extensions/tn_local/tn_local_test.go index fd305eee..e663409d 100644 --- a/extensions/tn_local/tn_local_test.go +++ b/extensions/tn_local/tn_local_test.go @@ -8,6 +8,7 @@ import ( "sync" "testing" + "github.com/jackc/pgx/v5/pgconn" "github.com/stretchr/testify/require" "github.com/trufnetwork/kwil-db/core/log" jsonrpc "github.com/trufnetwork/kwil-db/core/rpc/json" @@ -175,7 +176,8 @@ func TestCreateStream_Success(t *testing.T) { require.Nil(t, rpcErr, "expected no error") require.NotNil(t, resp) require.Contains(t, capturedStmt, "INSERT INTO "+SchemaName+".streams") - require.Equal(t, "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", capturedArgs[0]) + // data_provider should be lowercased (matching consensus behavior) + require.Equal(t, "0xec36224a679218ae28fcece8d3c68595b87dd832", capturedArgs[0]) require.Equal(t, "st00000000000000000000000000test", capturedArgs[1]) require.Equal(t, "primitive", capturedArgs[2]) } @@ -279,6 +281,23 @@ func TestCreateStream_DuplicateStream(t *testing.T) { require.Contains(t, rpcErr.Message, "stream already exists") } +func TestCreateStream_DuplicateStream_PgError(t *testing.T) { + mockDB := &utils.MockDB{ + ExecuteFn: func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { + return nil, &pgconn.PgError{Code: "23505", Message: "unique_violation"} + }, + } + ext := newTestExtension(mockDB) + + _, rpcErr := ext.CreateStream(context.Background(), &CreateStreamRequest{ + DataProvider: "0xEC36224A679218Ae28FCeCe8d3c68595B87Dd832", + StreamID: "st00000000000000000000000000test", + StreamType: "primitive", + }) + require.NotNil(t, rpcErr) + require.Contains(t, rpcErr.Message, "stream already exists") +} + func TestCreateStream_DBError(t *testing.T) { mockDB := &utils.MockDB{ ExecuteFn: func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { From e5172c411cd9afa98f7b53384d9bb17bde89fff0 Mon Sep 17 00:00:00 2001 From: Michael Buntarman Date: Wed, 18 Mar 2026 16:25:01 +0700 Subject: [PATCH 4/5] chore: apply suggestion --- extensions/tn_local/handlers.go | 3 +++ extensions/tn_local/tn_local_test.go | 6 +++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/extensions/tn_local/handlers.go b/extensions/tn_local/handlers.go index c13703fd..28c6f919 100644 --- a/extensions/tn_local/handlers.go +++ b/extensions/tn_local/handlers.go @@ -45,6 +45,9 @@ func validateDataProvider(dataProvider string) error { // isDuplicateKeyError checks if err is a PostgreSQL unique constraint violation (23505). func isDuplicateKeyError(err error) bool { + if err == nil { + return false + } var pgErr *pgconn.PgError if errors.As(err, &pgErr) { return pgErr.Code == pgUniqueViolation diff --git a/extensions/tn_local/tn_local_test.go b/extensions/tn_local/tn_local_test.go index e663409d..dbcd8b6d 100644 --- a/extensions/tn_local/tn_local_test.go +++ b/extensions/tn_local/tn_local_test.go @@ -152,6 +152,7 @@ func TestCreateStream_NilRequest(t *testing.T) { resp, rpcErr := ext.CreateStream(context.Background(), nil) require.Nil(t, resp) require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) require.Contains(t, rpcErr.Message, "missing request") } @@ -221,6 +222,7 @@ func TestCreateStream_InvalidStreamID(t *testing.T) { StreamType: "primitive", }) require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) require.Contains(t, rpcErr.Message, tt.wantMsg) }) } @@ -235,6 +237,7 @@ func TestCreateStream_InvalidStreamType(t *testing.T) { StreamType: "invalid", }) require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) require.Contains(t, rpcErr.Message, "must be 'primitive' or 'composed'") } @@ -259,6 +262,7 @@ func TestCreateStream_InvalidDataProvider(t *testing.T) { StreamType: "primitive", }) require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) require.Contains(t, rpcErr.Message, "data_provider must be a valid Ethereum address") }) } @@ -284,7 +288,7 @@ func TestCreateStream_DuplicateStream(t *testing.T) { func TestCreateStream_DuplicateStream_PgError(t *testing.T) { mockDB := &utils.MockDB{ ExecuteFn: func(ctx context.Context, stmt string, args ...any) (*kwilsql.ResultSet, error) { - return nil, &pgconn.PgError{Code: "23505", Message: "unique_violation"} + return nil, &pgconn.PgError{Code: pgUniqueViolation, Message: "unique_violation"} }, } ext := newTestExtension(mockDB) From 0b3b84f4281f2019afb1fede38641f66a1001856 Mon Sep 17 00:00:00 2001 From: Michael Buntarman Date: Wed, 18 Mar 2026 16:31:48 +0700 Subject: [PATCH 5/5] chore: apply suggestion --- extensions/tn_local/tn_local_test.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/extensions/tn_local/tn_local_test.go b/extensions/tn_local/tn_local_test.go index dbcd8b6d..ffd5eb4e 100644 --- a/extensions/tn_local/tn_local_test.go +++ b/extensions/tn_local/tn_local_test.go @@ -177,10 +177,15 @@ func TestCreateStream_Success(t *testing.T) { require.Nil(t, rpcErr, "expected no error") require.NotNil(t, resp) require.Contains(t, capturedStmt, "INSERT INTO "+SchemaName+".streams") + require.Len(t, capturedArgs, 4, "INSERT should have 4 parameters") // data_provider should be lowercased (matching consensus behavior) require.Equal(t, "0xec36224a679218ae28fcece8d3c68595b87dd832", capturedArgs[0]) require.Equal(t, "st00000000000000000000000000test", capturedArgs[1]) require.Equal(t, "primitive", capturedArgs[2]) + // created_at should be a non-zero unix timestamp + createdAt, ok := capturedArgs[3].(int64) + require.True(t, ok, "created_at should be int64") + require.NotZero(t, createdAt, "created_at should be non-zero") } func TestCreateStream_ComposedType(t *testing.T) { @@ -282,6 +287,7 @@ func TestCreateStream_DuplicateStream(t *testing.T) { StreamType: "primitive", }) require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) require.Contains(t, rpcErr.Message, "stream already exists") } @@ -299,6 +305,7 @@ func TestCreateStream_DuplicateStream_PgError(t *testing.T) { StreamType: "primitive", }) require.NotNil(t, rpcErr) + require.Equal(t, jsonrpc.ErrorCode(jsonrpc.ErrorInvalidParams), rpcErr.Code) require.Contains(t, rpcErr.Message, "stream already exists") }