Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions database/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

gomysql "github.com/go-sql-driver/mysql"
"github.com/jackc/pgerrcode"
"github.com/jackc/pgx/v5/pgconn"
"github.com/moov-io/base/database"
"github.com/moov-io/base/log"
Expand All @@ -38,7 +39,7 @@ func TestUniqueViolation(t *testing.T) {
t.Error("should have matched postgres unique violation")
}
pgconnErr := &pgconn.PgError{
Code: "23505",
Code: pgerrcode.UniqueViolation,
}
if !database.UniqueViolation(pgconnErr) {
t.Error("should have matched PgError unique violation")
Expand Down Expand Up @@ -78,7 +79,7 @@ func TestDeadlockFound(t *testing.T) {
t.Error("should have matched postgres deadlock found")
}
pgconnErr := &pgconn.PgError{
Code: "40P01",
Code: pgerrcode.DeadlockDetected,
}
if !database.DeadlockFound(pgconnErr) {
t.Error("should have matched PgError deadlock found")
Expand Down
13 changes: 13 additions & 0 deletions database/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,3 +245,16 @@ func MySQLDeadlockFound(err error) bool {

return strings.Contains(err.Error(), fmt.Sprintf("Error %d", mysqlErrDeadlockFound))
}

// RetryMySQLNonIdempotent is the MySQL counterpart of
// RetryPostgresNonIdempotent. It is currently a placeholder: all four
// arguments are ignored and a non-nil "not yet implemented" error is returned
// on every call, so fn is never invoked.
//
// TODO: implement a MySQL-specific retryClassifier (retryTx already handles the
// Begin/fn/Commit/Rollback loop and driver.ErrBadConn). Cover 1213 deadlock,
// 1205 lock wait timeout, 2013 connection lost. See
// https://dev.mysql.com/doc/mysql-errors/8.0/en/server-error-reference.html
func RetryMySQLNonIdempotent(ctx context.Context, db *sql.DB, opts RetryNonIdempotentOptions, fn func(*sql.Tx) error) error {
	// Explicitly discard every argument: the signature is final, the body is not.
	_, _, _, _ = ctx, db, opts, fn

	return errors.New("database: RetryMySQLNonIdempotent is not yet implemented; see TODO")
}
221 changes: 111 additions & 110 deletions database/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,29 @@ import (
"database/sql"
"errors"
"fmt"
"io"
"math/rand"
"net"
"strings"
"time"

"cloud.google.com/go/alloydbconn"
"github.com/jackc/pgerrcode"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jackc/pgx/v5/stdlib"
"github.com/moov-io/base/log"
)

const (
// PostgreSQL Error Codes
// https://www.postgresql.org/docs/current/errcodes-appendix.html
postgresErrUniqueViolation = "23505"
postgresErrDeadlockFound = "40P01"
)

func postgresConnection(ctx context.Context, logger log.Logger, config PostgresConfig, databaseName string) (*sql.DB, error) {
poolConfig, err := buildPgxPoolConfig(ctx, config, databaseName)
if err != nil {
return nil, logger.LogErrorf("building pgx pool config: %w", err).Err()
}

// HealthCheckPeriod makes pgxpool ping idle connections in the background.
// Dead connections (e.g. from an AlloyDB switchover) are evicted before
// the application ever sees them.
// HealthCheckPeriod is how often the background goroutine evicts connections
// that exceeded MaxConnLifetime or MaxConnIdleTime. It does NOT ping for
// liveness — dead connections are caught at acquire time by the ResetSession
// ping (default: ping if idle > 1s), with database/sql retrying on a fresh
// conn and RetryPostgresNonIdempotent retrying beyond that.
poolConfig.HealthCheckPeriod = 1 * time.Second

pool, err := pgxpool.NewWithConfig(ctx, poolConfig)
Expand Down Expand Up @@ -154,11 +148,11 @@ func PostgresUniqueViolation(err error) bool {
}

var pgError *pgconn.PgError
if errors.As(err, &pgError) && pgError.Code == postgresErrUniqueViolation {
if errors.As(err, &pgError) && pgError.Code == pgerrcode.UniqueViolation {
return true
}

return strings.Contains(err.Error(), postgresErrUniqueViolation)
return strings.Contains(err.Error(), pgerrcode.UniqueViolation)
}

// PostgresDeadlockFound returns true when the provided error matches the Postgres code
Expand All @@ -169,119 +163,126 @@ func PostgresDeadlockFound(err error) bool {
}

var pgError *pgconn.PgError
if errors.As(err, &pgError) && pgError.Code == postgresErrDeadlockFound {
if errors.As(err, &pgError) && pgError.Code == pgerrcode.DeadlockDetected {
return true
}

return strings.Contains(err.Error(), postgresErrDeadlockFound)
return strings.Contains(err.Error(), pgerrcode.DeadlockDetected)
}

// isPermanentPostgresError reports whether pgErr carries a SQLSTATE from a
// class that would fail again if the statement were retried. It is consulted
// by isRetryablePostgresPreCommitError to opt such errors out of retrying.
// Only clearly permanent classes are listed, so every other code (including
// unknown ones) is treated as retryable by the caller.
//
// pgErr must be non-nil.
func isPermanentPostgresError(pgErr *pgconn.PgError) bool {
	code := pgErr.Code
	switch {
	case pgerrcode.IsDataException(code): // class 22
		return true
	case pgerrcode.IsIntegrityConstraintViolation(code): // class 23
		return true
	case pgerrcode.IsSyntaxErrororAccessRuleViolation(code): // class 42
		return true
	default:
		return false
	}
}

// IsRetryablePostgresError returns true if the error is a transient connection-level
// error that is safe to retry. This covers the errors seen during AlloyDB maintenance
// switchovers and other transient network failures.
func IsRetryablePostgresError(err error) bool {
// retryJitterMax is the upper bound on the random delay between retries. Each
// wait uses full jitter in [0, retryJitterMax), which spreads concurrent
// retries across the fleet rather than slamming the database all at once.
//
// Sizing rationale: AlloyDB maintenance switchovers cause <1s downtime, so
// maxRetryAttempts attempts with up to retryJitterMax between each spans the
// blip. The caller's context deadline is the escape hatch for services with
// tighter latency budgets.
const retryJitterMax = 100 * time.Millisecond

// isRetryablePostgresPreCommitError is the pre-commit classifier for
// RetryPostgresNonIdempotent (errors from BeginTx or fn). OPTS OUT: pre-commit retry is
// always safe (the transaction rolls back), so it retries everything EXCEPT
// context.Canceled/DeadlineExceeded and the permanent SQLSTATE classes
// (isPermanentPostgresError). Unknown *pgconn.PgError codes and non-PgError
// errors (network, driver.ErrBadConn, SafeToRetry) are retried — a false
// positive (~200ms wasted on a permanent error) costs less than a false
// negative (spurious user-facing failure), so opt-out is safer than opt-in.
//
// opts.IsRetryable is additive (OR); to narrow the set, wrap with a predicate
// that re-checks.
func isRetryablePostgresPreCommitError(err error) bool {
if err == nil {
return false
}

// PostgreSQL error codes that are safe to retry because the server guarantees
// the transaction was rolled back before the error was returned.
//
// 57P01 admin_shutdown, 57P02 crash_shutdown: fast shutdown rolls back all
// active transactions before disconnecting clients. See:
// https://www.postgresql.org/docs/current/server-shutdown.html
//
// 57P03 cannot_connect_now: server is still starting up; the operation never
// reached a transaction.
//
// 40001 serialization_failure, 40P01 deadlock_detected: the server explicitly
// rolled back the transaction and expects the client to retry. See:
// https://www.postgresql.org/docs/current/mvcc-serialization-failure-handling.html
//
// 53300 too_many_connections: rejected at connect time; no transaction started.
//
// 57014 query_canceled: the server rolled back the transaction before returning
// this error.
//
// Note: 08xxx (connection_exception class) codes are intentionally omitted.
// pgx surfaces connection-level failures as TCP/network errors, not as
// *pgconn.PgError, so those cases are handled below.
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return false
}
var pgErr *pgconn.PgError
if errors.As(err, &pgErr) {
switch pgErr.Code {
case "57P01", "57P02", "57P03", // admin_shutdown, crash_shutdown, cannot_connect_now
"40001", "40P01", // serialization_failure, deadlock_detected
"53300", // too_many_connections
"57014": // query_canceled
return true
}
return false
return !isPermanentPostgresError(pgErr)
}
return true
}

// Network-level errors: connection reset, broken pipe, EOF, etc.
// These occur when the TCP connection is severed during a switchover.
var netErr *net.OpError
if errors.As(err, &netErr) {
return true
}
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
return true
// capturePostgresXid returns the current transaction's id (pg_current_xact_id_if_assigned)
// as a string, or nil if no id is assigned (read-only transaction). Captured
// before Commit so verifyPostgresCommit can check whether an ambiguous commit
// landed. The ::text cast sidesteps pgx's xid8 Go type-mapping; the string is
// passed back to pg_xact_status($1::xid8) on verification. Returns nil for
// read-only transactions (no xid), in which case retryTx retries an ambiguous
// commit without verifying (no writes to duplicate).
func capturePostgresXid(tx *sql.Tx) (any, error) {
var s sql.NullString
if err := tx.QueryRow("SELECT pg_current_xact_id_if_assigned()::text").Scan(&s); err != nil {
return nil, err
}
if errors.Is(err, context.DeadlineExceeded) {
return false // don't retry if the caller's context timed out
if !s.Valid {
return nil, nil // read-only, no xid assigned
}
return s.String, nil
}

// pgx sometimes wraps TCP-level disconnection errors as plain strings rather
// than preserving the original net.OpError type. These are safe to retry
// because they occur on the write path — the server never received the query.
// "conn closed" is pgx's own sentinel for a connection already closed before use.
// Note: "connection refused" and "unexpected EOF" are intentionally excluded —
// the former does not appear in pgx's codebase, the latter is already caught
// above by errors.Is(err, io.ErrUnexpectedEOF).
msg := err.Error()
if strings.Contains(msg, "connection reset by peer") ||
strings.Contains(msg, "broken pipe") ||
strings.Contains(msg, "conn closed") {
return true
// verifyPostgresCommit checks whether the transaction with the given xid
// committed, aborted, or is indeterminate, via pg_xact_status on a fresh
// connection (the original may be dead). xid is the string from
// capturePostgresXid. Returns:
// - commitStatusAborted: the commit did NOT land → safe to retry.
// - commitStatusCommitted: the commit DID land → don't retry (ErrCommitted).
// - commitStatusUnknown: pg_xact_status returned NULL (xid too old, or not
// yet visible after a failover) or errored, or status was "in progress" →
// ambiguous (ErrCommitPhase).
func verifyPostgresCommit(ctx context.Context, db retryDB, xid any) (commitStatus, error) {
s, ok := xid.(string)
if !ok || s == "" {
return commitStatusUnknown, nil
}
var status sql.NullString
if err := db.QueryRowContext(ctx, "SELECT pg_xact_status($1::xid8)", s).Scan(&status); err != nil {
return commitStatusUnknown, err
}
switch status.String {
case "aborted":
return commitStatusAborted, nil
case "committed":
return commitStatusCommitted, nil
default: // "" (NULL), "in progress", or unexpected → ambiguous
return commitStatusUnknown, nil
}

return false
}

// retryJitterMax is the upper bound for the random delay between retries.
// Full jitter in [0, retryJitterMax) spreads concurrent retries across the
// fleet rather than letting them all slam the database at the same instant.
// AlloyDB planned maintenance switchovers typically cause less than one second
// of downtime, so three attempts with up to 100ms between each gives a worst-
// case retry window of ~200ms — well within typical service SLAs while still
// spanning the switchover blip. The caller's context deadline is the escape
// hatch for services with tighter latency budgets.
const retryJitterMax = 100 * time.Millisecond
// RetryPostgresNonIdempotent runs fn inside a Postgres transaction and retries
// the whole transaction when a transient error occurs. Each attempt hands fn a
// fresh *sql.Tx: a non-nil error from fn rolls the transaction back, a nil
// error commits it. The work is delegated to retryTx with
// postgresTxClassifier.
//
// Error handling by phase:
//   - Pre-commit (BeginTx or fn): classified by the opt-out
//     isRetryablePostgresPreCommitError; opts.IsRetryable is additive on it.
//   - Commit: never guessed client-side. The transaction id is captured before
//     each Commit (pg_current_xact_id_if_assigned); on a commit error
//     pg_xact_status is queried on a fresh connection, and the server's answer
//     decides: aborted → retry, committed → ErrCommitted (don't retry),
//     inconclusive → ErrCommitPhase.
//   - Read-only transactions (no xid assigned) retry without verification.
//
// db must be pgx-backed (e.g. from database.New with a PostgresConfig); other
// drivers won't get pgx-specific retries or commit verification.
func RetryPostgresNonIdempotent(ctx context.Context, db *sql.DB, opts RetryNonIdempotentOptions, fn func(*sql.Tx) error) error {
	classifier := postgresTxClassifier
	return retryTx(ctx, db, classifier, opts, fn)
}

// RetryPostgres executes fn up to maxAttempts times, retrying on transient
// connection errors. This is intended for use around individual database
// operations to survive brief outages like AlloyDB maintenance switchovers.
func RetryPostgres(ctx context.Context, maxAttempts int, fn func() error) error {
if maxAttempts <= 0 {
maxAttempts = 3
}
var err error
for attempt := 0; attempt < maxAttempts; attempt++ {
err = fn()
if err == nil {
return nil
}
if !IsRetryablePostgresError(err) {
return err
}
if attempt < maxAttempts-1 {
delay := time.Duration(rand.Int63n(int64(retryJitterMax)))
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(delay):
}
}
}
return err
// postgresTxClassifier is the retryClassifier that RetryPostgresNonIdempotent
// passes to retryTx. It pairs the pre-commit (opt-out) classifier with commit
// verification (capturePostgresXid/verifyPostgresCommit). Commit errors are
// always verified, not classified. See ErrCommitPhase / ErrCommitted.
var postgresTxClassifier = retryClassifier{
// Decides whether an error from BeginTx or fn is worth retrying.
preCommit: isRetryablePostgresPreCommitError,
// Records the transaction id before Commit so the outcome can be checked later.
captureXid: capturePostgresXid,
// Asks the server whether the captured transaction committed or aborted.
verifyCommit: verifyPostgresCommit,
}
Loading