Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 123 additions & 61 deletions backend/postgres/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package postgres

import (
"context"
"encoding/json"
"strconv"
"strings"

"github.com/jackc/pgx/v5"
Expand Down Expand Up @@ -36,75 +38,52 @@ func (b *Backend) Execute(ctx context.Context, plan *ir.Plan, rc *reqctx.Context
}
}

// executeRead compiles and runs the windowed read. The entire request is sent as
// a single pgx.Batch: [BEGIN, session setup, count (if needed), query, ROLLBACK].
// One network write to PostgreSQL covers all round trips, matching PostgREST's
// hasql pipeline behaviour. Rows stream from within the open batch; Close drains
// the trailing ROLLBACK item and releases the connection.
// executeRead compiles and runs the windowed read in a read-only transaction.
// Session setup (SET LOCAL ROLE, search_path, GUCs) is applied via applySession
// before the main query is sent so the PostgreSQL planner sees the correct role
// at parse time. Rows stream from within the open transaction; Close commits it.
//
// Note: a single-batch approach (BEGIN + session + query + ROLLBACK in one
// pipeline) would let pgx pre-parse the main SELECT while the connection is still
// authenticator (NOINHERIT, no schema USAGE), causing a 42501 error. applySession
// completes its batch before the main query is issued, so Parse runs as the
// request role, which has the required privileges.
func (b *Backend) executeRead(ctx context.Context, plan *ir.Plan, rc *reqctx.Context) (backend.Result, error) {
conn, err := b.pool.Acquire(ctx)
tx, err := b.pool.BeginTx(ctx, pgx.TxOptions{AccessMode: pgx.ReadOnly})
if err != nil {
return nil, b.MapError(err)
}
release := func() { conn.Release() }
rollback := func() { _ = tx.Rollback(ctx) }

// Build the single batch: BEGIN → session → [count] → query → ROLLBACK.
batch := &pgx.Batch{}
batch.Queue("BEGIN TRANSACTION READ ONLY")
sessionN := queueSessionItems(batch, b, rc)
if err := applySession(ctx, tx, b, rc); err != nil {
rollback()
return nil, b.MapError(err)
}

res := &streamResult{ctx: ctx, tx: tx, controls: rc.Controls()}

hasCount := plan.Query.Count != ir.CountNone
var cst *sqlgen.Statement
if hasCount {
var apiErr *pgerr.APIError
cst, apiErr = sqlgen.CompileCount(Dialect{}, plan.Query)
if plan.Query.Count != ir.CountNone {
cst, apiErr := sqlgen.CompileCount(Dialect{}, plan.Query)
if apiErr != nil {
release()
rollback()
return nil, apiErr
}
batch.Queue(cst.SQL, cst.Args...)
if err := tx.QueryRow(ctx, cst.SQL, cst.Args...).Scan(&res.count); err != nil {
rollback()
return nil, b.MapError(err)
}
res.hasCount = true
}

st, apiErr := sqlgen.CompileRead(Dialect{}, plan.Query)
if apiErr != nil {
release()
rollback()
return nil, apiErr
}
batch.Queue(st.SQL, st.Args...)
batch.Queue("ROLLBACK")

br := conn.SendBatch(ctx, batch)

abort := func(e error) (backend.Result, error) {
_ = br.Close()
release()
return nil, e
}

// Drain BEGIN.
if _, err := br.Exec(); err != nil {
return abort(b.MapError(err))
}
// Drain session setup items.
for range sessionN {
if _, err := br.Exec(); err != nil {
return abort(b.MapError(err))
}
}

res := &batchStreamResult{ctx: ctx, conn: conn, br: br, controls: rc.Controls()}

if hasCount {
_ = cst // already queued
if err := br.QueryRow().Scan(&res.count); err != nil {
return abort(b.MapError(err))
}
res.hasCount = true
}

rows, err := br.Query()
rows, err := tx.Query(ctx, st.SQL, st.Args...)
if err != nil {
return abort(b.MapError(err))
rollback()
return nil, b.MapError(err)
}
res.rows = rows
res.cols = fieldNames(rows)
Expand Down Expand Up @@ -324,10 +303,11 @@ func (b *Backend) executeCallRead(ctx context.Context, plan *ir.Plan, rc *reqctx

// compileNativeCall generates the PostgreSQL function-call SQL for the native
// RPC path (NativeRPC=true), where there is no declared function registry. It
// renders SELECT * FROM schema.fn(arg := $1, ...) using the search path's first
// schema as the function schema. Arguments come from the call's parsed arg map;
// they are bound as named parameters (fn_name := $N) which is how PostgREST
// calls PG functions. When no args are supplied the call has an empty arg list.
// renders SELECT * FROM schema.fn(arg := <literal>, ...) with values embedded
// as SQL literals so PostgreSQL infers the parameter types from the function
// signature and the call does not depend on pgx OID mapping. String values are
// single-quote escaped; numeric JSON values are written as numeric literals;
// booleans become TRUE/FALSE; null or absent values become NULL.
func (b *Backend) compileNativeCall(c *ir.Call) (*sqlgen.Statement, *pgerr.APIError) {
schema := "public"
if len(b.searchPath) > 0 {
Expand All @@ -342,20 +322,58 @@ func (b *Backend) compileNativeCall(c *ir.Call) (*sqlgen.Statement, *pgerr.APIEr
sb.WriteString(d.QuoteIdent(c.Function.Name))
sb.WriteString("(")

args := make([]any, 0, len(c.Args))
i := 0
for name, val := range c.Args {
if i > 0 {
sb.WriteString(", ")
}
sb.WriteString(d.QuoteIdent(name))
sb.WriteString(" := ")
sb.WriteString(d.Placeholder(i + 1))
args = append(args, val.Text)
appendNativeArg(&sb, val)
i++
}
sb.WriteString(")")
return &sqlgen.Statement{SQL: sb.String(), Args: args}, nil
return &sqlgen.Statement{SQL: sb.String()}, nil
}

// appendNativeArg writes one function argument as a safe SQL literal. Numbers
// are written unquoted so PostgreSQL resolves their type from context; strings
// use single-quote escaping; booleans are TRUE/FALSE; anything else (including
// absent values) becomes NULL. Objects and arrays are JSON-quoted.
func appendNativeArg(sb *strings.Builder, val ir.Value) {
if val.JSON != nil {
switch v := val.JSON.(type) {
case string:
sb.WriteString("'")
sb.WriteString(strings.ReplaceAll(v, "'", "''"))
sb.WriteString("'")
case json.Number:
// json.Number from dec.UseNumber() — write as-is; it is a valid SQL numeric literal.
sb.WriteString(v.String())
case float64:
sb.WriteString(strconv.FormatFloat(v, 'f', -1, 64))
case bool:
if v {
sb.WriteString("TRUE")
} else {
sb.WriteString("FALSE")
}
default:
// JSON object / array: pass as json literal.
enc, _ := json.Marshal(v)
sb.WriteString("'")
sb.WriteString(strings.ReplaceAll(string(enc), "'", "''"))
sb.WriteString("'::json")
}
return
}
if val.Text != "" {
sb.WriteString("'")
sb.WriteString(strings.ReplaceAll(val.Text, "'", "''"))
sb.WriteString("'")
return
}
sb.WriteString("NULL")
}

// compileWrite dispatches to the right compiler for the mutation kind.
Expand Down Expand Up @@ -407,6 +425,50 @@ func fieldNames(rows pgx.Rows) []string {
return names
}

// ExplainRead runs EXPLAIN (FORMAT JSON) on the read query and returns the raw
// JSON plan from PostgreSQL. When analyze is true EXPLAIN ANALYZE is used
// instead, which also executes the query and includes timing. The request runs
// in a read-only transaction with the full session setup (role + GUCs) so the
// planner sees the same context as a real request.
func (b *Backend) ExplainRead(ctx context.Context, p *ir.Plan, rc *reqctx.Context, analyze bool) ([]byte, error) {
tx, err := b.pool.BeginTx(ctx, pgx.TxOptions{AccessMode: pgx.ReadOnly})
if err != nil {
return nil, b.MapError(err)
}
defer func() { _ = tx.Rollback(ctx) }()

if err := applySession(ctx, tx, b, rc); err != nil {
return nil, b.MapError(err)
}

st, apiErr := sqlgen.CompileRead(Dialect{}, p.Query)
if apiErr != nil {
return nil, apiErr
}

var prefix string
if analyze {
prefix = "EXPLAIN (ANALYZE, FORMAT JSON) "
} else {
prefix = "EXPLAIN (FORMAT JSON) "
}
rows, err := tx.Query(ctx, prefix+st.SQL, st.Args...)
if err != nil {
return nil, b.MapError(err)
}
defer rows.Close()
var plan []byte
for rows.Next() {
if err := rows.Scan(&plan); err != nil {
return nil, b.MapError(err)
}
}
if err := rows.Err(); err != nil {
return nil, b.MapError(err)
}
return plan, nil
}

// drainRows reads every row of a pgx cursor into memory, normalizing values so
// json/jsonb, bytea, and date columns render correctly. The rows are closed by
// drainRows; the caller must not close them again.
Expand Down
4 changes: 2 additions & 2 deletions backend/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,8 @@ func statusForSQLState(code string) int {
return 404
case "42P01": // undefined_table
return 404
case "42501": // insufficient_privilege
return 403
case "42501": // insufficient_privilege → 401 matching PostgREST
return 401
case "42P17": // infinite_recursion
return 500
}
Expand Down
2 changes: 1 addition & 1 deletion backend/postgres/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func TestStatusForSQLState(t *testing.T) {
{"25006", 405},
{"42883", 404},
{"42P01", 404},
{"42501", 403},
{"42501", 401}, // matches PostgREST: insufficient_privilege → 401
// PTxxx convention
{"PT403", 403},
{"PT201", 201},
Expand Down
59 changes: 0 additions & 59 deletions backend/postgres/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"

"github.com/tamnd/dbrest/backend"
"github.com/tamnd/dbrest/reqctx"
Expand Down Expand Up @@ -77,64 +76,6 @@ func (s *streamRows) Close() error {
return s.tx.Commit(s.ctx)
}

// batchStreamResult adapts an in-flight pgx.BatchResults to the backend.Result
// contract for a read. The entire request (BEGIN + session setup + query +
// ROLLBACK) was sent in one pgx.Batch network write; the caller has already
// consumed the non-row items and positioned br at the query result. Streaming
// rows through the open BatchResults and draining ROLLBACK at Close reduces the
// read path to a single PostgreSQL round trip.
type batchStreamResult struct {
ctx context.Context
conn *pgxpool.Conn
br pgx.BatchResults
rows pgx.Rows
cols []string
controls *reqctx.ResponseControls
count int64
hasCount bool
}

func (r *batchStreamResult) Body() io.Reader { return nil }
func (r *batchStreamResult) Rows() backend.RowStream {
return &batchStreamRows{ctx: r.ctx, conn: r.conn, br: r.br, rows: r.rows, cols: r.cols}
}
func (r *batchStreamResult) Count() (int64, bool) { return r.count, r.hasCount }
func (r *batchStreamResult) Affected() (int64, bool) { return 0, false }
func (r *batchStreamResult) ResponseControls() *reqctx.ResponseControls { return r.controls }

// batchStreamRows streams rows from within an open pgx.BatchResults. On Close
// it drains the remaining ROLLBACK item, closes the batch, and releases the
// connection back to the pool.
type batchStreamRows struct {
ctx context.Context
conn *pgxpool.Conn
br pgx.BatchResults
rows pgx.Rows
cols []string
}

func (s *batchStreamRows) Columns() []string { return s.cols }
func (s *batchStreamRows) Next() bool { return s.rows.Next() }
func (s *batchStreamRows) Err() error { return s.rows.Err() }

func (s *batchStreamRows) Values() ([]any, error) {
vals, err := s.rows.Values()
if err != nil {
return nil, err
}
return normalizeValues(vals, s.rows.FieldDescriptions()), nil
}

// Close drains the ROLLBACK batch item and releases the connection.
func (s *batchStreamRows) Close() error {
s.rows.Close()
rowErr := s.rows.Err()
s.br.Exec() //nolint:errcheck // ROLLBACK; ignore error, it's cleanup
_ = s.br.Close()
s.conn.Release()
return rowErr
}

// bufResult holds the buffered outcome of a write or a function call. A write
// runs inside a transaction that must commit (or roll back, under tx=rollback)
// before the response is sent, and a function call's response headers and status
Expand Down
10 changes: 10 additions & 0 deletions backend/spi.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,16 @@ type Result interface {
ResponseControls() *reqctx.ResponseControls
}

// Explainer is an optional backend capability for the vnd.pgrst.plan+json
// Accept type. Backends that support EXPLAIN implement this interface;
// the frontend type-asserts to it and falls back to 406 when absent.
type Explainer interface {
// ExplainRead runs EXPLAIN on the read query and returns raw JSON from the
// engine's query planner. If analyze is true the engine also executes and
// times the query (EXPLAIN ANALYZE equivalent).
ExplainRead(ctx context.Context, p *ir.Plan, rc *reqctx.Context, analyze bool) ([]byte, error)
}

// RowStream is a forward-only cursor over result rows. The renderer drives it to
// assemble the response body when the backend does not assemble JSON itself.
type RowStream interface {
Expand Down
Loading
Loading