Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
990 changes: 990 additions & 0 deletions src/backend/distributed/operations/backup_block.c

Large diffs are not rendered by default.

12 changes: 1 addition & 11 deletions src/backend/distributed/operations/citus_create_restore_point.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "utils/builtins.h"
#include "utils/pg_lsn.h"

#include "distributed/backup_block.h"
#include "distributed/connection_management.h"
#include "distributed/listutils.h"
#include "distributed/metadata_cache.h"
Expand All @@ -31,17 +32,6 @@

#define CREATE_RESTORE_POINT_COMMAND "SELECT pg_catalog.pg_create_restore_point($1::text)"

/*
* BLOCK_DISTRIBUTED_WRITES_COMMAND acquires ExclusiveLock on:
* 1. pg_dist_transaction - blocks 2PC commit decisions
* 2. pg_dist_partition - blocks DDL operations on distributed tables
*
* This ensures both DML (via 2PC) and DDL are blocked on metadata nodes.
*/
#define BLOCK_DISTRIBUTED_WRITES_COMMAND \
"LOCK TABLE pg_catalog.pg_dist_transaction IN EXCLUSIVE MODE; " \
"LOCK TABLE pg_catalog.pg_dist_partition IN EXCLUSIVE MODE"

/* local functions forward declarations */
static List * OpenConnectionsToAllWorkerNodes(LOCKMODE lockMode);
static void BlockDistributedTransactions(void);
Expand Down
3 changes: 3 additions & 0 deletions src/backend/distributed/shared_library_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@

#include "distributed/adaptive_executor.h"
#include "distributed/backend_data.h"
#include "distributed/backup_block.h"
#include "distributed/background_jobs.h"
#include "distributed/causal_clock.h"
#include "distributed/citus_depended_object.h"
Expand Down Expand Up @@ -510,6 +511,7 @@ _PG_init(void)

InitializeMaintenanceDaemon();
InitializeMaintenanceDaemonForMainDb();
InitializeBackupBlock();

/* initialize coordinated transaction management */
InitializeTransactionManagement();
Expand Down Expand Up @@ -649,6 +651,7 @@ citus_shmem_request(void)
RequestAddinShmemSpace(MaintenanceDaemonShmemSize());
RequestAddinShmemSpace(CitusQueryStatsSharedMemSize());
RequestAddinShmemSpace(LogicalClockShmemSize());
RequestAddinShmemSpace(BackupBlockShmemSize());
RequestNamedLWLockTranche(STATS_SHARED_MEM_NAME, 1);
RequestAddinShmemSpace(StatCountersShmemSize());
RequestNamedLWLockTranche(SAVED_BACKEND_STATS_HASH_LOCK_TRANCHE_NAME, 1);
Expand Down
5 changes: 5 additions & 0 deletions src/backend/distributed/sql/citus--14.0-1--15.0-1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,8 @@
#include "udfs/citus_internal_lock_colocation_id/15.0-1.sql"

#include "udfs/citus_internal_acquire_placement_colocation_lock/15.0-1.sql"

-- backup block UDFs
#include "udfs/citus_block_writes_for_backup/15.0-1.sql"
#include "udfs/citus_unblock_writes_for_backup/15.0-1.sql"
#include "udfs/citus_backup_block_status/15.0-1.sql"
29 changes: 29 additions & 0 deletions src/backend/distributed/sql/citus--8.0-1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -1440,6 +1440,35 @@ AS 'MODULE_PATHNAME', $$citus_create_restore_point$$;
COMMENT ON FUNCTION pg_catalog.citus_create_restore_point(text)
IS 'temporarily block writes and create a named restore point on all nodes';

-- distributed backup block
CREATE OR REPLACE FUNCTION pg_catalog.citus_block_writes_for_backup(
timeout_ms int DEFAULT 300000)
RETURNS boolean
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_block_writes_for_backup$$;
COMMENT ON FUNCTION pg_catalog.citus_block_writes_for_backup(int)
IS 'block distributed 2PC writes across the Citus cluster for backup';

CREATE OR REPLACE FUNCTION pg_catalog.citus_unblock_writes_for_backup()
RETURNS boolean
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_unblock_writes_for_backup$$;
COMMENT ON FUNCTION pg_catalog.citus_unblock_writes_for_backup()
IS 'release the distributed write block held for backup';

CREATE OR REPLACE FUNCTION pg_catalog.citus_backup_block_status(
OUT state text,
OUT worker_pid int,
OUT requestor_pid int,
OUT block_start_time timestamptz,
OUT timeout_ms int,
OUT node_count int)
RETURNS record
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_backup_block_status$$;
COMMENT ON FUNCTION pg_catalog.citus_backup_block_status()
IS 'return the current status of the distributed write block for backup';

