diff --git a/database/database_test.go b/database/database_test.go index abda3ddf..2b1cc151 100644 --- a/database/database_test.go +++ b/database/database_test.go @@ -12,6 +12,7 @@ import ( "time" gomysql "github.com/go-sql-driver/mysql" + "github.com/jackc/pgerrcode" "github.com/jackc/pgx/v5/pgconn" "github.com/moov-io/base/database" "github.com/moov-io/base/log" @@ -38,7 +39,7 @@ func TestUniqueViolation(t *testing.T) { t.Error("should have matched postgres unique violation") } pgconnErr := &pgconn.PgError{ - Code: "23505", + Code: pgerrcode.UniqueViolation, } if !database.UniqueViolation(pgconnErr) { t.Error("should have matched PgError unique violation") @@ -78,7 +79,7 @@ func TestDeadlockFound(t *testing.T) { t.Error("should have matched postgres deadlock found") } pgconnErr := &pgconn.PgError{ - Code: "40P01", + Code: pgerrcode.DeadlockDetected, } if !database.DeadlockFound(pgconnErr) { t.Error("should have matched PgError deadlock found") diff --git a/database/mysql.go b/database/mysql.go index 08e5da0d..07d0d8b4 100644 --- a/database/mysql.go +++ b/database/mysql.go @@ -245,3 +245,16 @@ func MySQLDeadlockFound(err error) bool { return strings.Contains(err.Error(), fmt.Sprintf("Error %d", mysqlErrDeadlockFound)) } + +// RetryMySQLNonIdempotent is the MySQL equivalent of RetryPostgresNonIdempotent. +// TODO: implement a MySQL-specific retryClassifier (retryTx already handles the +// Begin/fn/Commit/Rollback loop and driver.ErrBadConn). Cover 1213 deadlock, +// 1205 lock wait timeout, 2013 connection lost. 
See +// https://dev.mysql.com/doc/mysql-errors/8.0/en/server-error-reference.html +func RetryMySQLNonIdempotent(ctx context.Context, db *sql.DB, opts RetryNonIdempotentOptions, fn func(*sql.Tx) error) error { + _ = ctx + _ = db + _ = opts + _ = fn + return errors.New("database: RetryMySQLNonIdempotent is not yet implemented; see TODO") +} diff --git a/database/postgres.go b/database/postgres.go index 4411d660..33aa1e22 100644 --- a/database/postgres.go +++ b/database/postgres.go @@ -5,35 +5,29 @@ import ( "database/sql" "errors" "fmt" - "io" - "math/rand" "net" "strings" "time" "cloud.google.com/go/alloydbconn" + "github.com/jackc/pgerrcode" "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/stdlib" "github.com/moov-io/base/log" ) -const ( - // PostgreSQL Error Codes - // https://www.postgresql.org/docs/current/errcodes-appendix.html - postgresErrUniqueViolation = "23505" - postgresErrDeadlockFound = "40P01" -) - func postgresConnection(ctx context.Context, logger log.Logger, config PostgresConfig, databaseName string) (*sql.DB, error) { poolConfig, err := buildPgxPoolConfig(ctx, config, databaseName) if err != nil { return nil, logger.LogErrorf("building pgx pool config: %w", err).Err() } - // HealthCheckPeriod makes pgxpool ping idle connections in the background. - // Dead connections (e.g. from an AlloyDB switchover) are evicted before - // the application ever sees them. + // HealthCheckPeriod is how often the background goroutine evicts connections + // that exceeded MaxConnLifetime or MaxConnIdleTime. It does NOT ping for + // liveness — dead connections are caught at acquire time by the ResetSession + // ping (default: ping if idle > 1s), with database/sql retrying on a fresh + // conn and RetryPostgresNonIdempotent retrying beyond that. 
poolConfig.HealthCheckPeriod = 1 * time.Second pool, err := pgxpool.NewWithConfig(ctx, poolConfig) @@ -154,11 +148,11 @@ func PostgresUniqueViolation(err error) bool { } var pgError *pgconn.PgError - if errors.As(err, &pgError) && pgError.Code == postgresErrUniqueViolation { + if errors.As(err, &pgError) && pgError.Code == pgerrcode.UniqueViolation { return true } - return strings.Contains(err.Error(), postgresErrUniqueViolation) + return strings.Contains(err.Error(), pgerrcode.UniqueViolation) } // PostgresDeadlockFound returns true when the provided error matches the Postgres code @@ -169,119 +163,126 @@ func PostgresDeadlockFound(err error) bool { } var pgError *pgconn.PgError - if errors.As(err, &pgError) && pgError.Code == postgresErrDeadlockFound { + if errors.As(err, &pgError) && pgError.Code == pgerrcode.DeadlockDetected { return true } - return strings.Contains(err.Error(), postgresErrDeadlockFound) + return strings.Contains(err.Error(), pgerrcode.DeadlockDetected) +} + +// isPermanentPostgresError reports SQLSTATE classes that would fail again on +// retry, used by isRetryablePostgresPreCommitError to opt out. Only +// clearly-permanent classes are listed so unknown/transient codes are retried. +func isPermanentPostgresError(pgErr *pgconn.PgError) bool { + return pgerrcode.IsDataException(pgErr.Code) || // class 22 + pgerrcode.IsIntegrityConstraintViolation(pgErr.Code) || // class 23 + pgerrcode.IsSyntaxErrororAccessRuleViolation(pgErr.Code) // class 42 } -// IsRetryablePostgresError returns true if the error is a transient connection-level -// error that is safe to retry. This covers the errors seen during AlloyDB maintenance -// switchovers and other transient network failures. -func IsRetryablePostgresError(err error) bool { +// retryJitterMax is the upper bound on the random delay between retries. Full +// jitter in [0, retryJitterMax) spreads concurrent retries across the fleet +// rather than slamming the database all at once. 
AlloyDB maintenance +// switchovers cause <1s downtime, so maxRetryAttempts attempts with up to +// retryJitterMax between each spans the blip; the caller's context deadline is +// the escape hatch for tighter latency budgets. +const retryJitterMax = 100 * time.Millisecond + +// isRetryablePostgresPreCommitError is the pre-commit classifier for +// RetryPostgresNonIdempotent (errors from BeginTx or fn). OPTS OUT: pre-commit retry is +// always safe (the transaction rolls back), so it retries everything EXCEPT +// context.Canceled/DeadlineExceeded and the permanent SQLSTATE classes +// (isPermanentPostgresError). Unknown *pgconn.PgError codes and non-PgError +// errors (network, driver.ErrBadConn, SafeToRetry) are retried — a false +// positive (~200ms wasted on a permanent error) costs less than a false +// negative (spurious user-facing failure), so opt-out is safer than opt-in. +// +// opts.IsRetryable is additive (OR); to narrow the set, wrap with a predicate +// that re-checks. +func isRetryablePostgresPreCommitError(err error) bool { if err == nil { return false } - - // PostgreSQL error codes that are safe to retry because the server guarantees - // the transaction was rolled back before the error was returned. - // - // 57P01 admin_shutdown, 57P02 crash_shutdown: fast shutdown rolls back all - // active transactions before disconnecting clients. See: - // https://www.postgresql.org/docs/current/server-shutdown.html - // - // 57P03 cannot_connect_now: server is still starting up; the operation never - // reached a transaction. - // - // 40001 serialization_failure, 40P01 deadlock_detected: the server explicitly - // rolled back the transaction and expects the client to retry. See: - // https://www.postgresql.org/docs/current/mvcc-serialization-failure-handling.html - // - // 53300 too_many_connections: rejected at connect time; no transaction started. - // - // 57014 query_canceled: the server rolled back the transaction before returning - // this error. 
- // - // Note: 08xxx (connection_exception class) codes are intentionally omitted. - // pgx surfaces connection-level failures as TCP/network errors, not as - // *pgconn.PgError, so those cases are handled below. + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return false + } var pgErr *pgconn.PgError if errors.As(err, &pgErr) { - switch pgErr.Code { - case "57P01", "57P02", "57P03", // admin_shutdown, crash_shutdown, cannot_connect_now - "40001", "40P01", // serialization_failure, deadlock_detected - "53300", // too_many_connections - "57014": // query_canceled - return true - } - return false + return !isPermanentPostgresError(pgErr) } + return true +} - // Network-level errors: connection reset, broken pipe, EOF, etc. - // These occur when the TCP connection is severed during a switchover. - var netErr *net.OpError - if errors.As(err, &netErr) { - return true - } - if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { - return true +// capturePostgresXid returns the current transaction's id (pg_current_xact_id_if_assigned) +// as a string, or nil if no id is assigned (read-only transaction). Captured +// before Commit so verifyPostgresCommit can check whether an ambiguous commit +// landed. The ::text cast sidesteps pgx's xid8 Go type-mapping; the string is +// passed back to pg_xact_status($1::xid8) on verification. Returns nil for +// read-only transactions (no xid), in which case retryTx retries an ambiguous +// commit without verifying (no writes to duplicate). 
+func capturePostgresXid(tx *sql.Tx) (any, error) { + var s sql.NullString + if err := tx.QueryRow("SELECT pg_current_xact_id_if_assigned()::text").Scan(&s); err != nil { + return nil, err } - if errors.Is(err, context.DeadlineExceeded) { - return false // don't retry if the caller's context timed out + if !s.Valid { + return nil, nil // read-only, no xid assigned } + return s.String, nil +} - // pgx sometimes wraps TCP-level disconnection errors as plain strings rather - // than preserving the original net.OpError type. These are safe to retry - // because they occur on the write path — the server never received the query. - // "conn closed" is pgx's own sentinel for a connection already closed before use. - // Note: "connection refused" and "unexpected EOF" are intentionally excluded — - // the former does not appear in pgx's codebase, the latter is already caught - // above by errors.Is(err, io.ErrUnexpectedEOF). - msg := err.Error() - if strings.Contains(msg, "connection reset by peer") || - strings.Contains(msg, "broken pipe") || - strings.Contains(msg, "conn closed") { - return true +// verifyPostgresCommit checks whether the transaction with the given xid +// committed, aborted, or is indeterminate, via pg_xact_status on a fresh +// connection (the original may be dead). xid is the string from +// capturePostgresXid. Returns: +// - commitStatusAborted: the commit did NOT land → safe to retry. +// - commitStatusCommitted: the commit DID land → don't retry (ErrCommitted). +// - commitStatusUnknown: pg_xact_status returned NULL (xid too old, or not +// yet visible after a failover) or errored, or status was "in progress" → +// ambiguous (ErrCommitPhase). 
+func verifyPostgresCommit(ctx context.Context, db retryDB, xid any) (commitStatus, error) { + s, ok := xid.(string) + if !ok || s == "" { + return commitStatusUnknown, nil + } + var status sql.NullString + if err := db.QueryRowContext(ctx, "SELECT pg_xact_status($1::xid8)", s).Scan(&status); err != nil { + return commitStatusUnknown, err + } + switch status.String { + case "aborted": + return commitStatusAborted, nil + case "committed": + return commitStatusCommitted, nil + default: // "" (NULL), "in progress", or unexpected → ambiguous + return commitStatusUnknown, nil } - - return false } -// retryJitterMax is the upper bound for the random delay between retries. -// Full jitter in [0, retryJitterMax) spreads concurrent retries across the -// fleet rather than letting them all slam the database at the same instant. -// AlloyDB planned maintenance switchovers typically cause less than one second -// of downtime, so three attempts with up to 100ms between each gives a worst- -// case retry window of ~200ms — well within typical service SLAs while still -// spanning the switchover blip. The caller's context deadline is the escape -// hatch for services with tighter latency budgets. -const retryJitterMax = 100 * time.Millisecond +// RetryPostgresNonIdempotent runs fn in a Postgres transaction, retrying the +// whole transaction on transient errors. fn gets a fresh *sql.Tx each attempt; +// if it returns an error the tx is rolled back, if nil the tx is committed. +// +// Pre-commit errors (from BeginTx or fn) use the opt-out +// isRetryablePostgresPreCommitError (opts.IsRetryable is additive on it). +// Commit errors are always verified: before each Commit the transaction id is +// captured (pg_current_xact_id_if_assigned), and on a commit error +// pg_xact_status is queried on a fresh connection — aborted → retry, +// committed → ErrCommitted (don't retry), inconclusive → ErrCommitPhase. +// Read-only transactions (no xid) retry without verification. 
No client-side +// "retryable commit" guess — the server is the authority on whether the commit +// landed. +// +// db must be pgx-backed (e.g. from database.New with a PostgresConfig); other +// drivers won't get pgx-specific retries or commit verification. +func RetryPostgresNonIdempotent(ctx context.Context, db *sql.DB, opts RetryNonIdempotentOptions, fn func(*sql.Tx) error) error { + return retryTx(ctx, db, postgresTxClassifier, opts, fn) +} -// RetryPostgres executes fn up to maxAttempts times, retrying on transient -// connection errors. This is intended for use around individual database -// operations to survive brief outages like AlloyDB maintenance switchovers. -func RetryPostgres(ctx context.Context, maxAttempts int, fn func() error) error { - if maxAttempts <= 0 { - maxAttempts = 3 - } - var err error - for attempt := 0; attempt < maxAttempts; attempt++ { - err = fn() - if err == nil { - return nil - } - if !IsRetryablePostgresError(err) { - return err - } - if attempt < maxAttempts-1 { - delay := time.Duration(rand.Int63n(int64(retryJitterMax))) - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(delay): - } - } - } - return err +// postgresTxClassifier pairs the pre-commit (opt-out) classifier with commit +// verification (capturePostgresXid/verifyPostgresCommit). Commit errors are +// always verified, not classified. See ErrCommitPhase / ErrCommitted. 
+var postgresTxClassifier = retryClassifier{ + preCommit: isRetryablePostgresPreCommitError, + captureXid: capturePostgresXid, + verifyCommit: verifyPostgresCommit, } diff --git a/database/postgres_test.go b/database/postgres_test.go index 9f663a0c..6023c369 100644 --- a/database/postgres_test.go +++ b/database/postgres_test.go @@ -2,14 +2,13 @@ package database_test import ( "context" - "errors" "io" - "net" "os" "path/filepath" "testing" "time" + "github.com/jackc/pgerrcode" "github.com/jackc/pgx/v5/pgconn" "github.com/moov-io/base" "github.com/moov-io/base/database" @@ -184,61 +183,10 @@ func Test_Postgres_Alloy_Migrations(t *testing.T) { defer db.Close() } -func TestIsRetryablePostgresError(t *testing.T) { - // nil error is not retryable - require.False(t, database.IsRetryablePostgresError(nil)) - - // admin_shutdown is retryable (seen during AlloyDB maintenance) - require.True(t, database.IsRetryablePostgresError(&pgconn.PgError{Code: "57P01"})) - - // crash_shutdown is retryable - require.True(t, database.IsRetryablePostgresError(&pgconn.PgError{Code: "57P02"})) - - // cannot_connect_now is retryable - require.True(t, database.IsRetryablePostgresError(&pgconn.PgError{Code: "57P03"})) - - // serialization_failure and deadlock_detected are retryable (server rolled back) - require.True(t, database.IsRetryablePostgresError(&pgconn.PgError{Code: "40001"})) - require.True(t, database.IsRetryablePostgresError(&pgconn.PgError{Code: "40P01"})) - - // too_many_connections is retryable (rejected at connect time) - require.True(t, database.IsRetryablePostgresError(&pgconn.PgError{Code: "53300"})) - - // query_canceled is retryable (server rolled back the transaction) - require.True(t, database.IsRetryablePostgresError(&pgconn.PgError{Code: "57014"})) - - // unique_violation is NOT retryable (application-level error) - require.False(t, database.IsRetryablePostgresError(&pgconn.PgError{Code: "23505"})) - - // syntax_error is NOT retryable - require.False(t, 
database.IsRetryablePostgresError(&pgconn.PgError{Code: "42601"})) - - // EOF is retryable (connection severed) - require.True(t, database.IsRetryablePostgresError(io.EOF)) - require.True(t, database.IsRetryablePostgresError(io.ErrUnexpectedEOF)) - - // net.OpError is retryable - require.True(t, database.IsRetryablePostgresError(&net.OpError{ - Op: "read", - Err: errors.New("connection reset by peer"), - })) - - // context.DeadlineExceeded is NOT retryable - require.False(t, database.IsRetryablePostgresError(context.DeadlineExceeded)) - - // String-matched connection errors - require.True(t, database.IsRetryablePostgresError(errors.New("connection reset by peer"))) - require.True(t, database.IsRetryablePostgresError(errors.New("broken pipe"))) - require.True(t, database.IsRetryablePostgresError(errors.New("conn closed"))) - - // Random application error is NOT retryable - require.False(t, database.IsRetryablePostgresError(errors.New("invalid input"))) -} - -func TestRetryPostgres(t *testing.T) { +func TestRetryIdempotent(t *testing.T) { t.Run("succeeds on first attempt", func(t *testing.T) { calls := 0 - err := database.RetryPostgres(context.Background(), 3, func() error { + err := database.RetryIdempotent(context.Background(), database.RetryIdempotentOptions{}, func() error { calls++ return nil }) @@ -246,12 +194,15 @@ func TestRetryPostgres(t *testing.T) { require.Equal(t, 1, calls) }) - t.Run("retries on transient error then succeeds", func(t *testing.T) { + t.Run("retries on any error by default, including non-Pg-retryable ones", func(t *testing.T) { + // unique_violation is not retried by RetryPostgresNonIdempotent by default, but + // RetryIdempotent's default predicate retries any error except context + // cancellation/deadline because the caller has vouched that fn is idempotent. 
calls := 0 - err := database.RetryPostgres(context.Background(), 3, func() error { + err := database.RetryIdempotent(context.Background(), database.RetryIdempotentOptions{}, func() error { calls++ if calls < 3 { - return &pgconn.PgError{Code: "57P01"} // admin_shutdown + return &pgconn.PgError{Code: pgerrcode.UniqueViolation} } return nil }) @@ -259,33 +210,46 @@ func TestRetryPostgres(t *testing.T) { require.Equal(t, 3, calls) }) - t.Run("does not retry non-retryable errors", func(t *testing.T) { + t.Run("custom IsRetryable short-circuits non-retryable errors", func(t *testing.T) { + calls := 0 + neverRetry := func(error) bool { return false } + err := database.RetryIdempotent(context.Background(), database.RetryIdempotentOptions{IsRetryable: neverRetry}, func() error { + calls++ + return io.EOF + }) + require.Error(t, err) + require.Equal(t, 1, calls) + require.ErrorIs(t, err, io.EOF) + }) + + t.Run("does not retry context cancellation by default", func(t *testing.T) { calls := 0 - err := database.RetryPostgres(context.Background(), 3, func() error { + err := database.RetryIdempotent(context.Background(), database.RetryIdempotentOptions{}, func() error { calls++ - return &pgconn.PgError{Code: "23505"} // unique_violation + return context.Canceled }) require.Error(t, err) require.Equal(t, 1, calls) + require.ErrorIs(t, err, context.Canceled) }) - t.Run("respects context cancellation", func(t *testing.T) { + t.Run("respects context cancellation between attempts", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cancel() // cancel immediately calls := 0 - err := database.RetryPostgres(ctx, 3, func() error { + err := database.RetryIdempotent(ctx, database.RetryIdempotentOptions{}, func() error { calls++ return io.EOF // retryable, but context is done }) - // First call happens, then context cancellation is detected + // First call happens, then context cancellation is detected before the next attempt require.ErrorIs(t, err, 
context.Canceled) require.Equal(t, 1, calls) }) t.Run("exhausts all attempts", func(t *testing.T) { calls := 0 - err := database.RetryPostgres(context.Background(), 3, func() error { + err := database.RetryIdempotent(context.Background(), database.RetryIdempotentOptions{}, func() error { calls++ return io.EOF }) diff --git a/database/retry.go b/database/retry.go new file mode 100644 index 00000000..a5325dfe --- /dev/null +++ b/database/retry.go @@ -0,0 +1,255 @@ +package database + +import ( + "context" + "database/sql" + "errors" + "fmt" + "math/rand" + "time" +) + +// RetryNonIdempotentOptions configures RetryPostgresNonIdempotent (and future +// RetryMySQLNonIdempotent / RetrySpannerNonIdempotent). +type RetryNonIdempotentOptions struct { + // TxOptions passed to (*sql.DB).BeginTx each attempt; nil = default isolation. + TxOptions *sql.TxOptions + // IsRetryable, if non-nil, is OR'd with the backend's pre-commit classifier + // (errors from BeginTx or fn). Commit errors are always verified via + // pg_xact_status, not classified, so IsRetryable does not affect them. + IsRetryable func(err error) bool +} + +// RetryIdempotentOptions configures RetryIdempotent. +type RetryIdempotentOptions struct { + // IsRetryable, if nil, retries any error except context.Canceled/DeadlineExceeded. + // If non-nil, replaces the default. + IsRetryable func(err error) bool +} + +// maxRetryAttempts is the attempt count for RetryPostgresNonIdempotent and RetryIdempotent +// (initial + 2 retries), matching database/sql's maxBadConnRetries+1. Not +// configurable: use the context deadline to bound total time, or wrap for more. +const maxRetryAttempts = 3 + +// ErrCommitPhase wraps a non-retried commit-phase error whose commit status +// could not be verified — i.e. the commit may or may not have landed (e.g. a +// network error after COMMIT was sent, or pg_xact_status() couldn't determine +// the outcome). 
The caller should treat it as ambiguous: alert/reconcile, or +// re-check whether the commit landed. The underlying error is preserved. +// +// With commit verification enabled (RetryPostgresNonIdempotent), ErrCommitPhase +// is the fallback when verification is inconclusive; verified-aborted commits +// are retried, verified-committed commits return ErrCommitted. +var ErrCommitPhase = errors.New("database: commit-phase error with inconclusive commit status; the transaction may have committed before the error was returned") + +// ErrCommitted wraps a commit-phase error where verification (pg_xact_status) +// confirmed the transaction DID commit on the server before the error was +// returned to the client (e.g. the connection was severed after the commit +// recorded but before the response arrived). The caller must NOT retry (that +// would duplicate the committed work); it should reconcile/alert. The +// underlying commit error is preserved for inspection via errors.Is/errors.As. +var ErrCommitted = errors.New("database: transaction committed before the error was returned; do not retry") + +// commitStatus is the verified outcome of an ambiguous commit-phase error. +type commitStatus int + +const ( + commitStatusUnknown commitStatus = iota // can't determine → ErrCommitPhase + commitStatusAborted // commit did NOT land → safe to retry + commitStatusCommitted // commit DID land → ErrCommitted, don't retry +) + +// beginTxer is satisfied by *sql.DB; the interface lets tests fake BeginTx. +type beginTxer interface { + BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error) +} + +// retryDB extends beginTxer with QueryRowContext, so the retry loop can run a +// fresh-connection query (pg_xact_status) to verify an ambiguous commit. +// Satisfied by *sql.DB. 
+type retryDB interface { + beginTxer + QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row +} + +// retryClassifier holds the pre-commit classifier and optional commit- +// verification hooks. preCommit classifies BeginTx/fn errors (opt-out: pre- +// commit retry is always safe — the transaction rolls back). Commit errors are +// not classified; they're always verified via captureXid/verifyCommit (a commit +// error may mean the commit already succeeded, so the server is the authority, +// not a client-side guess). See ErrCommitPhase/ErrCommitted. +type retryClassifier struct { + preCommit func(error) bool + // captureXid, if non-nil, is called after fn succeeds and before Commit to + // capture a transaction identifier for verifyCommit. Returns nil if the + // transaction has no id (e.g. read-only — safe to retry without verifying). + captureXid func(*sql.Tx) (any, error) + // verifyCommit, if non-nil, is called on a commit error only when captureXid + // returned a non-nil xid, to check whether the commit landed. Runs on a + // fresh connection (db) since the original may be dead. + verifyCommit func(ctx context.Context, db retryDB, xid any) (commitStatus, error) +} + +// RetryIdempotent runs fn up to maxRetryAttempts times, retrying on any error +// opts.IsRetryable says is retryable (default: any except context cancellation/ +// deadline). fn MUST be idempotent — no transaction wrapper, so a retry may +// re-execute work that already committed. 
+func RetryIdempotent(ctx context.Context, opts RetryIdempotentOptions, fn func() error) error { + isRetryable := opts.IsRetryable + if isRetryable == nil { + isRetryable = isRetryableUnsafeDefault + } + var err error + for attempt := 0; attempt < maxRetryAttempts; attempt++ { + err = fn() + if err == nil { + return nil + } + if !isRetryable(err) { + return err + } + if !sleepWithJitter(ctx, attempt, maxRetryAttempts) { + return ctx.Err() + } + } + return err +} + +// isRetryableUnsafeDefault retries any error except context cancellation/deadline +// (RetryIdempotent callers vouch fn is idempotent). +func isRetryableUnsafeDefault(err error) bool { + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return false + } + return true +} + +// retryTx is the shared retry loop for RetryPostgresNonIdempotent (and future +// MySQL/Spanner implementations). opts.IsRetryable is OR'd with base.preCommit +// (pre-commit errors only — commit errors are always verified, not classified). +// +// Pre-commit errors (from BeginTx or fn): retryable per preCommit (opt-out — +// the transaction rolls back, so retry is always safe). +// +// Commit errors (from (*sql.Tx).Commit): always verified via verifyCommit when +// an xid was captured — verified-aborted → retry, verified-committed → +// ErrCommitted (don't retry), inconclusive → ErrCommitPhase. Read-only +// transactions (no xid) retry without verifying (no writes to duplicate). A +// retryable commit error (aborted/read-only) on exhaustion returns the original +// error, not ErrCommitPhase (it's known non-commit); only inconclusive commits +// get ErrCommitPhase. +// +// database/sql already retries driver.ErrBadConn for *sql.DB methods (immediate, +// up to 3 attempts) before surfacing it; this outer loop layers on top with +// jitter for longer outages. *sql.Tx methods have no internal retry, so this is +// the only retry for tx.Exec/tx.Commit. 
+func retryTx(ctx context.Context, db retryDB, base retryClassifier, opts RetryNonIdempotentOptions, fn func(*sql.Tx) error) error { + preCommit := base.preCommit + if opts.IsRetryable != nil { + extra := opts.IsRetryable + preCommit = func(err error) bool { return base.preCommit(err) || extra(err) } + } + var lastErr error + var lastCommitPhase bool + var lastRetryable bool + for attempt := 0; attempt < maxRetryAttempts; attempt++ { + var commitPhase bool + var xid any + lastErr, commitPhase, xid = runOneTxAttempt(ctx, db, opts.TxOptions, base.captureXid, fn) + lastCommitPhase = commitPhase + if lastErr == nil { + return nil + } + var retryable bool + if !commitPhase { + // Pre-commit error: opt-out classifier decides. + retryable = preCommit(lastErr) + } else { + // Commit error: always verify whether the commit landed (the server + // is the authority — a client-side guess can't tell committed-but- + // response-lost from genuinely-aborted). + switch { + case base.captureXid != nil && xid == nil: + // Read-only transaction (no xid): no writes to duplicate, retry + // is safe without verification. + retryable = true + case base.verifyCommit != nil && xid != nil: + switch status, verr := base.verifyCommit(ctx, db, xid); { + case verr == nil && status == commitStatusAborted: + retryable = true + case verr == nil && status == commitStatusCommitted: + return fmt.Errorf("%w: %w", ErrCommitted, lastErr) + default: // commitStatusUnknown or verification error → ambiguous + retryable = false + } + default: + retryable = false // no verifier → ambiguous (ErrCommitPhase) + } + } + lastRetryable = retryable + if !retryable { + break + } + if !sleepWithJitter(ctx, attempt, maxRetryAttempts) { + return ctx.Err() + } + } + // A retryable commit error (verified-aborted or read-only) is known + // non-commit, so on exhaustion return the original error — not ErrCommitPhase + // (which is only for inconclusive commits). 
+ if lastCommitPhase && !lastRetryable { + return fmt.Errorf("%w: %w", ErrCommitPhase, lastErr) + } + return lastErr +} + +// runOneTxAttempt runs one begin/fn/commit attempt, returning the first error, +// whether it came from Commit (commitPhase), and the transaction id captured +// by captureXid (nil if none). The deferred Rollback runs only when the attempt +// returns an error or fn panics; it is skipped after a successful Commit. A +// captureXid failure is treated as a pre-commit error (rolled back, classified by preCommit). +func runOneTxAttempt(ctx context.Context, db beginTxer, txOpts *sql.TxOptions, captureXid func(*sql.Tx) (any, error), fn func(*sql.Tx) error) (err error, commitPhase bool, xid any) { + tx, beginErr := db.BeginTx(ctx, txOpts) + if beginErr != nil { + return beginErr, false, nil + } + defer func() { + if p := recover(); p != nil { + _ = tx.Rollback() + panic(p) + } + if err != nil { + _ = tx.Rollback() + } + }() + if err = fn(tx); err != nil { + return err, false, nil + } + if captureXid != nil { + xid, err = captureXid(tx) + if err != nil { + return err, false, nil + } + } + if err = tx.Commit(); err != nil { + return err, true, xid + } + return nil, false, nil +} + +// sleepWithJitter sleeps a random duration in [0, retryJitterMax) before the +// next attempt, respecting ctx. Returns false if ctx is done. No sleep after +// the final attempt. 
+func sleepWithJitter(ctx context.Context, attempt, maxAttempts int) bool { + if attempt >= maxAttempts-1 { + return true + } + delay := time.Duration(rand.Int63n(int64(retryJitterMax))) + select { + case <-ctx.Done(): + return false + case <-time.After(delay): + return true + } +} diff --git a/database/retry_test.go b/database/retry_test.go new file mode 100644 index 00000000..a92a346f --- /dev/null +++ b/database/retry_test.go @@ -0,0 +1,615 @@ +package database + +import ( + "context" + "database/sql" + "database/sql/driver" + "errors" + "fmt" + "io" + "net" + "strings" + "sync" + "testing" + + "github.com/jackc/pgerrcode" + "github.com/jackc/pgx/v5/pgconn" + "github.com/stretchr/testify/require" +) + +// safeToRetryErr is a test double for a pgx error that pgconn.SafeToRetry +// flags as safe (i.e. it implements the unexported interface{ SafeToRetry() bool } +// that pgconn.SafeToRetry looks for via errors.As). pgx's own SafeToRetry-flagged +// types have unexported fields, so we construct one directly here. +type safeToRetryErr struct{ msg string } + +func (e *safeToRetryErr) Error() string { return e.msg } +func (e *safeToRetryErr) SafeToRetry() bool { return true } + +// mockConnector + mockDriver + mockConn + mockTx form a minimal +// database/sql/driver implementation whose BeginTx/Commit/Rollback/Exec return +// configurable, stateful error sequences. This lets RetryPostgresNonIdempotent (and the +// shared retryTx helper it delegates to) be driven through a real *sql.DB +// without a Postgres. *sql.Tx is concrete, so a mock driver is the only way +// to control Commit/Rollback errors. 
+type mockConnector struct{ d *mockDriver }
+
+// Compile-time checks that the mocks satisfy the database/sql/driver
+// interfaces they stand in for. database/sql discovers the optional ones
+// (ConnBeginTx, QueryerContext) by type assertion, so a signature typo would
+// otherwise make it silently fall back to Prepare, which this mock rejects.
+var (
+	_ driver.Connector      = (*mockConnector)(nil)
+	_ driver.Driver         = (*mockDriver)(nil)
+	_ driver.Conn           = (*mockConn)(nil)
+	_ driver.ConnBeginTx    = (*mockConn)(nil)
+	_ driver.QueryerContext = (*mockConn)(nil)
+	_ driver.Tx             = (*mockTx)(nil)
+	_ driver.Rows           = (*mockRows)(nil)
+)
+
+// Connect returns a fresh mockConn bound to the shared mockDriver; it never fails.
+func (c *mockConnector) Connect(context.Context) (driver.Conn, error) {
+	return &mockConn{d: c.d}, nil
+}
+
+// Driver returns the underlying mockDriver.
+func (c *mockConnector) Driver() driver.Driver { return c.d }
+
+// mockDriver holds the scripted outcomes and the call counters shared by every
+// connection and transaction created from it.
+type mockDriver struct {
+	// Per-call error scripts, looked up by call index through nthErr for
+	// BeginTx, Commit and Rollback respectively.
+	beginErrs    []error
+	commitErrs   []error
+	rollbackErrs []error
+	// Commit-verification scripting (matched by query substring in QueryContext):
+	// captureXidResult is returned for pg_current_xact_id_if_assigned (a string
+	// xid, or nil for read-only/NULL); verifyStatusResult for pg_xact_status
+	// ("aborted"/"committed"/"in progress"/"" standing in for NULL); verifyErr
+	// makes the pg_xact_status query fail.
+	// NOTE(review): an empty verifyStatusResult reaches database/sql as the
+	// string "" rather than a nil driver.Value, so it is not a true SQL NULL —
+	// confirm the code under test treats both the same way.
+	captureXidResult   any
+	verifyStatusResult string
+	verifyErr          error
+	// mu is held by the driver methods while they update the counters below.
+	mu            sync.Mutex
+	beginCalls    int
+	commitCalls   int
+	rollbackCalls int
+	captureCalls  int
+	verifyCalls   int
+}
+
+// Open implements driver.Driver; the name argument is ignored.
+func (d *mockDriver) Open(string) (driver.Conn, error) { return &mockConn{d: d}, nil }
+
+// nthBegin records one BeginTx call and returns its zero-based index together
+// with the error scripted for that index.
+func (d *mockDriver) nthBegin() (int, error) {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+	i := d.beginCalls
+	d.beginCalls++
+	return i, nthErr(d.beginErrs, i)
+}
+
+// nthCommit records one Commit call and returns its zero-based index together
+// with the error scripted for that index.
+func (d *mockDriver) nthCommit() (int, error) {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+	i := d.commitCalls
+	d.commitCalls++
+	return i, nthErr(d.commitErrs, i)
+}
+
+// nthRollback records one Rollback call and returns its zero-based index
+// together with the error scripted for that index.
+func (d *mockDriver) nthRollback() (int, error) {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+	i := d.rollbackCalls
+	d.rollbackCalls++
+	return i, nthErr(d.rollbackErrs, i)
+}
+
+// mockConn is a driver connection that forwards all scripting and counting to
+// its mockDriver.
+type mockConn struct{ d *mockDriver }
+
+// BeginTx counts the call and either fails with the scripted error or hands
+// back a mockTx. The context and transaction options are ignored.
+func (c *mockConn) BeginTx(context.Context, driver.TxOptions) (driver.Tx, error) {
+	if _, err := c.d.nthBegin(); err != nil {
+		return nil, err
+	}
+	return &mockTx{d: c.d}, nil
+}
+
+// Begin satisfies the base driver.Conn interface by delegating to BeginTx.
+func (c *mockConn) Begin() (driver.Tx, error) {
+	return c.BeginTx(context.Background(), driver.TxOptions{})
+}
+
+// Close is a no-op.
+func (c *mockConn) Close() error { return nil }
+
+// Prepare satisfies the base driver.Conn interface by delegating to PrepareContext.
+func (c *mockConn) Prepare(query string) (driver.Stmt, error) {
+	return c.PrepareContext(context.Background(), query)
+}
+
+// PrepareContext always fails: prepared statements are not part of this mock.
+func (c *mockConn) PrepareContext(context.Context, string) (driver.Stmt, error) {
+	return nil, errors.New("mockDriver: PrepareContext not implemented")
+}
+
+// QueryContext implements driver.QueryerContext so *sql.Tx.QueryRow and
+// *sql.DB.QueryRowContext work. It recognizes the commit-verification queries
+// (by substring) and returns the scripted value from the driver. Any other
+// query is rejected with an error that includes the query text.
+func (c *mockConn) QueryContext(_ context.Context, query string, _ []driver.NamedValue) (driver.Rows, error) {
+	c.d.mu.Lock()
+	defer c.d.mu.Unlock()
+	switch {
+	case strings.Contains(query, "pg_current_xact_id_if_assigned"):
+		c.d.captureCalls++
+		return &mockRows{val: c.d.captureXidResult}, nil
+	case strings.Contains(query, "pg_xact_status"):
+		// The call is counted even when the scripted outcome is a failure.
+		c.d.verifyCalls++
+		if c.d.verifyErr != nil {
+			return nil, c.d.verifyErr
+		}
+		return &mockRows{val: c.d.verifyStatusResult}, nil
+	}
+	return nil, errors.New("mockDriver: unexpected query: " + query)
+}
+
+// mockTx is a driver transaction whose Commit and Rollback results come from
+// the mockDriver's scripts.
+type mockTx struct{ d *mockDriver }
+
+// Commit counts the call and returns the error scripted for it, if any.
+func (t *mockTx) Commit() error {
+	_, err := t.d.nthCommit()
+	return err
+}
+
+// Rollback counts the call and returns the error scripted for it, if any.
+func (t *mockTx) Rollback() error {
+	_, err := t.d.nthRollback()
+	return err
+}
+
+// mockRows is a single-row, single-column result set holding val (a string, or
+// nil for SQL NULL). Used by QueryContext for the verification queries.
+type mockRows struct {
+	val  any
+	read bool
+}
+
+// Columns reports the single column, named "col".
+func (r *mockRows) Columns() []string { return []string{"col"} }
+
+// Close is a no-op.
+func (r *mockRows) Close() error { return nil }
+
+// Next yields val on the first call and io.EOF on every later call.
+func (r *mockRows) Next(dest []driver.Value) error {
+	if r.read {
+		return io.EOF
+	}
+	r.read = true
+	dest[0] = r.val
+	return nil
+}
+
+// nthErr returns errs[i] or nil when i is out of range. Tests pass a sequence
+// like []error{err1, err2} to mean "fail twice then succeed".
+func nthErr(errs []error, i int) error { + if i < len(errs) { + return errs[i] + } + return nil +} + +func newMockDB(d *mockDriver) *sql.DB { + return sql.OpenDB(&mockConnector{d: d}) +} + +func TestRetryPostgresNonIdempotent(t *testing.T) { + t.Run("succeeds on first attempt", func(t *testing.T) { + d := &mockDriver{} + db := newMockDB(d) + defer db.Close() + + calls := 0 + err := RetryPostgresNonIdempotent(context.Background(), db, RetryNonIdempotentOptions{}, func(*sql.Tx) error { + calls++ + return nil + }) + require.NoError(t, err) + require.Equal(t, 1, calls) + require.Equal(t, 1, d.beginCalls) + require.Equal(t, 1, d.commitCalls) + }) + + t.Run("fn error then succeed rolls back between attempts", func(t *testing.T) { + d := &mockDriver{} + db := newMockDB(d) + defer db.Close() + + calls := 0 + err := RetryPostgresNonIdempotent(context.Background(), db, RetryNonIdempotentOptions{}, func(*sql.Tx) error { + calls++ + if calls < 2 { + return &pgconn.PgError{Code: pgerrcode.SerializationFailure} + } + return nil + }) + require.NoError(t, err) + require.Equal(t, 2, calls) + // Two begins but only one commit (first attempt's commit is skipped because fn + // returned an error -> rollback instead). 
+ require.Equal(t, 2, d.beginCalls) + require.Equal(t, 1, d.commitCalls) // only the successful attempt commits + require.Equal(t, 1, d.rollbackCalls) + }) + + t.Run("non-retryable fn error short-circuits", func(t *testing.T) { + d := &mockDriver{} + db := newMockDB(d) + defer db.Close() + + calls := 0 + err := RetryPostgresNonIdempotent(context.Background(), db, RetryNonIdempotentOptions{}, func(*sql.Tx) error { + calls++ + return &pgconn.PgError{Code: pgerrcode.UniqueViolation} + }) + require.Error(t, err) + require.Equal(t, 1, calls) + require.Equal(t, 1, d.beginCalls) + require.Equal(t, 0, d.commitCalls) + require.Equal(t, 1, d.rollbackCalls) + }) + + t.Run("driver.ErrBadConn from fn is retried (fn-path signal)", func(t *testing.T) { + // pgx's stdlib adapter converts SafeToRetry-flagged Exec/Query errors to + // driver.ErrBadConn before fn sees them, so this is the primary retry + // trigger inside a transaction. + d := &mockDriver{} + db := newMockDB(d) + defer db.Close() + + calls := 0 + err := RetryPostgresNonIdempotent(context.Background(), db, RetryNonIdempotentOptions{}, func(*sql.Tx) error { + calls++ + if calls < 2 { + return driver.ErrBadConn + } + return nil + }) + require.NoError(t, err) + require.Equal(t, 2, calls) + require.Equal(t, 2, d.beginCalls) + }) + + t.Run("SafeToRetry-flagged Begin failure is retried (Begin/Commit-path signal)", func(t *testing.T) { + // Begin/Commit errors flow through the stdlib adapter unchanged, so + // pgconn.SafeToRetry sees the original pgx error. *sql.DB does not + // internally retry on non-ErrBadConn errors, so our retry loop sees it. 
+ d := &mockDriver{ + beginErrs: []error{&safeToRetryErr{msg: "pre-send begin failure"}}, + } + db := newMockDB(d) + defer db.Close() + + calls := 0 + err := RetryPostgresNonIdempotent(context.Background(), db, RetryNonIdempotentOptions{}, func(*sql.Tx) error { + calls++ + return nil + }) + require.NoError(t, err) + require.Equal(t, 1, calls) + require.Equal(t, 2, d.beginCalls) // first Begin failed, second succeeded + }) + + t.Run("SafeToRetry-flagged Commit failure is verified-aborted and retried", func(t *testing.T) { + // A SafeToRetry-flagged commit error (pre-send) for a write transaction + // is now verified (not fast-pathed): pg_xact_status says aborted (the + // COMMIT never sent, so the tx didn't commit) → retry → second attempt + // succeeds. + d := &mockDriver{ + captureXidResult: "1", // write transaction + verifyStatusResult: "aborted", + commitErrs: []error{&safeToRetryErr{msg: "pre-send commit failure"}}, + } + db := newMockDB(d) + defer db.Close() + + calls := 0 + err := RetryPostgresNonIdempotent(context.Background(), db, RetryNonIdempotentOptions{}, func(*sql.Tx) error { + calls++ + return nil + }) + require.NoError(t, err) + require.Equal(t, 2, calls) + require.Equal(t, 2, d.commitCalls) + require.Equal(t, 1, d.verifyCalls) // verified once (first attempt's commit) + }) + + t.Run("commit-phase network error with inconclusive verification is wrapped with ErrCommitPhase", func(t *testing.T) { + // A network error during commit could mean the commit succeeded but the + // response was lost. The commit classifier rejects it; verification + // (pg_xact_status) returns NULL (inconclusive, e.g. xid not yet visible + // after a failover) → ambiguous → ErrCommitPhase (caller decides). 
+ d := &mockDriver{ + captureXidResult: "1", // write transaction, has an xid + verifyStatusResult: "", // NULL → inconclusive + commitErrs: []error{io.EOF}, + } + db := newMockDB(d) + defer db.Close() + + calls := 0 + err := RetryPostgresNonIdempotent(context.Background(), db, RetryNonIdempotentOptions{}, func(*sql.Tx) error { + calls++ + return nil + }) + require.Error(t, err) + require.Equal(t, 1, calls) // not retried + require.Equal(t, 1, d.commitCalls) + require.Equal(t, 0, d.rollbackCalls) // commit failed; rollback is a no-op (ErrTxDone) + require.Equal(t, 1, d.captureCalls) // xid captured before commit + require.Equal(t, 1, d.verifyCalls) // pg_xact_status queried once + require.ErrorIs(t, err, ErrCommitPhase) + require.ErrorIs(t, err, io.EOF) // underlying error preserved + }) + + t.Run("commit-phase admin_shutdown (57P01) with inconclusive verification is wrapped with ErrCommitPhase", func(t *testing.T) { + // 57P01 is excluded from the commit classifier; verification inconclusive + // → ErrCommitPhase. + d := &mockDriver{ + captureXidResult: "1", + verifyStatusResult: "", + commitErrs: []error{&pgconn.PgError{Code: pgerrcode.AdminShutdown}}, + } + db := newMockDB(d) + defer db.Close() + + calls := 0 + err := RetryPostgresNonIdempotent(context.Background(), db, RetryNonIdempotentOptions{}, func(*sql.Tx) error { + calls++ + return nil + }) + require.Error(t, err) + require.Equal(t, 1, calls) // not retried + require.Equal(t, 1, d.commitCalls) + require.Equal(t, 1, d.verifyCalls) + require.ErrorIs(t, err, ErrCommitPhase) + var pgErr *pgconn.PgError + require.ErrorAs(t, err, &pgErr) + require.Equal(t, pgerrcode.AdminShutdown, pgErr.Code) + }) + + t.Run("commit-phase verify-aborted is retried", func(t *testing.T) { + // Network error at commit; pg_xact_status says aborted (commit didn't + // land) → safe to retry → second attempt succeeds. 
+ d := &mockDriver{ + captureXidResult: "1", + verifyStatusResult: "aborted", + commitErrs: []error{io.EOF, nil}, + } + db := newMockDB(d) + defer db.Close() + + calls := 0 + err := RetryPostgresNonIdempotent(context.Background(), db, RetryNonIdempotentOptions{}, func(*sql.Tx) error { + calls++ + return nil + }) + require.NoError(t, err) + require.Equal(t, 2, calls) + require.Equal(t, 2, d.commitCalls) + require.Equal(t, 1, d.verifyCalls) // verified once (first attempt's commit) + }) + + t.Run("commit-phase verify-committed returns ErrCommitted (not retried)", func(t *testing.T) { + // Network error at commit; pg_xact_status says committed (commit DID + // land) → must NOT retry (would duplicate) → ErrCommitted. + d := &mockDriver{ + captureXidResult: "1", + verifyStatusResult: "committed", + commitErrs: []error{io.EOF}, + } + db := newMockDB(d) + defer db.Close() + + calls := 0 + err := RetryPostgresNonIdempotent(context.Background(), db, RetryNonIdempotentOptions{}, func(*sql.Tx) error { + calls++ + return nil + }) + require.Error(t, err) + require.Equal(t, 1, calls) // not retried + require.Equal(t, 1, d.verifyCalls) + require.ErrorIs(t, err, ErrCommitted) + require.ErrorIs(t, err, io.EOF) // underlying commit error preserved + require.NotErrorIs(t, err, ErrCommitPhase) // not ambiguous — verified committed + }) + + t.Run("commit-phase read-only (no xid) is retried without verifying", func(t *testing.T) { + // Read-only transaction (pg_current_xact_id_if_assigned returns NULL): + // no writes to duplicate, so retry is safe without calling pg_xact_status. 
+ d := &mockDriver{ + captureXidResult: nil, // read-only + commitErrs: []error{io.EOF, nil}, + } + db := newMockDB(d) + defer db.Close() + + calls := 0 + err := RetryPostgresNonIdempotent(context.Background(), db, RetryNonIdempotentOptions{}, func(*sql.Tx) error { + calls++ + return nil + }) + require.NoError(t, err) + require.Equal(t, 2, calls) + require.Equal(t, 0, d.verifyCalls) // verification skipped (read-only) + require.Equal(t, 2, d.captureCalls) + }) + + t.Run("commit-phase verify-error is ambiguous (ErrCommitPhase)", func(t *testing.T) { + // pg_xact_status itself errors (e.g. the fresh connection also failed) → + // inconclusive → ErrCommitPhase. + d := &mockDriver{ + captureXidResult: "1", + verifyErr: errors.New("verify connection lost"), + commitErrs: []error{io.EOF}, + } + db := newMockDB(d) + defer db.Close() + + calls := 0 + err := RetryPostgresNonIdempotent(context.Background(), db, RetryNonIdempotentOptions{}, func(*sql.Tx) error { + calls++ + return nil + }) + require.Error(t, err) + require.Equal(t, 1, calls) // not retried + require.Equal(t, 1, d.verifyCalls) + require.ErrorIs(t, err, ErrCommitPhase) + require.ErrorIs(t, err, io.EOF) + }) + + t.Run("commit-phase class-40 exhaustion returns the original error (not ErrCommitPhase)", func(t *testing.T) { + // serialization_failure (class 40) at commit: the server rolled back, + // so pg_xact_status says aborted → retry. Retried until exhausted; the + // original error is returned (NOT ErrCommitPhase — it's known + // non-commit, just out of retries). 
+ d := &mockDriver{ + captureXidResult: "1", + verifyStatusResult: "aborted", // server rolled back → aborted → retry each time + commitErrs: []error{ + &pgconn.PgError{Code: pgerrcode.SerializationFailure}, + &pgconn.PgError{Code: pgerrcode.SerializationFailure}, + &pgconn.PgError{Code: pgerrcode.SerializationFailure}, + }, + } + db := newMockDB(d) + defer db.Close() + + calls := 0 + err := RetryPostgresNonIdempotent(context.Background(), db, RetryNonIdempotentOptions{}, func(*sql.Tx) error { + calls++ + return nil + }) + require.Error(t, err) + require.Equal(t, 3, calls) + require.Equal(t, 3, d.commitCalls) + require.Equal(t, 3, d.verifyCalls) // verified each attempt (always-verify) + require.NotErrorIs(t, err, ErrCommitPhase) // known non-commit, not ambiguous + var pgErr *pgconn.PgError + require.ErrorAs(t, err, &pgErr) + require.Equal(t, pgerrcode.SerializationFailure, pgErr.Code) + }) + + t.Run("additive opts.IsRetryable extends the default classifier", func(t *testing.T) { + // sentinelErr is not driver.ErrBadConn, not SafeToRetry-flagged, and not + // an IsRetryablePostgresError SQLSTATE code, so the default classifier + // returns false. opts.IsRetryable adds it on top. 
+ sentinelErr := errors.New("custom retryable sentinel") + + d := &mockDriver{} + db := newMockDB(d) + defer db.Close() + + calls := 0 + err := RetryPostgresNonIdempotent(context.Background(), db, RetryNonIdempotentOptions{ + IsRetryable: func(err error) bool { return errors.Is(err, sentinelErr) }, + }, func(*sql.Tx) error { + calls++ + if calls < 2 { + return sentinelErr + } + return nil + }) + require.NoError(t, err) + require.Equal(t, 2, calls) + }) + + t.Run("respects context cancellation between attempts", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + d := &mockDriver{} + db := newMockDB(d) + defer db.Close() + + calls := 0 + err := RetryPostgresNonIdempotent(ctx, db, RetryNonIdempotentOptions{}, func(*sql.Tx) error { + calls++ + if calls == 1 { + cancel() // cancel after the first attempt runs + return &pgconn.PgError{Code: pgerrcode.SerializationFailure} + } + return nil + }) + // First attempt runs, then context cancellation is detected before the next attempt + require.ErrorIs(t, err, context.Canceled) + require.Equal(t, 1, calls) + }) + + t.Run("exhausts all attempts", func(t *testing.T) { + d := &mockDriver{} + db := newMockDB(d) + defer db.Close() + + calls := 0 + err := RetryPostgresNonIdempotent(context.Background(), db, RetryNonIdempotentOptions{}, func(*sql.Tx) error { + calls++ + return &pgconn.PgError{Code: pgerrcode.DeadlockDetected} + }) + require.Error(t, err) + require.Equal(t, 3, calls) + require.Equal(t, 3, d.beginCalls) + require.Equal(t, 0, d.commitCalls) // fn always errors, so no commit + require.Equal(t, 3, d.rollbackCalls) + }) +} + +func TestIsRetryablePostgresPreCommitError(t *testing.T) { + // Pre-commit classifier (opt-out): retries everything except context + // cancellation and permanent SQLSTATE classes (22xxx/23xxx/42xxx). + + // Retried: pre-send guarantees. 
+ require.True(t, isRetryablePostgresPreCommitError(driver.ErrBadConn)) + require.True(t, isRetryablePostgresPreCommitError(fmt.Errorf("wrapped: %w", driver.ErrBadConn))) + require.True(t, isRetryablePostgresPreCommitError(&safeToRetryErr{msg: "pre-send"})) + + // Retried: class 40 (server rolled back) and shutdown/cancel codes — the tx + // didn't commit pre-commit, so retry is safe even though some of these + // (57P01/57P02/57014) are excluded from the commit classifier. + require.True(t, isRetryablePostgresPreCommitError(&pgconn.PgError{Code: pgerrcode.SerializationFailure})) + require.True(t, isRetryablePostgresPreCommitError(&pgconn.PgError{Code: pgerrcode.DeadlockDetected})) + require.True(t, isRetryablePostgresPreCommitError(&pgconn.PgError{Code: pgerrcode.AdminShutdown})) + require.True(t, isRetryablePostgresPreCommitError(&pgconn.PgError{Code: pgerrcode.CrashShutdown})) + require.True(t, isRetryablePostgresPreCommitError(&pgconn.PgError{Code: pgerrcode.CannotConnectNow})) + require.True(t, isRetryablePostgresPreCommitError(&pgconn.PgError{Code: pgerrcode.TooManyConnections})) + require.True(t, isRetryablePostgresPreCommitError(&pgconn.PgError{Code: pgerrcode.QueryCanceled})) + + // Retried: typed network errors (non-PgError -> retry under opt-out). + require.True(t, isRetryablePostgresPreCommitError(io.EOF)) + require.True(t, isRetryablePostgresPreCommitError(io.ErrUnexpectedEOF)) + require.True(t, isRetryablePostgresPreCommitError(&net.OpError{ + Op: "read", + Err: errors.New("connection reset by peer"), + })) + + // Retried: unknown PgError codes (not in the permanent blocklist) and + // arbitrary non-PgError errors — opt-out assumes retryable unless known + // permanent. + require.True(t, isRetryablePostgresPreCommitError(&pgconn.PgError{Code: pgerrcode.InternalError})) + require.True(t, isRetryablePostgresPreCommitError(errors.New("some application error"))) + + // NOT retried: permanent SQLSTATE classes (22xxx/23xxx/42xxx). 
+ require.False(t, isRetryablePostgresPreCommitError(&pgconn.PgError{Code: pgerrcode.UniqueViolation})) + require.False(t, isRetryablePostgresPreCommitError(&pgconn.PgError{Code: pgerrcode.ForeignKeyViolation})) + require.False(t, isRetryablePostgresPreCommitError(&pgconn.PgError{Code: pgerrcode.StringDataRightTruncationDataException})) + require.False(t, isRetryablePostgresPreCommitError(&pgconn.PgError{Code: pgerrcode.SyntaxError})) + require.False(t, isRetryablePostgresPreCommitError(&pgconn.PgError{Code: pgerrcode.InsufficientPrivilege})) + require.False(t, isRetryablePostgresPreCommitError(&pgconn.PgError{Code: pgerrcode.UndefinedTable})) + require.False(t, isRetryablePostgresPreCommitError(&pgconn.PgError{Code: pgerrcode.UndefinedColumn})) + + // NOT retried: caller's context is done. + require.False(t, isRetryablePostgresPreCommitError(context.Canceled)) + require.False(t, isRetryablePostgresPreCommitError(context.DeadlineExceeded)) + require.False(t, isRetryablePostgresPreCommitError(nil)) +} + +func TestRetryMySQLNonIdempotentNotImplemented(t *testing.T) { + err := RetryMySQLNonIdempotent(context.Background(), nil, RetryNonIdempotentOptions{}, func(*sql.Tx) error { return nil }) + require.Error(t, err) + require.Contains(t, err.Error(), "not yet implemented") +} + +func TestRetrySpannerNonIdempotentNotImplemented(t *testing.T) { + err := RetrySpannerNonIdempotent(context.Background(), nil, RetryNonIdempotentOptions{}, func(*sql.Tx) error { return nil }) + require.Error(t, err) + require.Contains(t, err.Error(), "not yet implemented") +} + +func TestErrCommitPhase(t *testing.T) { + // ErrCommitPhase is a sentinel that wraps commit-phase errors so callers + // can detect them via errors.Is. The underlying error is preserved. 
+ underlying := errors.New("connection severed during commit") + wrapped := fmt.Errorf("%w: %w", ErrCommitPhase, underlying) + + require.ErrorIs(t, wrapped, ErrCommitPhase, "caller can detect commit-phase failures") + require.ErrorIs(t, wrapped, underlying, "underlying error is preserved for inspection") + + // A bare (non-commit-phase) error is NOT ErrCommitPhase. + require.NotErrorIs(t, underlying, ErrCommitPhase) + require.NotErrorIs(t, io.EOF, ErrCommitPhase) +} + +func TestErrCommitted(t *testing.T) { + // ErrCommitted wraps a commit error verified to have committed on the server + // (the response was lost). Callers detect it via errors.Is and must NOT retry. + underlying := errors.New("connection severed after commit recorded") + wrapped := fmt.Errorf("%w: %w", ErrCommitted, underlying) + + require.ErrorIs(t, wrapped, ErrCommitted, "caller can detect verified-committed") + require.ErrorIs(t, wrapped, underlying, "underlying commit error preserved") + require.NotErrorIs(t, wrapped, ErrCommitPhase) // distinct from ambiguous ErrCommitPhase + require.NotErrorIs(t, io.EOF, ErrCommitted) +} diff --git a/database/spanner.go b/database/spanner.go index 220f7939..4f68d283 100644 --- a/database/spanner.go +++ b/database/spanner.go @@ -1,7 +1,9 @@ package database import ( + "context" "database/sql" + "errors" "fmt" "strings" @@ -41,3 +43,16 @@ func SpannerUniqueViolation(err error) bool { return spanner.ErrCode(err) == codes.AlreadyExists || strings.Contains(err.Error(), "AlreadyExists") } + +// RetrySpannerNonIdempotent is the Spanner equivalent of RetryPostgresNonIdempotent. +// TODO: consider delegating to spannerdriver.RunTransactionWithOptions for +// ABORTED replay, plus network retry. retryTx already handles the loop and +// driver.ErrBadConn; the missing piece is a Spanner-specific retryClassifier. 
+// See https://pkg.go.dev/github.com/googleapis/go-sql-spanner +func RetrySpannerNonIdempotent(ctx context.Context, db *sql.DB, opts RetryNonIdempotentOptions, fn func(*sql.Tx) error) error { + _ = ctx + _ = db + _ = opts + _ = fn + return errors.New("database: RetrySpannerNonIdempotent is not yet implemented; see TODO") +} diff --git a/go.mod b/go.mod index 9ead87e7..b209ab14 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/googleapis/gax-go/v2 v2.22.0 github.com/googleapis/go-sql-spanner v1.25.1 github.com/gorilla/mux v1.8.1 + github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa github.com/jackc/pgx/v5 v5.9.2 github.com/madflojo/testcerts v1.5.0 github.com/markbates/pkger v0.17.1 diff --git a/go.sum b/go.sum index a0194c29..1e6377a6 100644 --- a/go.sum +++ b/go.sum @@ -5,8 +5,6 @@ cloud.google.com/go v0.123.0 h1:2NAUJwPR47q+E35uaJeYoNhuNEM9kM8SjgRgdeOJUSE= cloud.google.com/go v0.123.0/go.mod h1:xBoMV08QcqUGuPW65Qfm1o9Y4zKZBpGS+7bImXLTAZU= cloud.google.com/go/alloydb v1.26.0 h1:UTzyumJ8tEo0CqwzLkV4WMGnCxvvhw3BDy1nXfCt9KE= cloud.google.com/go/alloydb v1.26.0/go.mod h1:oqHGc/Xb5fWtH+wIDpu2wcPJX9oML/fGJuH/sp8ysyo= -cloud.google.com/go/alloydbconn v1.18.2 h1:zxxUnSU50d1sS/nswGBldpZsaxF3emirbO+xF+Rtgig= -cloud.google.com/go/alloydbconn v1.18.2/go.mod h1:0vfrUSwleLoK13ycnoaZ1A4yCfg3r7rig7Xm5vKlEtM= cloud.google.com/go/alloydbconn v1.18.3 h1:7A8QN5DtPkyGToqekWSm4+Ryrx2+xYWkvRYh0A0fAVg= cloud.google.com/go/alloydbconn v1.18.3/go.mod h1:cbwUQHP9eLp3Y66xZyQZrKmlcvqLnlb91qKc6kXV46Q= cloud.google.com/go/auth v0.20.0 h1:kXTssoVb4azsVDoUiF8KvxAqrsQcQtB53DcSgta74CA= @@ -160,6 +158,8 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.29.0 h1:5VipnvEpbqr2gA2VbM+nYVbkIF2 github.com/grpc-ecosystem/grpc-gateway/v2 v2.29.0/go.mod h1:Hyl3n6Twe1hvtd9XUXDec4pTvgMSEixRuQKPTMH2bNs= github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod 
h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= +github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa h1:s+4MhCQ6YrzisK6hFJUX53drDT4UsSW3DEhKn0ifuHw= +github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa/go.mod h1:a/s9Lp5W7n/DD0VrVoyJ00FbP2ytTPDVOivvn2bMlds= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= @@ -280,8 +280,6 @@ go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.50.0 h1:zO47/JPrL6vsNkINmLoo/PH1gcxpls50DNogFvB5ZGI= -golang.org/x/crypto v0.50.0/go.mod h1:3muZ7vA7PBCE6xgPX7nkzzjiUq87kRItoJQM1Yo8S+Q= golang.org/x/crypto v0.51.0 h1:IBPXwPfKxY7cWQZ38ZCIRPI50YLeevDLlLnyC5wRGTI= golang.org/x/crypto v0.51.0/go.mod h1:8AdwkbraGNABw2kOX6YFPs3WM22XqI4EXEd8g+x7Oc8= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -294,8 +292,6 @@ golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA= -golang.org/x/net v0.53.0/go.mod h1:JvMuJH7rrdiCfbeHoo3fCQU24Lf5JJwT9W3sJFulfgs= golang.org/x/net v0.54.0 
h1:2zJIZAxAHV/OHCDTCOHAYehQzLfSXuf/5SoL/Dv6w/w= golang.org/x/net v0.54.0/go.mod h1:Sj4oj8jK6XmHpBZU/zWHw3BV3abl4Kvi+Ut7cQcY+cQ= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -314,8 +310,6 @@ golang.org/x/sys v0.44.0 h1:ildZl3J4uzeKP07r2F++Op7E9B29JRUy+a27EibtBTQ= golang.org/x/sys v0.44.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.36.0 h1:JfKh3XmcRPqZPKevfXVpI1wXPTqbkE5f7JA92a55Yxg= -golang.org/x/text v0.36.0/go.mod h1:NIdBknypM8iqVmPiuco0Dh6P5Jcdk8lJL0CUebqK164= golang.org/x/text v0.37.0 h1:Cqjiwd9eSg8e0QAkyCaQTNHFIIzWtidPahFWR83rTrc= golang.org/x/text v0.37.0/go.mod h1:a5sjxXGs9hsn/AJVwuElvCAo9v8QYLzvavO5z2PiM38= golang.org/x/time v0.15.0 h1:bbrp8t3bGUeFOx08pvsMYRTCVSMk89u4tKbNOZbp88U= @@ -339,8 +333,6 @@ google.golang.org/genproto v0.0.0-20260504160031-60b97b32f348 h1:JjVGDZYWkJWZcxv google.golang.org/genproto v0.0.0-20260504160031-60b97b32f348/go.mod h1:95PqD4xM+AdOcBGsmgfaofXsiA37uXDtDufVbntT3TU= google.golang.org/genproto/googleapis/api v0.0.0-20260504160031-60b97b32f348 h1:U8orV30l6KpDsi9dxU0CoJZGbjS8EEpw+6ba+XwGPQA= google.golang.org/genproto/googleapis/api v0.0.0-20260504160031-60b97b32f348/go.mod h1:Yzdzr5OOZFgSsEV2D/Xi9NL3bszpXFAg0hFJiRohcD8= -google.golang.org/genproto/googleapis/rpc v0.0.0-20260504160031-60b97b32f348 h1:pfIbyB44sWzHiCpRqIen67ZQnVXSfIxWrqUMk1qwODE= -google.golang.org/genproto/googleapis/rpc v0.0.0-20260504160031-60b97b32f348/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8= google.golang.org/genproto/googleapis/rpc v0.0.0-20260511170946-3700d4141b60 h1:seT2EwLWM78plQ7wcDfuWBc/4FAEAXDDiaSol4ku4qo= google.golang.org/genproto/googleapis/rpc v0.0.0-20260511170946-3700d4141b60/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8= google.golang.org/grpc v1.19.0/go.mod 
h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= @@ -348,8 +340,6 @@ google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyac google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= -google.golang.org/grpc v1.81.0 h1:W3G9N3KQf3BU+YuCtGKJk0CmxQNbAISICD/9AORxLIw= -google.golang.org/grpc v1.81.0/go.mod h1:xGH9GfzOyMTGIOXBJmXt+BX/V0kcdQbdcuwQ/zNw42I= google.golang.org/grpc v1.81.1 h1:VnnIIZ88UzOOKLukQi+ImGz8O1Wdp8nAGGnvOfEIWQQ= google.golang.org/grpc v1.81.1/go.mod h1:xGH9GfzOyMTGIOXBJmXt+BX/V0kcdQbdcuwQ/zNw42I= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=