
feat: insert local stream records#1342

Merged
MicBun merged 4 commits into main from insertRecordLocal
Mar 18, 2026

Conversation


@MicBun MicBun commented Mar 18, 2026

resolves: https://github.com/truflation/website/issues/3478

Summary by CodeRabbit

  • New Features

    • InsertRecords RPC implemented: accepts parallel-array inputs, validates and normalizes provider/stream IDs, filters zero values, and returns an empty success response.
  • Bug Fixes

    • Clarified schema behavior: event rows may retain multiple versions (non-unique time index) to support versioning.
  • Tests

    • Extensive test coverage added for record insertion and stream-creation/error paths, including input validation and DB error handling.

@MicBun MicBun requested a review from pr-time-tracker March 18, 2026 11:11
@MicBun MicBun self-assigned this Mar 18, 2026
@MicBun MicBun added the type: feat New feature or request label Mar 18, 2026

holdex bot commented Mar 18, 2026

Time Submission Status

  • MicBun — ✅ Submitted — 4h — last updated Mar 18, 2026, 1:27 PM

You can submit time with the command. Example:

@holdex pr submit-time 15m

See available commands to help comply with our Guidelines.


coderabbitai bot commented Mar 18, 2026

📝 Walkthrough

Walkthrough

Adds a new InsertRecords RPC implementation with validation and transactional DB inserts, two DB helper methods (dbLookupStreamRef, dbInsertRecords), comprehensive tests for InsertRecords and CreateStream, removes older CreateStream tests from tn_local_test.go, and updates schema comments related to primitive_events indexing.

Changes

  • InsertRecords handler & helpers — extensions/tn_local/handlers.go, extensions/tn_local/db_ops.go
    Implements the InsertRecords RPC with input validation, lowercasing, stream lookup, value parsing, and transactional batch inserts; adds the dbLookupStreamRef and dbInsertRecords helper methods.
  • InsertRecords tests — extensions/tn_local/insert_records_test.go
    Adds extensive unit tests and mock DB helpers covering nil/invalid params, array length mismatch, the success path, zero-value filtering, stream lookups, stream-type enforcement, and DB error paths.
  • CreateStream tests (new file) — extensions/tn_local/create_stream_test.go
    Adds a comprehensive CreateStream test suite validating nil request, success, composed type, invalid IDs/types/providers, duplicate-key handling, and DB error mapping.
  • Test cleanup — extensions/tn_local/tn_local_test.go
    Removes prior CreateStream tests and related imports; retains the other tn_local tests and helpers.
  • Types & API surface — extensions/tn_local/types.go
    Changes InsertRecordsRequest to parallel arrays (DataProvider[], StreamID[], EventTime[], Value[]), makes InsertRecordsResponse an empty struct, and removes the RecordInput type.
  • Schema comments — extensions/tn_local/schema.go
    Adds comments noting the non-unique local_pe_stream_time_idx behavior (no functional change).
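A minimal sketch of the parallel-array request shape described in the types.go summary above. The field names, JSON tags, and value types here are illustrative assumptions, not a copy of the real types.go:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// InsertRecordsRequest mirrors the parallel-array layout described in the
// walkthrough: index i across all four slices describes one record.
// Field and tag names are guesses for illustration only.
type InsertRecordsRequest struct {
	DataProvider []string `json:"data_provider"`
	StreamID     []string `json:"stream_id"`
	EventTime    []int64  `json:"event_time"`
	Value        []string `json:"value"`
}

// InsertRecordsResponse is an empty struct per the change summary.
type InsertRecordsResponse struct{}

func main() {
	req := InsertRecordsRequest{
		DataProvider: []string{"0xabc"},
		StreamID:     []string{"st123"},
		EventTime:    []int64{1710763200},
		Value:        []string{"1.5"},
	}
	// All four slices must be the same length; a mismatch is a validation error.
	ok := len(req.StreamID) == len(req.DataProvider) &&
		len(req.EventTime) == len(req.DataProvider) &&
		len(req.Value) == len(req.DataProvider)
	b, _ := json.Marshal(req)
	fmt.Println(ok, string(b))
}
```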

Sequence Diagram(s)

```mermaid
sequenceDiagram
    actor Client
    participant Handler as "InsertRecords Handler"
    participant DB as "DB (queries)"
    participant Tx as "DB Transaction"

    Client->>Handler: InsertRecords(request)
    Handler->>Handler: Validate inputs, lowercase data_provider, parse values
    Handler->>DB: dbLookupStreamRef(data_provider, stream_id)
    DB-->>Handler: (streamRef, streamType) or error
    Handler->>Handler: Ensure streamType == "primitive"
    Handler->>DB: Begin transaction
    DB-->>Tx: tx started
    Handler->>Tx: dbInsertRecords(streamRefs, eventTimes, values)
    Tx-->>Handler: success or error
    Handler->>Tx: Commit or Rollback
    Handler-->>Client: InsertRecordsResponse or Error
```

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Suggested reviewers

  • pr-time-tracker

Poem

🐇 I hop through columns, timestamps in tow,
I lowercase providers where the test cases grow,
I stitch stream refs into rows with care,
Commit on success, rollback on a scare —
A rabbit's tiny patch: tidy, fast, and fair.

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)
  • Docstring Coverage — ⚠️ Warning: docstring coverage is 9.38%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them.

✅ Passed checks (2 passed)
  • Description Check — ✅ Passed: check skipped because CodeRabbit's high-level summary is enabled.
  • Title Check — ✅ Passed: the title 'feat: insert local stream records' accurately summarizes the main change, implementing functionality to insert records into local streams via the new InsertRecords RPC handler.




@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@extensions/tn_local/db_ops.go`:
- Around line 56-69: The insert currently sets created_at per call in
dbInsertRecords, but created_at is part of the primary key so different
created_at values allow duplicate (stream_ref, event_time) rows; change the
INSERT into SchemaName.primitive_events inside dbInsertRecords to enforce
one-row-per (stream_ref,event_time) by adding an ON CONFLICT clause targeting
(stream_ref, event_time) — e.g. "INSERT ... VALUES (...) ON CONFLICT
(stream_ref, event_time) DO NOTHING" (or DO UPDATE to merge fields if you
prefer) so duplicate event_time inserts are ignored/merged; keep references to
tx.Execute, SchemaName, primitive_events, streamRef, r.EventTime, r.Value and
createdAt when updating the SQL string.
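The statement shape this comment suggests can be sketched as follows. This is a hedged illustration only: the schema constant, `tx.Execute` signature, and column names are assumed from the comment's references, and a later review round in this PR asks for the ON CONFLICT clause to be removed again in favor of a unique index:

```go
package main

import "fmt"

// buildInsertQuery sketches the suggested INSERT for dbInsertRecords.
// ON CONFLICT (stream_ref, event_time) DO NOTHING makes a duplicate
// (stream_ref, event_time) insert a no-op instead of a hard error,
// regardless of differing created_at values.
func buildInsertQuery(schema string) string {
	return fmt.Sprintf(
		"INSERT INTO %s.primitive_events (stream_ref, event_time, value, created_at) "+
			"VALUES ($1, $2, $3, $4) "+
			"ON CONFLICT (stream_ref, event_time) DO NOTHING", schema)
}

func main() {
	// The handler would pass this string to tx.Execute with
	// streamRef, r.EventTime, r.Value, and createdAt as arguments.
	fmt.Println(buildInsertQuery("tn_local"))
}
```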

In `@extensions/tn_local/handlers.go`:
- Around line 105-127: The handler currently only checks that req.Records is
non-empty and then calls ext.dbInsertRecords, which lets invalid numeric strings
in records[].value bubble up as DB errors and become ErrorInternal; add
per-record validation before calling ext.dbInsertRecords: iterate over
req.Records (use the existing req.Records slice), attempt to parse/convert each
record's value to the numeric type expected by the DB (e.g., parse float64/int
as appropriate), and if any parse fails return
jsonrpc.NewError(jsonrpc.ErrorInvalidParams, fmt.Sprintf("invalid record value
at index %d: %v", i, err), nil); keep using isDuplicateKeyError to handle
duplicate-key DB errors, and only call ext.dbInsertRecords after all records
validate successfully. Ensure you reference streamRef/streamType from the
earlier ext.dbLookupStreamRef checks and validate before invoking
ext.dbInsertRecords.
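The validate-before-insert loop this comment asks for might look like the sketch below. It is simplified against the real handler: the error wrapping stands in for the jsonrpc.NewError call, and the float64 parse matches this round's suggestion (a later round tightens it to the NUMERIC(36,18) contract):

```go
package main

import (
	"fmt"
	"strconv"
)

// validateValues parses every record value up front so a malformed entry
// fails fast as an invalid-params error instead of surfacing later as an
// opaque DB error. In the handler this error would be wrapped with
// jsonrpc.NewError(jsonrpc.ErrorInvalidParams, ...).
func validateValues(values []string) ([]float64, error) {
	parsed := make([]float64, len(values))
	for i, v := range values {
		f, err := strconv.ParseFloat(v, 64)
		if err != nil {
			return nil, fmt.Errorf("invalid record value at index %d: %v", i, err)
		}
		parsed[i] = f
	}
	return parsed, nil
}

func main() {
	// Only after every record validates would the handler call dbInsertRecords.
	if _, err := validateValues([]string{"1.5", "abc"}); err != nil {
		fmt.Println(err)
	}
}
```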

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 506acd1b-04f8-4345-8fea-0c96ea5fb445

📥 Commits

Reviewing files that changed from the base of the PR and between f8e1dc1 and 61137d2.

📒 Files selected for processing (5)
  • extensions/tn_local/create_stream_test.go
  • extensions/tn_local/db_ops.go
  • extensions/tn_local/handlers.go
  • extensions/tn_local/insert_records_test.go
  • extensions/tn_local/tn_local_test.go
💤 Files with no reviewable changes (1)
  • extensions/tn_local/tn_local_test.go

@MicBun MicBun changed the title feat: store local stream records feat: insert local stream records Mar 18, 2026

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@extensions/tn_local/db_ops.go`:
- Around line 66-77: The INSERT currently uses "ON CONFLICT (stream_ref,
event_time) DO NOTHING" which suppresses unique-constraint errors and causes the
duplicate-as-error contract to fail; remove the "ON CONFLICT ... DO NOTHING"
clause from the INSERT into SchemaName.primitive_events so duplicate key
violations propagate as errors, allowing the existing duplicate-key handling
logic (the error check after tx.Execute and the later duplicate-count logic that
inspects the returned error) to run and return the true inserted count instead
of silently ignoring duplicates; keep the same parameter list (streamRef,
r.EventTime, r.Value, createdAt) and transaction/commit flow unchanged.

In `@extensions/tn_local/schema.go`:
- Around line 69-70: Existing deployments may still have a non-unique index
local_pe_stream_time_idx on %s.primitive_events while the write path in
db_ops.go uses ON CONFLICT (stream_ref, event_time); add a migration that (1)
detects whether local_pe_stream_time_idx exists and is non-unique, (2) if
non-unique, deduplicates rows violating uniqueness on (stream_ref, event_time)
(or fail with a clear report), (3) drops local_pe_stream_time_idx and (4)
recreates it with CREATE UNIQUE INDEX local_pe_stream_time_idx ON
%s.primitive_events (stream_ref, event_time) using SchemaName; ensure the
migration runs before any code that performs ON CONFLICT writes.
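One possible shape for the suggested migration, sketched as Postgres SQL embedded in Go. The schema name and the dedup policy (keep the row with the newest created_at) are assumptions; the actual migration would need to plug into this extension's schema-setup path:

```go
package main

import "fmt"

// migrationSQL sketches the dedup-then-unique-index steps suggested above.
// Keeping the newest created_at per (stream_ref, event_time) is an assumed
// policy; failing loudly on unexpected duplicates is the stated alternative.
func migrationSQL(schema string) string {
	return fmt.Sprintf(`
-- 1. Remove rows that would violate uniqueness, keeping the newest version.
DELETE FROM %[1]s.primitive_events p
USING %[1]s.primitive_events q
WHERE p.stream_ref = q.stream_ref
  AND p.event_time = q.event_time
  AND p.created_at < q.created_at;

-- 2. Replace the non-unique index with a unique one.
DROP INDEX IF EXISTS %[1]s.local_pe_stream_time_idx;
CREATE UNIQUE INDEX local_pe_stream_time_idx
    ON %[1]s.primitive_events (stream_ref, event_time);`, schema)
}

func main() {
	fmt.Println(migrationSQL("tn_local"))
}
```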

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 4df5b97a-1d06-426f-a295-847f1acbb804

📥 Commits

Reviewing files that changed from the base of the PR and between 61137d2 and 093a7a8.

📒 Files selected for processing (4)
  • extensions/tn_local/db_ops.go
  • extensions/tn_local/handlers.go
  • extensions/tn_local/insert_records_test.go
  • extensions/tn_local/schema.go


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@extensions/tn_local/schema.go`:
- Around line 88-92: The query against pg_index/pg_class should also filter by
schema to avoid collisions; update the SQL in the tx.Execute call that looks for
'local_pe_stream_time_idx' to join pg_namespace (e.g. JOIN pg_namespace n ON
c.relnamespace = n.oid) and add a condition like AND n.nspname = $1 (or the
appropriate schema name literal/variable) so you locate the index in the
intended namespace; pass the schema name as a parameter from the surrounding
code (use the existing ctx/tx call signature to supply it).
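The schema-qualified catalog lookup suggested above could take roughly this shape; the exact call site and parameter plumbing in schema.go are not shown here:

```go
package main

import "fmt"

// indexLookupSQL is a sketch of the pg_catalog query with the suggested
// pg_namespace join, so a same-named index in another schema is not matched.
// The schema name is supplied as $1 by the surrounding tx.Execute call.
const indexLookupSQL = `
SELECT i.indisunique
FROM pg_index i
JOIN pg_class c ON c.oid = i.indexrelid
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relname = 'local_pe_stream_time_idx'
  AND n.nspname = $1`

func main() {
	fmt.Println(indexLookupSQL)
}
```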

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: dd9f2d70-5c4f-4bf2-b901-16f44c6f770b

📥 Commits

Reviewing files that changed from the base of the PR and between 093a7a8 and 38c5513.

📒 Files selected for processing (3)
  • extensions/tn_local/db_ops.go
  • extensions/tn_local/schema.go
  • extensions/tn_local/tn_local_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • extensions/tn_local/db_ops.go


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (1)
extensions/tn_local/handlers.go (1)

125-130: ⚠️ Potential issue | 🟠 Major

ParseFloat still leaves NUMERIC(36,18) gaps.

This fixes plain non-numeric strings, but it still validates against float64 rather than the DB type you actually write to. Values that PostgreSQL would reject or round to zero at scale 18 can still pass here, so they either fail later as ErrorInternal or bypass the zero-value filter. Please validate once against the exact decimal contract and reuse that result for filtering.

Also applies to: 161-163

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@extensions/tn_local/handlers.go` around lines 125 - 130, The current
validation parses req.Value[i] with strconv.ParseFloat into f and checks
math.IsNaN/IsInf, but that accepts values that won't fit PostgreSQL
NUMERIC(36,18); change validation to parse req.Value[i] into a fixed‑precision
decimal (e.g., using a decimal library or big.Int scaled by 1e18) and verify it
fits the NUMERIC(36,18) range and scale constraints, then reuse that parsed
decimal result for subsequent zero-value filtering and DB writes instead of the
float64 f so rounding/overflow/magnitude issues are prevented (update the code
paths around strconv.ParseFloat, f, math.IsNaN/math.IsInf and any later zero
checks to use the validated decimal value).
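A stdlib-only sketch of the fixed-precision validation this prompt describes, using a big.Int scaled by 1e18 as suggested. The function name is hypothetical, and the real handler might prefer a decimal library; the point is that range and scale are checked against NUMERIC(36,18) rather than float64:

```go
package main

import (
	"fmt"
	"math/big"
	"strings"
)

// parseNumeric36_18 parses a decimal string into a big.Int scaled by 1e18,
// rejecting values that would not fit PostgreSQL NUMERIC(36,18):
// more than 18 fractional digits, or more than 18 integer digits.
// The returned value can drive the zero-value filter via n.Sign() == 0.
func parseNumeric36_18(s string) (*big.Int, error) {
	intPart, fracPart, _ := strings.Cut(s, ".")
	neg := strings.HasPrefix(intPart, "-")
	intPart = strings.TrimPrefix(strings.TrimPrefix(intPart, "-"), "+")
	if len(fracPart) > 18 {
		return nil, fmt.Errorf("scale exceeds 18 digits: %q", s)
	}
	// Pad the fractional part to exactly 18 digits, then parse as an integer.
	digits := intPart + fracPart + strings.Repeat("0", 18-len(fracPart))
	n, ok := new(big.Int).SetString(digits, 10)
	if !ok {
		return nil, fmt.Errorf("not a valid decimal: %q", s)
	}
	// NUMERIC(36,18) allows at most 18 integer digits: |scaled| < 10^36.
	limit := new(big.Int).Exp(big.NewInt(10), big.NewInt(36), nil)
	if n.CmpAbs(limit) >= 0 {
		return nil, fmt.Errorf("precision exceeds NUMERIC(36,18): %q", s)
	}
	if neg {
		n.Neg(n)
	}
	return n, nil
}

func main() {
	n, err := parseNumeric36_18("-1.25")
	fmt.Println(n, err)
}
```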
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@extensions/tn_local/handlers.go`:
- Around line 134-154: Filter out rows whose value is the documented no-op
(zero) before performing stream lookups: when iterating the request arrays to
build streamRefMap (using streamKey, req.DataProvider, req.StreamID and calling
ext.dbLookupStreamRef), skip any index i where the corresponding value field
indicates a zero/no-op (e.g., req.Value[i] == "0" or numeric zero according to
the request schema) so you do not call ext.dbLookupStreamRef or validate stype
for those rows; apply the same pre-filtering change to the second similar loop
referenced (the block around the later check at lines ~160-164) so zero-valued
rows are consistently ignored before stream resolution and primitive-stream
checks.
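The zero-value pre-filter described above could be sketched as an index filter applied before any stream lookups. Treating the literal strings "0" and "0.0" as the zero no-op is an assumption for illustration; the handler would decide zero-ness from its parsed decimal value:

```go
package main

import "fmt"

// keepNonZero returns the indices whose value is not the documented zero
// no-op, so dbLookupStreamRef and the primitive-stream check are skipped
// for rows that would be dropped anyway.
func keepNonZero(values []string) []int {
	var keep []int
	for i, v := range values {
		if v != "0" && v != "0.0" { // assumed zero spellings; see lead-in
			keep = append(keep, i)
		}
	}
	return keep
}

func main() {
	fmt.Println(keepNonZero([]string{"1.5", "0", "2"}))
}
```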

---

Duplicate comments:
In `@extensions/tn_local/handlers.go`:
- Around line 125-130: The current validation parses req.Value[i] with
strconv.ParseFloat into f and checks math.IsNaN/IsInf, but that accepts values
that won't fit PostgreSQL NUMERIC(36,18); change validation to parse
req.Value[i] into a fixed‑precision decimal (e.g., using a decimal library or
big.Int scaled by 1e18) and verify it fits the NUMERIC(36,18) range and scale
constraints, then reuse that parsed decimal result for subsequent zero-value
filtering and DB writes instead of the float64 f so rounding/overflow/magnitude
issues are prevented (update the code paths around strconv.ParseFloat, f,
math.IsNaN/math.IsInf and any later zero checks to use the validated decimal
value).

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: dca15c5b-7844-4f40-823f-3b61b2f3cde2

📥 Commits

Reviewing files that changed from the base of the PR and between 38c5513 and 847cea0.

📒 Files selected for processing (6)
  • extensions/tn_local/db_ops.go
  • extensions/tn_local/handlers.go
  • extensions/tn_local/insert_records_test.go
  • extensions/tn_local/schema.go
  • extensions/tn_local/tn_local_test.go
  • extensions/tn_local/types.go
💤 Files with no reviewable changes (1)
  • extensions/tn_local/tn_local_test.go
🚧 Files skipped from review as they are similar to previous changes (2)
  • extensions/tn_local/schema.go
  • extensions/tn_local/db_ops.go

@MicBun MicBun merged commit 8a7235b into main Mar 18, 2026
8 checks passed
@MicBun MicBun deleted the insertRecordLocal branch March 18, 2026 13:56

Labels

type: feat New feature or request