-- functions for giving node a unique identifier
CREATE OR REPLACE FUNCTION pg_catalog.citus_version()
RETURNS text
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_backup_block_status(
OUT state text,
OUT worker_pid int,
OUT requestor_pid int,
OUT block_start_time timestamptz,
OUT timeout_ms int,
OUT node_count int)
RETURNS record
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_backup_block_status$$;
COMMENT ON FUNCTION pg_catalog.citus_backup_block_status()
IS 'return the current status of the distributed write block for backup';

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_block_writes_for_backup(
timeout_ms int DEFAULT 300000)
RETURNS boolean
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_block_writes_for_backup$$;
COMMENT ON FUNCTION pg_catalog.citus_block_writes_for_backup(int)
IS 'block distributed 2PC writes across the Citus cluster for backup';

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_unblock_writes_for_backup()
RETURNS boolean
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_unblock_writes_for_backup$$;
COMMENT ON FUNCTION pg_catalog.citus_unblock_writes_for_backup()
IS 'release the distributed write block held for backup';
109 changes: 109 additions & 0 deletions src/include/distributed/backup_block.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*-------------------------------------------------------------------------
*
* backup_block.h
*
* Declarations for blocking distributed writes during LTR backup.
*
* The backup block feature allows external backup tools to temporarily
* block distributed 2PC writes across the Citus cluster, take a
* consistent snapshot on every node, and then release the block.
*
* Architecture:
* - A dedicated background worker holds the ExclusiveLocks on
* pg_dist_transaction / pg_dist_partition / pg_dist_node on
* coordinator + all worker nodes.
* - Shared memory (BackupBlockControlData) communicates state
* between the UDF caller, the background worker, and the
* status/unblock UDFs.
* - The background worker auto-releases if timeout expires,
* the requestor backend exits, or a worker connection fails.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/

#ifndef BACKUP_BLOCK_H
#define BACKUP_BLOCK_H

#include "postgres.h"

#include "storage/lwlock.h"


/*
* BackupBlockState enumerates the lifecycle states of the backup block.
*/
typedef enum BackupBlockState
{
BACKUP_BLOCK_INACTIVE = 0, /* no block active */
BACKUP_BLOCK_STARTING, /* background worker is starting up */
BACKUP_BLOCK_ACTIVE, /* locks acquired on all nodes */
BACKUP_BLOCK_RELEASING, /* unblock requested, releasing */
BACKUP_BLOCK_ERROR /* worker hit an error */
} BackupBlockState;


/*
* BackupBlockControlData is the shared memory structure that coordinates
* the backup block lifecycle between UDF callers and the background worker.
*
* Protected by the embedded LWLock (lock).
*/
typedef struct BackupBlockControlData
{
/* LWLock tranche info */
int trancheId;
char lockTrancheName[NAMEDATALEN];
LWLock lock;

/* current state */
BackupBlockState state;

/* PIDs for lifecycle management */
pid_t workerPid; /* PID of the background worker holding locks */
pid_t requestorPid; /* PID of the backend that requested the block */

/* timing */
TimestampTz blockStartTime; /* when locks were acquired */
int timeoutMs; /* auto-release timeout in milliseconds */

/* cluster info */
int nodeCount; /* number of worker nodes locked */

/* error reporting */
char errorMessage[256]; /* error message if state == BACKUP_BLOCK_ERROR */

/* signal: set to true by unblock UDF to tell worker to release */
bool releaseRequested;
} BackupBlockControlData;


/*
* BLOCK_DISTRIBUTED_WRITES_COMMAND acquires ExclusiveLock on:
* 1. pg_dist_transaction — blocks 2PC commit decisions
* 2. pg_dist_partition — blocks DDL on distributed tables
*
* Used by both citus_block_writes_for_backup and citus_create_restore_point
* to quiesce distributed writes on remote metadata nodes.
*
* Note: pg_dist_node is only locked locally on the coordinator (node
* management operations are coordinator-only), so it is intentionally
* absent from this remote command.
*/
#define BLOCK_DISTRIBUTED_WRITES_COMMAND \
"LOCK TABLE pg_catalog.pg_dist_transaction IN EXCLUSIVE MODE; " \
"LOCK TABLE pg_catalog.pg_dist_partition IN EXCLUSIVE MODE"


/* Shared memory sizing and initialization */
extern size_t BackupBlockShmemSize(void);
extern void BackupBlockShmemInit(void);

/* Call from _PG_init to chain shmem_startup_hook */
extern void InitializeBackupBlock(void);

/* Background worker entry point */
extern PGDLLEXPORT void CitusBackupBlockWorkerMain(Datum main_arg);

#endif /* BACKUP_BLOCK_H */
Loading