diff --git a/src/backend/distributed/operations/backup_block.c b/src/backend/distributed/operations/backup_block.c new file mode 100644 index 00000000000..05c52db483e --- /dev/null +++ b/src/backend/distributed/operations/backup_block.c @@ -0,0 +1,990 @@ +/*------------------------------------------------------------------------- + * + * backup_block.c + * + * Implementation of UDFs for blocking distributed writes during LTR backup. + * + * This file provides three SQL-callable functions: + * + * citus_block_writes_for_backup(timeout_ms int DEFAULT 300000) + * -> Spawns a background worker that acquires ExclusiveLock on + * pg_dist_transaction, pg_dist_partition, and pg_dist_node on + * the coordinator and all worker nodes. Returns true when + * locks are held. + * + * citus_unblock_writes_for_backup() + * -> Signals the background worker to release all locks and exit. + * Returns true on success. + * + * citus_backup_block_status() + * -> Returns a single-row result describing the current block state: + * (state text, worker_pid int, requestor_pid int, + * block_start_time timestamptz, timeout_ms int, node_count int) + * + * Architecture: + * The actual lock-holding is done by a dedicated background worker + * (CitusBackupBlockWorkerMain). This ensures locks survive the + * caller's session disconnect. The background worker communicates + * its state through shared memory (BackupBlockControlData). + * + * The worker auto-releases locks when: + * - citus_unblock_writes_for_backup() sets releaseRequested + * - The configured timeout expires + * - A worker-node connection fails + * - The postmaster dies + * + * Copyright (c) Citus Data, Inc. 
+ * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "funcapi.h" +#include "libpq-fe.h" +#include "miscadmin.h" +#include "pgstat.h" + +#include "access/xact.h" +#include "postmaster/bgworker.h" +#include "storage/ipc.h" +#include "storage/latch.h" +#include "storage/lmgr.h" +#include "storage/lock.h" +#include "storage/lwlock.h" +#include "storage/shmem.h" +#include "utils/builtins.h" +#include "utils/timestamp.h" + +#include "distributed/backup_block.h" +#include "distributed/background_worker_utils.h" +#include "distributed/citus_safe_lib.h" +#include "distributed/connection_management.h" +#include "distributed/listutils.h" +#include "distributed/metadata_cache.h" +#include "distributed/metadata_utility.h" +#include "distributed/remote_commands.h" +#include "distributed/remote_transaction.h" +#include "distributed/version_compat.h" +#include "distributed/worker_manager.h" + + +/* SQL-callable function declarations */ +PG_FUNCTION_INFO_V1(citus_block_writes_for_backup); +PG_FUNCTION_INFO_V1(citus_unblock_writes_for_backup); +PG_FUNCTION_INFO_V1(citus_backup_block_status); + + +/* default and maximum timeout for backup block (5 minutes / 30 minutes) */ +#define BACKUP_BLOCK_DEFAULT_TIMEOUT_MS 300000 +#define BACKUP_BLOCK_MAX_TIMEOUT_MS 1800000 + +/* polling interval while waiting for worker to acquire locks */ +#define BACKUP_BLOCK_POLL_INTERVAL_MS 100 + +/* interval for worker to check latch / release conditions */ +#define BACKUP_BLOCK_WORKER_CHECK_MS 1000 + +/* maximum iterations to wait for unblock completion: 300 * 100ms = 30s */ +#define BACKUP_BLOCK_UNBLOCK_MAX_LOOPS 300 + + +/* shared memory pointer */ +static BackupBlockControlData *BackupBlockControl = NULL; + +/* previous shmem_startup_hook */ +static shmem_startup_hook_type prev_shmem_startup_hook = NULL; + +/* SIGTERM flag for worker */ +static volatile sig_atomic_t got_sigterm = false; + + +/* forward declarations */ +static List * 
OpenConnectionsToMetadataWorkersForBlock(LOCKMODE lockMode); +static void BlockDistributedTransactionsLocally(void); +static void BlockDistributedTransactionsOnWorkers(List *connectionList, + int timeoutMs, + int *nodeCount); +static void backup_block_worker_sigterm(SIGNAL_ARGS); +static void SetBackupBlockState(BackupBlockState newState); +static void SetBackupBlockError(const char *message); + + +/* ---------------------------------------------------------------- + * Shared Memory + * ---------------------------------------------------------------- */ + +/* + * BackupBlockShmemSize returns the amount of shared memory needed for + * the backup block control structure. + */ +size_t +BackupBlockShmemSize(void) +{ + return sizeof(BackupBlockControlData); +} + + +/* + * BackupBlockShmemInit initializes the shared memory for backup block. + * Called from citus_shmem_startup_hook or similar. + */ +void +BackupBlockShmemInit(void) +{ + bool alreadyInitialized = false; + + if (prev_shmem_startup_hook != NULL) + { + prev_shmem_startup_hook(); + } + + LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); + + BackupBlockControl = + (BackupBlockControlData *) ShmemInitStruct("Citus Backup Block", + BackupBlockShmemSize(), + &alreadyInitialized); + + if (!alreadyInitialized) + { + BackupBlockControl->trancheId = LWLockNewTrancheId(); + strlcpy(BackupBlockControl->lockTrancheName, "Citus Backup Block", + NAMEDATALEN); + LWLockRegisterTranche(BackupBlockControl->trancheId, + BackupBlockControl->lockTrancheName); + LWLockInitialize(&BackupBlockControl->lock, + BackupBlockControl->trancheId); + + BackupBlockControl->state = BACKUP_BLOCK_INACTIVE; + BackupBlockControl->workerPid = 0; + BackupBlockControl->requestorPid = 0; + BackupBlockControl->blockStartTime = 0; + BackupBlockControl->timeoutMs = 0; + BackupBlockControl->nodeCount = 0; + BackupBlockControl->errorMessage[0] = '\0'; + BackupBlockControl->releaseRequested = false; + } + + LWLockRelease(AddinShmemInitLock); +} + + +/* + * 
InitializeBackupBlock installs the shmem_startup_hook to initialize + * backup block shared memory. Called from _PG_init. + */ +void +InitializeBackupBlock(void) +{ + prev_shmem_startup_hook = shmem_startup_hook; + shmem_startup_hook = BackupBlockShmemInit; +} + + +/* ---------------------------------------------------------------- + * Helper: State management (caller must NOT hold LWLock) + * ---------------------------------------------------------------- */ + +static void +SetBackupBlockState(BackupBlockState newState) +{ + LWLockAcquire(&BackupBlockControl->lock, LW_EXCLUSIVE); + BackupBlockControl->state = newState; + LWLockRelease(&BackupBlockControl->lock); +} + + +static void +SetBackupBlockError(const char *message) +{ + LWLockAcquire(&BackupBlockControl->lock, LW_EXCLUSIVE); + BackupBlockControl->state = BACKUP_BLOCK_ERROR; + strlcpy(BackupBlockControl->errorMessage, message, + sizeof(BackupBlockControl->errorMessage)); + LWLockRelease(&BackupBlockControl->lock); +} + + +/* ---------------------------------------------------------------- + * Background Worker: CitusBackupBlockWorkerMain + * + * Lifecycle: + * 1. Connect to database + * 2. Open connections to metadata worker nodes + * 3. Acquire local ExclusiveLocks (coordinator) + * 4. Send LOCK TABLE commands to all metadata workers + * 5. Update shared memory -> BACKUP_BLOCK_ACTIVE + * 6. Wait loop: check latch for release / timeout / postmaster death + * 7. Close remote connections (rolls back remote transactions, + * releasing remote locks) + * 8. Exit (local locks released when transaction ends) + * ---------------------------------------------------------------- */ + +/* + * Signal handler for SIGTERM in the background worker. + */ +static void +backup_block_worker_sigterm(SIGNAL_ARGS) +{ + int save_errno = errno; + + got_sigterm = true; + SetLatch(MyLatch); + + errno = save_errno; +} + + +/* + * CitusBackupBlockWorkerMain is the entry point for the backup block + * background worker. 
It acquires locks on the coordinator and all + * worker nodes, then holds them until told to release. + */ +void +CitusBackupBlockWorkerMain(Datum main_arg) +{ + Oid databaseOid = DatumGetObjectId(main_arg); + + /* + * Extract timeout from bgw_extra. + * Layout: [Oid extensionOwner][int timeoutMs] + * Always populated by RegisterCitusBackgroundWorker. + */ + int timeoutMs = BACKUP_BLOCK_DEFAULT_TIMEOUT_MS; + memcpy(&timeoutMs, MyBgworkerEntry->bgw_extra + sizeof(Oid), sizeof(int)); + + pqsignal(SIGTERM, backup_block_worker_sigterm); + BackgroundWorkerUnblockSignals(); + + /* connect to database */ + BackgroundWorkerInitializeConnectionByOid(databaseOid, InvalidOid, 0); + + elog(LOG, "backup block worker started (timeout=%dms)", timeoutMs); + + /* start a transaction — locks live until this transaction ends */ + StartTransactionCommand(); + + List *connectionList = NIL; + + PG_TRY(); + { + /* open connections to metadata worker nodes */ + connectionList = OpenConnectionsToMetadataWorkersForBlock(ShareLock); + + /* begin remote transactions (to bust through pgbouncer) */ + RemoteTransactionListBegin(connectionList); + + /* acquire local locks on coordinator */ + BlockDistributedTransactionsLocally(); + + /* acquire remote locks on all metadata workers */ + int nodeCount = 0; + BlockDistributedTransactionsOnWorkers(connectionList, timeoutMs, + &nodeCount); + + /* update shared memory: locks acquired */ + LWLockAcquire(&BackupBlockControl->lock, LW_EXCLUSIVE); + BackupBlockControl->state = BACKUP_BLOCK_ACTIVE; + BackupBlockControl->blockStartTime = GetCurrentTimestamp(); + BackupBlockControl->nodeCount = nodeCount; + BackupBlockControl->workerPid = MyProcPid; + LWLockRelease(&BackupBlockControl->lock); + + elog(LOG, "backup block active: locks held on coordinator + %d worker nodes", + nodeCount); + + /* + * Build a WaitEventSet that monitors: + * - Each remote connection socket (detect dead workers immediately) + * - Latch (for release request / SIGTERM wakeup) + * - 
Postmaster death + * + * Slot count: one per connection + latch + postmaster death. + */ + int eventSetSize = list_length(connectionList) + 2; + WaitEventSet *waitEventSet = + CreateWaitEventSet(WaitEventSetTracker_compat, eventSetSize); + + MultiConnection *conn = NULL; + foreach_declared_ptr(conn, connectionList) + { + int sock = PQsocket(conn->pgConn); + if (sock == -1) + { + FreeWaitEventSet(waitEventSet); + ereport(ERROR, + (errmsg("backup block: remote connection already " + "closed before entering wait loop"))); + } + + int idx = CitusAddWaitEventSetToSet(waitEventSet, + WL_SOCKET_READABLE, + sock, NULL, + (void *) conn); + if (idx == WAIT_EVENT_SET_INDEX_FAILED) + { + FreeWaitEventSet(waitEventSet); + ereport(ERROR, + (errmsg("backup block: could not add remote socket " + "to wait event set"))); + } + } + AddWaitEventToSet(waitEventSet, WL_POSTMASTER_DEATH, + PGINVALID_SOCKET, NULL, NULL); + AddWaitEventToSet(waitEventSet, WL_LATCH_SET, + PGINVALID_SOCKET, MyLatch, NULL); + + /* + * Hold locks until release is requested, timeout expires, + * a remote connection fails, or we receive SIGTERM / postmaster death. + */ + TimestampTz startTime = GetCurrentTimestamp(); + WaitEvent *events = palloc0(eventSetSize * sizeof(WaitEvent)); + + while (!got_sigterm) + { + int eventCount = WaitEventSetWait(waitEventSet, + BACKUP_BLOCK_WORKER_CHECK_MS, + events, eventSetSize, + PG_WAIT_EXTENSION); + + for (int i = 0; i < eventCount; i++) + { + WaitEvent *event = &events[i]; + + if (event->events & WL_POSTMASTER_DEATH) + { + FreeWaitEventSet(waitEventSet); + pfree(events); + SetBackupBlockError( + "postmaster died while holding backup block"); + proc_exit(1); + } + + if (event->events & WL_LATCH_SET) + { + ResetLatch(MyLatch); + continue; + } + + if (event->events & WL_SOCKET_READABLE) + { + /* + * An idle locked connection should never receive data. + * Any readability means the remote end sent something + * unexpected (e.g. termination notice) or the socket + * is broken. 
+ */ + MultiConnection *failedConn = + (MultiConnection *) event->user_data; + bool connectionBad = false; + + if (PQconsumeInput(failedConn->pgConn) == 0) + { + connectionBad = true; + } + else if (PQstatus(failedConn->pgConn) == CONNECTION_BAD) + { + connectionBad = true; + } + + if (connectionBad) + { + FreeWaitEventSet(waitEventSet); + pfree(events); + ereport(ERROR, + (errmsg("backup block: lost connection to " + "worker node %s:%d", + failedConn->hostname, + failedConn->port))); + } + } + } + + CHECK_FOR_INTERRUPTS(); + + /* check if release was requested */ + bool shouldRelease = false; + LWLockAcquire(&BackupBlockControl->lock, LW_SHARED); + shouldRelease = BackupBlockControl->releaseRequested; + LWLockRelease(&BackupBlockControl->lock); + + if (shouldRelease) + { + SetBackupBlockState(BACKUP_BLOCK_RELEASING); + elog(LOG, "backup block: release requested, shutting down"); + break; + } + + /* check timeout */ + TimestampTz now = GetCurrentTimestamp(); + long elapsedMs = (long) ((now - startTime) / 1000); + if (timeoutMs > 0 && elapsedMs >= timeoutMs) + { + elog(WARNING, "backup block: timeout reached (%d ms), auto-releasing", + timeoutMs); + break; + } + } + + FreeWaitEventSet(waitEventSet); + pfree(events); + + /* clean up: close remote connections (rolls back remote transactions) */ + foreach_declared_ptr(conn, connectionList) + { + ForgetResults(conn); + CloseConnection(conn); + } + connectionList = NIL; + } + PG_CATCH(); + { + /* on error, clean up connections and report */ + ErrorData *edata = CopyErrorData(); + FlushErrorState(); + + MultiConnection *conn = NULL; + foreach_declared_ptr(conn, connectionList) + { + ForgetResults(conn); + CloseConnection(conn); + } + + /* + * SIGTERM-induced errors are controlled shutdowns (e.g. postmaster + * restart), not real failures — report INACTIVE instead of ERROR + * so the state is clean for the next startup. 
+ */ + if (got_sigterm) + { + SetBackupBlockState(BACKUP_BLOCK_INACTIVE); + } + else + { + SetBackupBlockError(edata->message); + } + + FreeErrorData(edata); + + AbortCurrentTransaction(); + + elog(LOG, "backup block worker exiting due to %s", + got_sigterm ? "SIGTERM" : "error"); + proc_exit(got_sigterm ? 0 : 1); + } + PG_END_TRY(); + + /* abort transaction to release local locks */ + AbortCurrentTransaction(); + + /* mark shared memory as inactive */ + LWLockAcquire(&BackupBlockControl->lock, LW_EXCLUSIVE); + BackupBlockControl->state = BACKUP_BLOCK_INACTIVE; + BackupBlockControl->workerPid = 0; + BackupBlockControl->requestorPid = 0; + BackupBlockControl->releaseRequested = false; + BackupBlockControl->nodeCount = 0; + BackupBlockControl->errorMessage[0] = '\0'; + BackupBlockControl->blockStartTime = 0; + BackupBlockControl->timeoutMs = 0; + LWLockRelease(&BackupBlockControl->lock); + + elog(LOG, "backup block worker finished, all locks released"); + proc_exit(0); +} + + +/* ---------------------------------------------------------------- + * Lock acquisition helpers + * ---------------------------------------------------------------- */ + +/* + * OpenConnectionsToMetadataWorkersForBlock opens connections to all + * remote metadata worker nodes using FORCE_NEW_CONNECTION. + * + * Unlike citus_create_restore_point (which needs all nodes for + * pg_create_restore_point), backup block only needs metadata nodes + * for the LOCK TABLE commands. 
+ */ +static List * +OpenConnectionsToMetadataWorkersForBlock(LOCKMODE lockMode) +{ + List *connectionList = NIL; + int connectionFlags = FORCE_NEW_CONNECTION; + + List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(lockMode); + + WorkerNode *workerNode = NULL; + foreach_declared_ptr(workerNode, workerNodeList) + { + if (!NodeIsPrimaryAndRemote(workerNode) || !workerNode->hasMetadata) + { + continue; + } + + MultiConnection *connection = StartNodeConnection(connectionFlags, + workerNode->workerName, + workerNode->workerPort); + MarkRemoteTransactionCritical(connection); + connectionList = lappend(connectionList, connection); + } + + FinishConnectionListEstablishment(connectionList); + + return connectionList; +} + + +/* + * BlockDistributedTransactionsLocally acquires ExclusiveLock on + * pg_dist_node, pg_dist_partition, and pg_dist_transaction on the + * coordinator. Same pattern as citus_create_restore_point. + */ +static void +BlockDistributedTransactionsLocally(void) +{ + LockRelationOid(DistNodeRelationId(), ExclusiveLock); + LockRelationOid(DistPartitionRelationId(), ExclusiveLock); + LockRelationOid(DistTransactionRelationId(), ExclusiveLock); +} + + +/* + * BlockDistributedTransactionsOnWorkers sends LOCK TABLE commands to + * all connections (which are already filtered to metadata workers) + * to acquire ExclusiveLocks remotely. + * + * A statement_timeout is set on each connection so that lock acquisition + * does not hang indefinitely if a remote node is unresponsive. + * + * Sets *nodeCount to the number of nodes locked. 
+ */ +static void +BlockDistributedTransactionsOnWorkers(List *connectionList, int timeoutMs, + int *nodeCount) +{ + /* set statement_timeout on each remote connection to bound lock wait */ + char timeoutCommand[128]; + SafeSnprintf(timeoutCommand, sizeof(timeoutCommand), + "SET LOCAL statement_timeout = %d", timeoutMs); + + MultiConnection *connection = NULL; + foreach_declared_ptr(connection, connectionList) + { + int querySent = SendRemoteCommand(connection, timeoutCommand); + if (querySent == 0) + { + ReportConnectionError(connection, ERROR); + } + } + + foreach_declared_ptr(connection, connectionList) + { + PGresult *result = GetRemoteCommandResult(connection, true); + if (!IsResponseOK(result)) + { + ReportResultError(connection, result, ERROR); + } + PQclear(result); + ForgetResults(connection); + } + + /* send lock commands in parallel */ + foreach_declared_ptr(connection, connectionList) + { + int querySent = SendRemoteCommand(connection, + BLOCK_DISTRIBUTED_WRITES_COMMAND); + if (querySent == 0) + { + ReportConnectionError(connection, ERROR); + } + } + + /* wait for all lock acquisitions */ + foreach_declared_ptr(connection, connectionList) + { + PGresult *result = GetRemoteCommandResult(connection, true); + if (!IsResponseOK(result)) + { + ReportResultError(connection, result, ERROR); + } + PQclear(result); + ForgetResults(connection); + } + + *nodeCount = list_length(connectionList); +} + + +/* ---------------------------------------------------------------- + * SQL-callable UDFs + * ---------------------------------------------------------------- */ + +/* + * citus_block_writes_for_backup blocks distributed 2PC writes across + * the entire Citus cluster. A background worker is spawned to hold + * the locks, so they survive if this session disconnects. + * + * Parameters: + * timeout_ms (int, default 300000) - auto-release after this many ms + * + * Returns: true when locks are held on all nodes. 
+ */ +Datum +citus_block_writes_for_backup(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureSuperUser(); + EnsureCoordinator(); + + int timeoutMs = PG_GETARG_INT32(0); + + if (timeoutMs <= 0 || timeoutMs > BACKUP_BLOCK_MAX_TIMEOUT_MS) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("timeout_ms must be between 1 and %d", + BACKUP_BLOCK_MAX_TIMEOUT_MS))); + } + + /* + * Atomically check-and-set under a single LW_EXCLUSIVE acquisition + * to prevent TOCTOU races between concurrent callers. + */ + LWLockAcquire(&BackupBlockControl->lock, LW_EXCLUSIVE); + BackupBlockState currentState = BackupBlockControl->state; + + if (currentState == BACKUP_BLOCK_ACTIVE || + currentState == BACKUP_BLOCK_STARTING) + { + LWLockRelease(&BackupBlockControl->lock); + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("a backup block is already active"), + errhint("Use citus_unblock_writes_for_backup() to release " + "the existing block first."))); + } + + BackupBlockControl->state = BACKUP_BLOCK_STARTING; + BackupBlockControl->requestorPid = MyProcPid; + BackupBlockControl->workerPid = 0; + BackupBlockControl->releaseRequested = false; + BackupBlockControl->errorMessage[0] = '\0'; + BackupBlockControl->nodeCount = 0; + BackupBlockControl->timeoutMs = timeoutMs; + LWLockRelease(&BackupBlockControl->lock); + + /* spawn the background worker */ + char workerName[BGW_MAXLEN]; + SafeSnprintf(workerName, BGW_MAXLEN, + "Citus Backup Block Worker: %u", MyDatabaseId); + + CitusBackgroundWorkerConfig config = { + .workerName = workerName, + .functionName = "CitusBackupBlockWorkerMain", + .mainArg = ObjectIdGetDatum(MyDatabaseId), + .extensionOwner = CitusExtensionOwner(), + .needsNotification = true, + .waitForStartup = true, + .restartTime = CITUS_BGW_NEVER_RESTART, + .startTime = BgWorkerStart_RecoveryFinished, + .workerType = "citus_backup_block", + .extraData = &timeoutMs, + .extraDataSize = sizeof(int) + }; + + 
BackgroundWorkerHandle *handle = RegisterCitusBackgroundWorker(&config); + if (!handle) + { + SetBackupBlockState(BACKUP_BLOCK_INACTIVE); + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_RESOURCES), + errmsg("could not start backup block background worker"), + errhint("Check that max_worker_processes is high enough."))); + } + + /* + * Poll shared memory until the worker reports ACTIVE or ERROR. + * The worker was started with waitForStartup=true, so it's + * already running at this point. + */ + for (;;) + { + CHECK_FOR_INTERRUPTS(); + + LWLockAcquire(&BackupBlockControl->lock, LW_SHARED); + BackupBlockState state = BackupBlockControl->state; + char errMsg[256]; + if (state == BACKUP_BLOCK_ERROR) + { + strlcpy(errMsg, BackupBlockControl->errorMessage, sizeof(errMsg)); + } + LWLockRelease(&BackupBlockControl->lock); + + if (state == BACKUP_BLOCK_ACTIVE) + { + break; + } + + if (state == BACKUP_BLOCK_ERROR) + { + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("backup block worker failed: %s", errMsg))); + } + + if (state == BACKUP_BLOCK_INACTIVE) + { + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("backup block worker exited unexpectedly"))); + } + + /* still STARTING, check if worker is still alive */ + pid_t bgwPid; + BgwHandleStatus bgwStatus = GetBackgroundWorkerPid(handle, &bgwPid); + if (bgwStatus == BGWH_STOPPED) + { + SetBackupBlockState(BACKUP_BLOCK_INACTIVE); + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("backup block worker exited unexpectedly " + "(crashed or killed)"))); + } + + /* still STARTING, wait a bit */ + int rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + BACKUP_BLOCK_POLL_INTERVAL_MS, + PG_WAIT_EXTENSION); + ResetLatch(MyLatch); + + if (rc & WL_POSTMASTER_DEATH) + { + proc_exit(1); + } + } + + PG_RETURN_BOOL(true); +} + + +/* + * citus_unblock_writes_for_backup signals the background worker to + * release all locks and exit. Can be called from any session. 
+ * + * Returns: true if the block was released, false if no block was active. + */ +Datum +citus_unblock_writes_for_backup(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureSuperUser(); + EnsureCoordinator(); + + LWLockAcquire(&BackupBlockControl->lock, LW_EXCLUSIVE); + BackupBlockState currentState = BackupBlockControl->state; + + if (currentState != BACKUP_BLOCK_ACTIVE && + currentState != BACKUP_BLOCK_STARTING) + { + LWLockRelease(&BackupBlockControl->lock); + PG_RETURN_BOOL(false); + } + + BackupBlockControl->releaseRequested = true; + + /* wake the worker via its latch if it has a valid PID */ + pid_t workerPid = BackupBlockControl->workerPid; + LWLockRelease(&BackupBlockControl->lock); + + if (workerPid > 0) + { + /* + * Send SIGUSR1 to wake the worker's latch so it checks + * releaseRequested and exits cleanly via the release path. + * This is gentler than SIGTERM and allows the worker to + * transition through BACKUP_BLOCK_RELEASING state. + */ + kill(workerPid, SIGUSR1); + } + + /* + * Wait for the worker to finish releasing. + * Poll shared memory with a short interval. + */ + for (int i = 0; i < BACKUP_BLOCK_UNBLOCK_MAX_LOOPS; i++) + { + CHECK_FOR_INTERRUPTS(); + + LWLockAcquire(&BackupBlockControl->lock, LW_SHARED); + BackupBlockState state = BackupBlockControl->state; + LWLockRelease(&BackupBlockControl->lock); + + if (state == BACKUP_BLOCK_INACTIVE) + { + PG_RETURN_BOOL(true); + } + + int rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + BACKUP_BLOCK_POLL_INTERVAL_MS, + PG_WAIT_EXTENSION); + ResetLatch(MyLatch); + + if (rc & WL_POSTMASTER_DEATH) + { + proc_exit(1); + } + } + + ereport(WARNING, + (errmsg("backup block worker did not shut down within 30 seconds"))); + + PG_RETURN_BOOL(false); +} + + +/* + * citus_backup_block_status returns a single row describing the current + * state of the backup write block. 
+ * + * Returns: (state text, worker_pid int, requestor_pid int, + * block_start_time timestamptz, timeout_ms int, node_count int) + */ +Datum +citus_backup_block_status(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + TupleDesc tupleDesc; + + if (get_call_result_type(fcinfo, NULL, &tupleDesc) != TYPEFUNC_COMPOSITE) + { + elog(ERROR, "return type must be a row type"); + } + + tupleDesc = BlessTupleDesc(tupleDesc); + + Datum values[6]; + bool nulls[6]; + memset(nulls, 0, sizeof(nulls)); + + LWLockAcquire(&BackupBlockControl->lock, LW_SHARED); + + BackupBlockState currentState = BackupBlockControl->state; + pid_t workerPid = BackupBlockControl->workerPid; + + LWLockRelease(&BackupBlockControl->lock); + + /* + * Detect stale state: if the worker is supposed to be alive but its + * process no longer exists (SIGKILL, OOM-killer, etc.), auto-clean + * the shared memory so the system doesn't get permanently stuck. + */ + if ((currentState == BACKUP_BLOCK_ACTIVE || + currentState == BACKUP_BLOCK_STARTING || + currentState == BACKUP_BLOCK_RELEASING) && + workerPid > 0 && + kill(workerPid, 0) == -1 && errno == ESRCH) + { + /* worker is gone — re-acquire exclusive and double-check */ + LWLockAcquire(&BackupBlockControl->lock, LW_EXCLUSIVE); + + if ((BackupBlockControl->state == BACKUP_BLOCK_ACTIVE || + BackupBlockControl->state == BACKUP_BLOCK_STARTING || + BackupBlockControl->state == BACKUP_BLOCK_RELEASING) && + BackupBlockControl->workerPid == workerPid) + { + elog(WARNING, "backup block: detected stale state (worker PID %d " + "no longer exists), auto-cleaning", workerPid); + + BackupBlockControl->state = BACKUP_BLOCK_INACTIVE; + BackupBlockControl->workerPid = 0; + BackupBlockControl->requestorPid = 0; + BackupBlockControl->releaseRequested = false; + BackupBlockControl->nodeCount = 0; + BackupBlockControl->errorMessage[0] = '\0'; + BackupBlockControl->blockStartTime = 0; + BackupBlockControl->timeoutMs = 0; + } + + currentState = BackupBlockControl->state; + 
LWLockRelease(&BackupBlockControl->lock); + } + + LWLockAcquire(&BackupBlockControl->lock, LW_SHARED); + + /* state */ + const char *stateStr; + switch (BackupBlockControl->state) + { + case BACKUP_BLOCK_INACTIVE: + stateStr = "inactive"; + break; + + case BACKUP_BLOCK_STARTING: + stateStr = "starting"; + break; + + case BACKUP_BLOCK_ACTIVE: + stateStr = "active"; + break; + + case BACKUP_BLOCK_RELEASING: + stateStr = "releasing"; + break; + + case BACKUP_BLOCK_ERROR: + stateStr = "error"; + break; + + default: + stateStr = "unknown"; + break; + } + values[0] = CStringGetTextDatum(stateStr); + + /* worker_pid */ + if (BackupBlockControl->workerPid > 0) + { + values[1] = Int32GetDatum(BackupBlockControl->workerPid); + } + else + { + nulls[1] = true; + } + + /* requestor_pid */ + if (BackupBlockControl->requestorPid > 0) + { + values[2] = Int32GetDatum(BackupBlockControl->requestorPid); + } + else + { + nulls[2] = true; + } + + /* block_start_time */ + if (BackupBlockControl->blockStartTime > 0) + { + values[3] = TimestampTzGetDatum(BackupBlockControl->blockStartTime); + } + else + { + nulls[3] = true; + } + + /* timeout_ms */ + values[4] = Int32GetDatum(BackupBlockControl->timeoutMs); + + /* node_count */ + values[5] = Int32GetDatum(BackupBlockControl->nodeCount); + + LWLockRelease(&BackupBlockControl->lock); + + HeapTuple tuple = heap_form_tuple(tupleDesc, values, nulls); + PG_RETURN_DATUM(HeapTupleGetDatum(tuple)); +} diff --git a/src/backend/distributed/operations/citus_create_restore_point.c b/src/backend/distributed/operations/citus_create_restore_point.c index f57e3cc996d..2f9b42f09d8 100644 --- a/src/backend/distributed/operations/citus_create_restore_point.c +++ b/src/backend/distributed/operations/citus_create_restore_point.c @@ -22,6 +22,7 @@ #include "utils/builtins.h" #include "utils/pg_lsn.h" +#include "distributed/backup_block.h" #include "distributed/connection_management.h" #include "distributed/listutils.h" #include "distributed/metadata_cache.h" @@ 
-31,17 +32,6 @@ #define CREATE_RESTORE_POINT_COMMAND "SELECT pg_catalog.pg_create_restore_point($1::text)" -/* - * BLOCK_DISTRIBUTED_WRITES_COMMAND acquires ExclusiveLock on: - * 1. pg_dist_transaction - blocks 2PC commit decisions - * 2. pg_dist_partition - blocks DDL operations on distributed tables - * - * This ensures both DML (via 2PC) and DDL are blocked on metadata nodes. - */ -#define BLOCK_DISTRIBUTED_WRITES_COMMAND \ - "LOCK TABLE pg_catalog.pg_dist_transaction IN EXCLUSIVE MODE; " \ - "LOCK TABLE pg_catalog.pg_dist_partition IN EXCLUSIVE MODE" - /* local functions forward declarations */ static List * OpenConnectionsToAllWorkerNodes(LOCKMODE lockMode); static void BlockDistributedTransactions(void); diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 3dbd81abb32..6ab1beba954 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -52,6 +52,7 @@ #include "distributed/adaptive_executor.h" #include "distributed/backend_data.h" +#include "distributed/backup_block.h" #include "distributed/background_jobs.h" #include "distributed/causal_clock.h" #include "distributed/citus_depended_object.h" @@ -510,6 +511,7 @@ _PG_init(void) InitializeMaintenanceDaemon(); InitializeMaintenanceDaemonForMainDb(); + InitializeBackupBlock(); /* initialize coordinated transaction management */ InitializeTransactionManagement(); @@ -649,6 +651,7 @@ citus_shmem_request(void) RequestAddinShmemSpace(MaintenanceDaemonShmemSize()); RequestAddinShmemSpace(CitusQueryStatsSharedMemSize()); RequestAddinShmemSpace(LogicalClockShmemSize()); + RequestAddinShmemSpace(BackupBlockShmemSize()); RequestNamedLWLockTranche(STATS_SHARED_MEM_NAME, 1); RequestAddinShmemSpace(StatCountersShmemSize()); RequestNamedLWLockTranche(SAVED_BACKEND_STATS_HASH_LOCK_TRANCHE_NAME, 1); diff --git a/src/backend/distributed/sql/citus--14.0-1--15.0-1.sql 
b/src/backend/distributed/sql/citus--14.0-1--15.0-1.sql index ea9d7c4dea9..85b714d962a 100644 --- a/src/backend/distributed/sql/citus--14.0-1--15.0-1.sql +++ b/src/backend/distributed/sql/citus--14.0-1--15.0-1.sql @@ -9,3 +9,8 @@ #include "udfs/citus_internal_lock_colocation_id/15.0-1.sql" #include "udfs/citus_internal_acquire_placement_colocation_lock/15.0-1.sql" + +-- backup block UDFs +#include "udfs/citus_block_writes_for_backup/15.0-1.sql" +#include "udfs/citus_unblock_writes_for_backup/15.0-1.sql" +#include "udfs/citus_backup_block_status/15.0-1.sql" diff --git a/src/backend/distributed/sql/citus--8.0-1.sql b/src/backend/distributed/sql/citus--8.0-1.sql index baac9dd4257..58186fd4194 100644 --- a/src/backend/distributed/sql/citus--8.0-1.sql +++ b/src/backend/distributed/sql/citus--8.0-1.sql @@ -1440,6 +1440,35 @@ AS 'MODULE_PATHNAME', $$citus_create_restore_point$$; COMMENT ON FUNCTION pg_catalog.citus_create_restore_point(text) IS 'temporarily block writes and create a named restore point on all nodes'; +-- distributed backup block +CREATE OR REPLACE FUNCTION pg_catalog.citus_block_writes_for_backup( + timeout_ms int DEFAULT 300000) +RETURNS boolean +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$citus_block_writes_for_backup$$; +COMMENT ON FUNCTION pg_catalog.citus_block_writes_for_backup(int) +IS 'block distributed 2PC writes across the Citus cluster for backup'; + +CREATE OR REPLACE FUNCTION pg_catalog.citus_unblock_writes_for_backup() +RETURNS boolean +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$citus_unblock_writes_for_backup$$; +COMMENT ON FUNCTION pg_catalog.citus_unblock_writes_for_backup() +IS 'release the distributed write block held for backup'; + +CREATE OR REPLACE FUNCTION pg_catalog.citus_backup_block_status( + OUT state text, + OUT worker_pid int, + OUT requestor_pid int, + OUT block_start_time timestamptz, + OUT timeout_ms int, + OUT node_count int) +RETURNS record +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$citus_backup_block_status$$; +COMMENT 
ON FUNCTION pg_catalog.citus_backup_block_status() +IS 'return the current status of the distributed write block for backup'; + -- functions for giving node a unique identifier CREATE OR REPLACE FUNCTION pg_catalog.citus_version() RETURNS text diff --git a/src/backend/distributed/sql/udfs/citus_backup_block_status/15.0-1.sql b/src/backend/distributed/sql/udfs/citus_backup_block_status/15.0-1.sql new file mode 100644 index 00000000000..be57b675f5d --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_backup_block_status/15.0-1.sql @@ -0,0 +1,12 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_backup_block_status( + OUT state text, + OUT worker_pid int, + OUT requestor_pid int, + OUT block_start_time timestamptz, + OUT timeout_ms int, + OUT node_count int) +RETURNS record +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$citus_backup_block_status$$; +COMMENT ON FUNCTION pg_catalog.citus_backup_block_status() +IS 'return the current status of the distributed write block for backup'; diff --git a/src/backend/distributed/sql/udfs/citus_backup_block_status/latest.sql b/src/backend/distributed/sql/udfs/citus_backup_block_status/latest.sql new file mode 100644 index 00000000000..be57b675f5d --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_backup_block_status/latest.sql @@ -0,0 +1,12 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_backup_block_status( + OUT state text, + OUT worker_pid int, + OUT requestor_pid int, + OUT block_start_time timestamptz, + OUT timeout_ms int, + OUT node_count int) +RETURNS record +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$citus_backup_block_status$$; +COMMENT ON FUNCTION pg_catalog.citus_backup_block_status() +IS 'return the current status of the distributed write block for backup'; diff --git a/src/backend/distributed/sql/udfs/citus_block_writes_for_backup/15.0-1.sql b/src/backend/distributed/sql/udfs/citus_block_writes_for_backup/15.0-1.sql new file mode 100644 index 00000000000..530004e3732 --- /dev/null +++ 
b/src/backend/distributed/sql/udfs/citus_block_writes_for_backup/15.0-1.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_block_writes_for_backup( + timeout_ms int DEFAULT 300000) +RETURNS boolean +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$citus_block_writes_for_backup$$; +COMMENT ON FUNCTION pg_catalog.citus_block_writes_for_backup(int) +IS 'block distributed 2PC writes across the Citus cluster for backup'; diff --git a/src/backend/distributed/sql/udfs/citus_block_writes_for_backup/latest.sql b/src/backend/distributed/sql/udfs/citus_block_writes_for_backup/latest.sql new file mode 100644 index 00000000000..530004e3732 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_block_writes_for_backup/latest.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_block_writes_for_backup( + timeout_ms int DEFAULT 300000) +RETURNS boolean +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$citus_block_writes_for_backup$$; +COMMENT ON FUNCTION pg_catalog.citus_block_writes_for_backup(int) +IS 'block distributed 2PC writes across the Citus cluster for backup'; diff --git a/src/backend/distributed/sql/udfs/citus_unblock_writes_for_backup/15.0-1.sql b/src/backend/distributed/sql/udfs/citus_unblock_writes_for_backup/15.0-1.sql new file mode 100644 index 00000000000..7d43dcd4ed9 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_unblock_writes_for_backup/15.0-1.sql @@ -0,0 +1,6 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_unblock_writes_for_backup() +RETURNS boolean +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$citus_unblock_writes_for_backup$$; +COMMENT ON FUNCTION pg_catalog.citus_unblock_writes_for_backup() +IS 'release the distributed write block held for backup'; diff --git a/src/backend/distributed/sql/udfs/citus_unblock_writes_for_backup/latest.sql b/src/backend/distributed/sql/udfs/citus_unblock_writes_for_backup/latest.sql new file mode 100644 index 00000000000..7d43dcd4ed9 --- /dev/null +++ 
b/src/backend/distributed/sql/udfs/citus_unblock_writes_for_backup/latest.sql
@@ -0,0 +1,6 @@
+CREATE OR REPLACE FUNCTION pg_catalog.citus_unblock_writes_for_backup()
+RETURNS boolean
+LANGUAGE C STRICT
+AS 'MODULE_PATHNAME', $$citus_unblock_writes_for_backup$$;
+COMMENT ON FUNCTION pg_catalog.citus_unblock_writes_for_backup()
+IS 'release the distributed write block held for backup';
diff --git a/src/include/distributed/backup_block.h b/src/include/distributed/backup_block.h
new file mode 100644
index 00000000000..962eb24a17d
--- /dev/null
+++ b/src/include/distributed/backup_block.h
@@ -0,0 +1,109 @@
+/*-------------------------------------------------------------------------
+ *
+ * backup_block.h
+ *
+ * Declarations for blocking distributed writes during LTR backup.
+ *
+ * The backup block feature allows external backup tools to temporarily
+ * block distributed 2PC writes across the Citus cluster, take a
+ * consistent snapshot on every node, and then release the block.
+ *
+ * Architecture:
+ *   - A dedicated background worker holds the ExclusiveLocks on
+ *     pg_dist_transaction / pg_dist_partition on the coordinator and
+ *     all worker nodes, plus pg_dist_node on the coordinator only.
+ *   - Shared memory (BackupBlockControlData) communicates state
+ *     between the UDF caller, the background worker, and the
+ *     status/unblock UDFs.
+ *   - The background worker auto-releases if timeout expires,
+ *     the requestor backend exits, or a worker connection fails.
+ *
+ * Copyright (c) Citus Data, Inc.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef BACKUP_BLOCK_H
+#define BACKUP_BLOCK_H
+
+#include "postgres.h"
+
+#include "storage/lwlock.h"
+
+
+/*
+ * BackupBlockState enumerates the lifecycle states of the backup block.
+ */
+typedef enum BackupBlockState
+{
+	BACKUP_BLOCK_INACTIVE = 0,  /* no block active */
+	BACKUP_BLOCK_STARTING,      /* background worker is starting up */
+	BACKUP_BLOCK_ACTIVE,        /* locks acquired on all nodes */
+	BACKUP_BLOCK_RELEASING,     /* unblock requested, releasing */
+	BACKUP_BLOCK_ERROR          /* worker hit an error */
+} BackupBlockState;
+
+
+/*
+ * BackupBlockControlData is the shared memory structure that coordinates
+ * the backup block lifecycle between UDF callers and the background worker.
+ *
+ * Protected by the embedded LWLock (lock).
+ */
+typedef struct BackupBlockControlData
+{
+	/* LWLock tranche info */
+	int trancheId;
+	char lockTrancheName[NAMEDATALEN];
+	LWLock lock;
+
+	/* current state */
+	BackupBlockState state;
+
+	/* PIDs for lifecycle management */
+	pid_t workerPid;            /* PID of the background worker holding locks */
+	pid_t requestorPid;         /* PID of the backend that requested the block */
+
+	/* timing */
+	TimestampTz blockStartTime; /* when locks were acquired */
+	int timeoutMs;              /* auto-release timeout in milliseconds */
+
+	/* cluster info */
+	int nodeCount;              /* number of worker nodes locked */
+
+	/* error reporting */
+	char errorMessage[256];     /* error message if state == BACKUP_BLOCK_ERROR */
+
+	/* signal: set to true by unblock UDF to tell worker to release */
+	bool releaseRequested;
+} BackupBlockControlData;
+
+
+/*
+ * BLOCK_DISTRIBUTED_WRITES_COMMAND acquires ExclusiveLock on:
+ *   1. pg_dist_transaction - blocks 2PC commit decisions
+ *   2. pg_dist_partition - blocks DDL on distributed tables
+ *
+ * Used by both citus_block_writes_for_backup and citus_create_restore_point
+ * to quiesce distributed writes on remote metadata nodes.
+ *
+ * Note: pg_dist_node is only locked locally on the coordinator (node
+ * management operations are coordinator-only), so it is intentionally
+ * absent from this remote command.
+ */ +#define BLOCK_DISTRIBUTED_WRITES_COMMAND \ + "LOCK TABLE pg_catalog.pg_dist_transaction IN EXCLUSIVE MODE; " \ + "LOCK TABLE pg_catalog.pg_dist_partition IN EXCLUSIVE MODE" + + +/* Shared memory sizing and initialization */ +extern size_t BackupBlockShmemSize(void); +extern void BackupBlockShmemInit(void); + +/* Call from _PG_init to chain shmem_startup_hook */ +extern void InitializeBackupBlock(void); + +/* Background worker entry point */ +extern PGDLLEXPORT void CitusBackupBlockWorkerMain(Datum main_arg); + +#endif /* BACKUP_BLOCK_H */