Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion cmd/util/sql_logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,16 @@ import (
"github.com/pgplex/pgschema/internal/logger"
)

// execer is an interface satisfied by both *sql.DB and *sql.Conn,
// allowing ExecContextWithLogging to work with either.
type execer interface {
ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
}

// ExecContextWithLogging executes SQL with debug logging if debug mode is enabled.
// It logs the SQL statement before execution and the result/error after execution.
func ExecContextWithLogging(ctx context.Context, db *sql.DB, sqlStmt string, description string) (sql.Result, error) {
// It accepts both *sql.DB and *sql.Conn via the execer interface.
func ExecContextWithLogging(ctx context.Context, db execer, sqlStmt string, description string) (sql.Result, error) {
isDebug := logger.IsDebug()
if isDebug {
logger.Get().Debug("Executing SQL", "description", description, "sql", sqlStmt)
Expand Down
18 changes: 14 additions & 4 deletions internal/postgres/embedded.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,22 +191,32 @@ func (ep *EmbeddedPostgres) GetSchemaName() string {
// This ensures a clean state before applying the desired schema definition.
// Note: The schema parameter is ignored - we always use the temporary schema name.
func (ep *EmbeddedPostgres) ApplySchema(ctx context.Context, schema string, sql string) error {
// Acquire a single dedicated connection to ensure SET search_path affects
// all subsequent statements. Using *sql.DB (connection pool) does not
// guarantee the same connection across ExecContext calls, so session-scoped
// settings like search_path may be lost.
conn, err := ep.db.Conn(ctx)
if err != nil {
return fmt.Errorf("failed to acquire connection: %w", err)
}
defer conn.Close()

// Drop the temporary schema if it exists (CASCADE to drop all objects)
dropSchemaSQL := fmt.Sprintf("DROP SCHEMA IF EXISTS \"%s\" CASCADE", ep.tempSchema)
if _, err := util.ExecContextWithLogging(ctx, ep.db, dropSchemaSQL, "drop temporary schema"); err != nil {
if _, err := util.ExecContextWithLogging(ctx, conn, dropSchemaSQL, "drop temporary schema"); err != nil {
return fmt.Errorf("failed to drop temporary schema %s: %w", ep.tempSchema, err)
}

// Create the temporary schema
createSchemaSQL := fmt.Sprintf("CREATE SCHEMA \"%s\"", ep.tempSchema)
if _, err := util.ExecContextWithLogging(ctx, ep.db, createSchemaSQL, "create temporary schema"); err != nil {
if _, err := util.ExecContextWithLogging(ctx, conn, createSchemaSQL, "create temporary schema"); err != nil {
return fmt.Errorf("failed to create temporary schema %s: %w", ep.tempSchema, err)
}

// Set search_path to the temporary schema, with public as fallback
// for resolving extension types installed in public schema (issue #197)
setSearchPathSQL := fmt.Sprintf("SET search_path TO \"%s\", public", ep.tempSchema)
if _, err := util.ExecContextWithLogging(ctx, ep.db, setSearchPathSQL, "set search_path for desired state"); err != nil {
if _, err := util.ExecContextWithLogging(ctx, conn, setSearchPathSQL, "set search_path for desired state"); err != nil {
return fmt.Errorf("failed to set search_path: %w", err)
}

Expand All @@ -227,7 +237,7 @@ func (ep *EmbeddedPostgres) ApplySchema(ctx context.Context, schema string, sql
// Execute the SQL directly
// Note: Desired state SQL should never contain operations like CREATE INDEX CONCURRENTLY
// that cannot run in transactions. Those are migration details, not state declarations.
if _, err := util.ExecContextWithLogging(ctx, ep.db, schemaAgnosticSQL, "apply desired state SQL to temporary schema"); err != nil {
if _, err := util.ExecContextWithLogging(ctx, conn, schemaAgnosticSQL, "apply desired state SQL to temporary schema"); err != nil {
return fmt.Errorf("failed to apply schema SQL to temporary schema %s: %w", ep.tempSchema, err)
}

Expand Down
16 changes: 13 additions & 3 deletions internal/postgres/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,26 @@ func (ed *ExternalDatabase) ApplySchema(ctx context.Context, schema string, sql
// Note: We use the temporary schema name (ed.tempSchema) instead of the user-provided schema name
// This ensures we don't interfere with existing schemas in the external database

// Acquire a single dedicated connection to ensure SET search_path affects
// all subsequent statements. Using *sql.DB (connection pool) does not
// guarantee the same connection across ExecContext calls, so session-scoped
// settings like search_path may be lost.
conn, err := ed.db.Conn(ctx)
if err != nil {
return fmt.Errorf("failed to acquire connection: %w", err)
}
defer conn.Close()

// Create the temporary schema
createSchemaSQL := fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS \"%s\"", ed.tempSchema)
if _, err := util.ExecContextWithLogging(ctx, ed.db, createSchemaSQL, "create temporary schema"); err != nil {
if _, err := util.ExecContextWithLogging(ctx, conn, createSchemaSQL, "create temporary schema"); err != nil {
return fmt.Errorf("failed to create temporary schema %s: %w", ed.tempSchema, err)
}

// Set search_path to the temporary schema, with public as fallback
// for resolving extension types installed in public schema (issue #197)
setSearchPathSQL := fmt.Sprintf("SET search_path TO \"%s\", public", ed.tempSchema)
if _, err := util.ExecContextWithLogging(ctx, ed.db, setSearchPathSQL, "set search_path for desired state"); err != nil {
if _, err := util.ExecContextWithLogging(ctx, conn, setSearchPathSQL, "set search_path for desired state"); err != nil {
return fmt.Errorf("failed to set search_path: %w", err)
}

Expand All @@ -138,7 +148,7 @@ func (ed *ExternalDatabase) ApplySchema(ctx context.Context, schema string, sql
// Execute the SQL directly
// Note: Desired state SQL should never contain operations like CREATE INDEX CONCURRENTLY
// that cannot run in transactions. Those are migration details, not state declarations.
if _, err := util.ExecContextWithLogging(ctx, ed.db, schemaAgnosticSQL, "apply desired state SQL to temporary schema"); err != nil {
if _, err := util.ExecContextWithLogging(ctx, conn, schemaAgnosticSQL, "apply desired state SQL to temporary schema"); err != nil {
return fmt.Errorf("failed to apply schema SQL to temporary schema %s: %w", ed.tempSchema, err)
}

Expand Down
Loading