diff --git a/Makefile b/Makefile
index fe907e8d..7e3d8afc 100644
--- a/Makefile
+++ b/Makefile
@@ -57,7 +57,7 @@ REGRESS = preseed infofuncs init_fail init preseed_check basic conflict_secondar
 	toasted replication_set matview bidirectional primary_key \
 	interfaces foreign_key copy sequence triggers parallel functions row_filter \
 	row_filter_sampling att_list column_filter apply_delay \
-	extended node_origin_cascade multiple_upstreams tuple_origin autoddl \
+	extended progress_tracking node_origin_cascade multiple_upstreams tuple_origin autoddl \
 	drop
 
 # The following test cases are disabled while developing.
diff --git a/docs/spock_release_notes.md b/docs/spock_release_notes.md
index 274a3b5a..7b80ccf0 100644
--- a/docs/spock_release_notes.md
+++ b/docs/spock_release_notes.md
@@ -1,5 +1,12 @@
 # Spock Release Notes
 
+## Spock 5.0.5 on Feb 12, 2026
+
+* Fix a segfault that occurs when running against new Postgres minor releases such as 18.2.
+* Zero Downtime Add Node (Zodan) minor bug fixes and improvements.
+* Updated documentation.
+
+
 ## Spock 5.0.4 on Oct 8, 2025
 
 * Reduce memory usage for transactions with many inserts.
@@ -17,7 +24,6 @@
 - Fix bug where spock incorrectly outputs a message that DDL was replicated when a transaction is executing in repair mode.
 
-
 ## v5.0.3 on Sep 26, 2025
 
 * Spock 5.0.3 adds support for Postgres 18.
diff --git a/include/spock.h b/include/spock.h
index de18bc97..4f26faa9 100644
--- a/include/spock.h
+++ b/include/spock.h
@@ -24,8 +24,8 @@
 #include "spock_fe.h"
 #include "spock_node.h"
 
-#define SPOCK_VERSION "5.0.4"
-#define SPOCK_VERSION_NUM 50004
+#define SPOCK_VERSION "5.0.5"
+#define SPOCK_VERSION_NUM 50005
 
 #define EXTENSION_NAME "spock"
diff --git a/patches/15/pg15-010-allow_logical_decoding_on_standbys.diff b/patches/15/pg15-010-allow_logical_decoding_on_standbys.diff
index 197c22b6..8ba8c453 100644
--- a/patches/15/pg15-010-allow_logical_decoding_on_standbys.diff
+++ b/patches/15/pg15-010-allow_logical_decoding_on_standbys.diff
@@ -247,77 +247,78 @@ index 466f30c22d..b67ad3abe4 100644
  	slot = MyReplicationSlot;
 diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
-index 80d96db8eb..2d5b2945ed 100644
+index 1f4aad52c4a..623967e4fa1 100644
 --- a/src/backend/replication/slot.c
 +++ b/src/backend/replication/slot.c
 @@ -40,6 +40,7 @@
  #include <unistd.h>
- 
+
  #include "access/transam.h"
 +#include "access/xlogrecovery.h"
  #include "access/xlog_internal.h"
  #include "common/string.h"
  #include "miscadmin.h"
-@@ -1174,37 +1175,28 @@ ReplicationSlotReserveWal(void)
- 	/*
- 	 * For logical slots log a standby snapshot and start logical decoding
- 	 * at exactly that position. That allows the slot to start up more
-- 	 * quickly.
-+ 	 * quickly. But on a standby we cannot do WAL writes, so just use the
-+ 	 * replay pointer; effectively, an attempt to create a logical slot on
-+ 	 * standby will cause it to wait for an xl_running_xact record to be
-+ 	 * logged independently on the primary, so that a snapshot can be
-+ 	 * built using the record.
- 	 *
-- 	 * That's not needed (or indeed helpful) for physical slots as they'll
-- 	 * start replay at the last logged checkpoint anyway. Instead return
-- 	 * the location of the last redo LSN. While that slightly increases
-- 	 * the chance that we have to retry, it's where a base backup has to
-- 	 * start replay at.
-+ 	 * None of this is needed (or indeed helpful) for physical slots as
-+ 	 * they'll start replay at the last logged checkpoint anyway. Instead
-+ 	 * return the location of the last redo LSN. While that slightly
-+ 	 * increases the chance that we have to retry, it's where a base
-+ 	 * backup has to start replay at.
- 	 */
--	if (!RecoveryInProgress() && SlotIsLogical(slot))
--	{
--		XLogRecPtr	flushptr;
-
--		/* start at current insert position */
-+	if (SlotIsPhysical(slot))
-+		restart_lsn = GetRedoRecPtr();
-+	else if (RecoveryInProgress())
-+		restart_lsn = GetXLogReplayRecPtr(NULL);
-+	else
- 		restart_lsn = GetXLogInsertRecPtr();
--	SpinLockAcquire(&slot->mutex);
--	slot->data.restart_lsn = restart_lsn;
--	SpinLockRelease(&slot->mutex);
-
--	/* make sure we have enough information to start */
--	flushptr = LogStandbySnapshot();
-
--	/* and make sure it's fsynced to disk */
--	XLogFlush(flushptr);
--	}
--	else
--	{
--		restart_lsn = GetRedoRecPtr();
--		SpinLockAcquire(&slot->mutex);
--		slot->data.restart_lsn = restart_lsn;
--		SpinLockRelease(&slot->mutex);
--	}
-+	SpinLockAcquire(&slot->mutex);
-+	slot->data.restart_lsn = restart_lsn;
-+	SpinLockRelease(&slot->mutex);
-
- 	/* prevent WAL removal as fast as possible */
- 	ReplicationSlotsComputeRequiredLSN();
-@@ -1220,6 +1212,17 @@ ReplicationSlotReserveWal(void)
- 		if (XLogGetLastRemovedSegno() < segno)
- 			break;
- 	}
+@@ -1238,37 +1239,29 @@ ReplicationSlotReserveWal(void)
+ 	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
+ 
+ 	/*
+- 	 * For logical slots log a standby snapshot and start logical decoding at
+- 	 * exactly that position. That allows the slot to start up more quickly.
++ 	 * For logical slots log a standby snapshot and start logical decoding
++ 	 * at exactly that position. That allows the slot to start up more
++ 	 * quickly. But on a standby we cannot do WAL writes, so just use the
++ 	 * replay pointer; effectively, an attempt to create a logical slot on
++ 	 * standby will cause it to wait for an xl_running_xact record to be
++ 	 * logged independently on the primary, so that a snapshot can be
++ 	 * built using the record.
+ 	 *
+- 	 * That's not needed (or indeed helpful) for physical slots as they'll
+- 	 * start replay at the last logged checkpoint anyway. Instead return the
+- 	 * location of the last redo LSN, where a base backup has to start replay
+- 	 * at.
++ 	 * None of this is needed (or indeed helpful) for physical slots as
++ 	 * they'll start replay at the last logged checkpoint anyway. Instead
++ 	 * return the location of the last redo LSN, where a base backup has
++ 	 * to start replay at.
+ 	 */
+-	if (!RecoveryInProgress() && SlotIsLogical(slot))
+-	{
+-		XLogRecPtr	flushptr;
-
+-		/* start at current insert position */
++	if (SlotIsPhysical(slot))
++		restart_lsn = GetRedoRecPtr();
++	else if (RecoveryInProgress())
++		restart_lsn = GetXLogReplayRecPtr(NULL);
++	else
+ 		restart_lsn = GetXLogInsertRecPtr();
+-	SpinLockAcquire(&slot->mutex);
+-	slot->data.restart_lsn = restart_lsn;
+-	SpinLockRelease(&slot->mutex);
-
+-	/* make sure we have enough information to start */
+-	flushptr = LogStandbySnapshot();
+
+-	/* and make sure it's fsynced to disk */
+-	XLogFlush(flushptr);
+-	}
+-	else
+-	{
+-		restart_lsn = GetRedoRecPtr();
+-		SpinLockAcquire(&slot->mutex);
+-		slot->data.restart_lsn = restart_lsn;
+-		SpinLockRelease(&slot->mutex);
+-	}
++	SpinLockAcquire(&slot->mutex);
++	slot->data.restart_lsn = restart_lsn;
++	SpinLockRelease(&slot->mutex);
+
+ 	/* prevent WAL removal as fast as possible */
+ 	ReplicationSlotsComputeRequiredLSN();
+@@ -1280,6 +1273,17 @@ ReplicationSlotReserveWal(void)
+ 			NameStr(slot->data.name));
+ 
+ 	LWLockRelease(ReplicationSlotAllocationLock);
+ 
+ 	if (!RecoveryInProgress() && SlotIsLogical(slot))
+ 	{
@@ -330,7 +331,7 @@ index 80d96db8eb..2d5b2945ed 100644
+ 		XLogFlush(flushptr);
+ 	}
  }
- 
+
  /*
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 269914bce2..3c39fe20cb 100644
diff --git a/samples/Z0DAN/n1.pgb b/samples/Z0DAN/n1.pgb
new file mode 100644
index 00000000..2096a598
--- /dev/null
+++ b/samples/Z0DAN/n1.pgb
@@ -0,0 +1,4 @@
+\set aid random(1, 50000)
+
+UPDATE pgbench_accounts SET abalance = abalance + :aid WHERE aid = :aid;
+UPDATE pgbench_accounts SET abalance = abalance - :aid WHERE aid = :aid;
\ No newline at end of file
diff --git a/samples/Z0DAN/n2.pgb b/samples/Z0DAN/n2.pgb
new file mode 100644
index 00000000..e7180ef5
--- /dev/null
+++ b/samples/Z0DAN/n2.pgb
@@ -0,0 +1,4 @@
+\set aid random(50001, 100000)
+
+UPDATE pgbench_accounts SET abalance = abalance + :aid WHERE aid = :aid;
+UPDATE pgbench_accounts SET abalance = abalance - :aid WHERE aid = :aid;
\ No newline at end of file
diff --git a/samples/Z0DAN/wait_subscription.sql b/samples/Z0DAN/wait_subscription.sql
new file mode 100644
index 00000000..58e52162
--- /dev/null
+++ b/samples/Z0DAN/wait_subscription.sql
@@ -0,0 +1,98 @@
+CREATE OR REPLACE FUNCTION wait_subscription(
+    remote_node_name Name,
+    report_it Boolean DEFAULT false,
+    timeout Interval DEFAULT '0 second',
+    delay Real DEFAULT 1.0
+) RETURNS bigint AS $$
+DECLARE
+    state Record;
+    lag Bigint := 1;
+    end_time Timestamp := 'infinity';
+    time_remained Interval;
+    local_node_name Name;
+    wal_sender_timeout Bigint;
+    prev_received_lsn pg_lsn := '0/0'::pg_lsn;
+BEGIN
+    -- spock.local_node.node_id -> spock.node(node_id -> node_name)
+    SELECT node_name FROM spock.node
+    WHERE node_id = (SELECT node_id FROM spock.local_node)
+    INTO local_node_name;
+
+    -- wal_sender_timeout, in seconds (pg_sleep expects seconds)
+    SELECT EXTRACT(epoch
+        FROM (
+            SELECT (current_setting('wal_sender_timeout')::Interval))
+        )::real
+    INTO wal_sender_timeout;
+
+    -- Calculate the end time, if a timeout was requested.
+    IF timeout > '0 second' THEN
+        SELECT now() + timeout INTO end_time;
+    END IF;
+
+    WHILE lag > 0 LOOP
+
+        SELECT end_time - clock_timestamp() INTO time_remained;
+        IF time_remained < '0 second' THEN
+            RETURN lag;
+        END IF;
+
+        -- NOTE: Remember, an apply group may contain more than a single worker.
+        SELECT
+            MAX(remote_insert_lsn) AS remote_write_lsn,
+            MAX(received_lsn) AS received_lsn
+        FROM spock.lag_tracker
+        WHERE origin_name = remote_node_name AND receiver_name = local_node_name
+        INTO state;
+
+        -- Special case: nothing has arrived yet.
+        IF (state.received_lsn = '0/0'::pg_lsn) THEN
+            IF report_it = true THEN
+                raise NOTICE 'Replication % -> %: waiting for WAL ... Time remained: % (HH24:MI:SS)',
+                    remote_node_name, local_node_name,
+                    to_char(time_remained, 'HH24:MI:SS');
+            END IF;
+            PERFORM pg_sleep(delay);
+            CONTINUE;
+        END IF;
+
+        -- Special case: no transactions have been executed on the remote yet.
+        IF (state.remote_write_lsn = '0/0'::pg_lsn) THEN
+            IF report_it = true THEN
+                raise NOTICE 'Replication % -> %: waiting for something substantial ... Received LSN: %. Time remained: % (HH24:MI:SS)',
+                    remote_node_name, local_node_name, state.received_lsn,
+                    to_char(time_remained, 'HH24:MI:SS');
+            END IF;
+
+            -- Check for any progress.
+            IF (state.received_lsn = prev_received_lsn) THEN
+                raise EXCEPTION 'Replication % -> %: publisher seems to be stuck',
+                    remote_node_name, local_node_name;
+            END IF;
+
+            -- We have progress; wait further.
+            prev_received_lsn = state.received_lsn;
+            -- Sleep long enough to be sure we get a 'keepalive' message.
+            PERFORM pg_sleep(wal_sender_timeout * 2);
+
+            PERFORM pg_sleep(delay);
+            CONTINUE;
+        END IF;
+
+        SELECT MAX(remote_insert_lsn - received_lsn) FROM spock.lag_tracker
+        WHERE origin_name = remote_node_name AND receiver_name = local_node_name
+        INTO lag;
+
+        IF report_it = true THEN
+            raise NOTICE 'Replication % -> %: current lag % MB, Time remained: % (HH24:MI:SS)',
+                remote_node_name, local_node_name, lag/1024/1024,
+                to_char(time_remained, 'HH24:MI:SS');
+        END IF;
+
+        PERFORM pg_sleep(delay);
+    END LOOP;
+
+    RETURN lag;
+END
+$$ LANGUAGE plpgsql VOLATILE;
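(For orientation, not part of the patch: a minimal sketch of how the helper above can be exercised. The node name and timeout are hypothetical; the function returns the remaining lag in bytes, reaching 0 once the subscriber has caught up.)

-- On the subscriber, wait up to five minutes for replication from 'n1' to drain,
-- printing progress notices along the way.
SELECT wait_subscription('n1', report_it => true, timeout => '5 minutes'::interval);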
Skipping creation.") - return - - sql = f"SELECT slot_name, lsn FROM pg_create_logical_replication_slot('{slot_name}', '{plugin}');" - + return None + + sql = f"SELECT lsn FROM pg_create_logical_replication_slot('{slot_name}', '{plugin}');" + if self.verbose: self.info(f"[QUERY] {sql}") - - result = self.run_psql(node_dsn, sql) + + # Fetch the result to get the LSN + result = self.run_psql(node_dsn, sql, fetch=True, return_single=True) if result is None: if self.verbose: self.info(f"Replication slot '{slot_name}' may already exist or creation failed.") + return None else: + lsn = result.strip() if self.verbose: - self.info(f"Created replication slot '{slot_name}' with plugin '{plugin}' on remote node.") + self.info(f"Created replication slot '{slot_name}' with plugin '{plugin}' on remote node (LSN: {lsn}).") + return lsn def create_disable_subscriptions_and_slots(self, src_node_name: str, src_dsn: str, new_node_name: str, new_node_dsn: str): @@ -424,7 +442,7 @@ def create_disable_subscriptions_and_slots(self, src_node_name: str, src_dsn: st if rec['node_name'] == src_node_name: continue - # Create replication slot + # Create replication slot and capture the commit LSN dbname = "pgedge" # Default database name if "dbname=" in rec['dsn']: dbname = rec['dsn'].split("dbname=")[1].split()[0] @@ -433,12 +451,17 @@ def create_disable_subscriptions_and_slots(self, src_node_name: str, src_dsn: st if len(slot_name) > 64: slot_name = slot_name[:64] - self.create_replication_slot(rec['dsn'], slot_name) - self.notice(f" OK: Creating replication slot {slot_name} on node {rec['node_name']}") + commit_lsn = self.create_replication_slot(rec['dsn'], slot_name) + self.notice(f" OK: Creating replication slot {slot_name} (LSN: {commit_lsn}) on node {rec['node_name']}") # Trigger sync event on origin node and store LSN for later use sync_lsn = self.sync_event(rec['dsn']) - self.sync_lsns[rec['node_name']] = sync_lsn + + # Store both sync_lsn and commit_lsn + self.sync_lsns[rec['node_name']] = { + 'sync_lsn': sync_lsn, + 'commit_lsn': commit_lsn + } self.notice(f" OK: Triggering sync event on node {rec['node_name']} (LSN: {sync_lsn})") # Create disabled subscription @@ -528,7 +551,7 @@ def trigger_source_sync_and_wait_on_new_node(self, src_node_name: str, src_dsn: if self.verbose: self.info(f" Remote SQL for sync_event on source node {src_node_name}: {sql}") - sync_lsn = self.run_psql(new_node_dsn, sql, fetch=True, return_single=True) + sync_lsn = self.run_psql(src_dsn, sql, fetch=True, return_single=True) if sync_lsn: self.format_notice("✓", f"Triggered sync_event on new node {src_node_name} (LSN: {sync_lsn})") else: @@ -562,42 +585,38 @@ def get_commit_timestamp(self, node_dsn: str, origin: str, receiver: str) -> str result = self.run_psql(node_dsn, sql, fetch=True, return_single=True) return result - def advance_replication_slot(self, node_dsn: str, slot_name: str, sync_timestamp: str): - """Advance a replication slot to a specific timestamp""" - if not sync_timestamp: + def advance_replication_slot(self, node_dsn: str, slot_name: str, target_lsn: str): + """Advance a replication slot to a specific LSN""" + if not target_lsn: if self.verbose: - self.info(f"Commit timestamp is NULL, skipping slot advance for slot '{slot_name}'.") + self.info(f"Target LSN is NULL, skipping slot advance for slot '{slot_name}'.") return - - sql = f""" - WITH lsn_cte AS ( - SELECT spock.get_lsn_from_commit_ts('{slot_name}', '{sync_timestamp}') AS lsn - ) - SELECT pg_replication_slot_advance('{slot_name}', lsn) FROM lsn_cte; - """ 
- + + sql = f"SELECT pg_replication_slot_advance('{slot_name}', '{target_lsn}'::pg_lsn);" + if self.verbose: self.info(f"[QUERY] {sql}") self.run_psql(node_dsn, sql) - def check_commit_timestamp_and_advance_slot(self, src_node_name: str, src_dsn: str, + def check_commit_timestamp_and_advance_slot(self, src_node_name: str, src_dsn: str, new_node_name: str, new_node_dsn: str): - """Phase 7: Check commit timestamp and advance replication slot""" - self.notice("Phase 7: Checking commit timestamp and advancing replication slot") - + """Phase 7: Check commit LSN and advance replication slot""" + self.notice("Phase 7: Checking commit LSN and advancing replication slot") + # Get all nodes from source cluster nodes = self.get_spock_nodes(src_dsn) for rec in nodes: if rec['node_name'] == src_node_name: continue - - # Get commit timestamp - sync_timestamp = self.get_commit_timestamp(new_node_dsn, src_node_name, rec['node_name']) - if sync_timestamp: - self.notice(f" OK: Found commit timestamp for {src_node_name}->{rec['node_name']}: {sync_timestamp}") - + + # Get the stored commit LSN from when subscription was created + commit_lsn = self.sync_lsns[rec['node_name']]['commit_lsn'] + + if commit_lsn: + self.notice(f" OK: Found commit LSN for {rec['node_name']} (LSN: {commit_lsn})...") + # Advance replication slot dbname = "pgedge" if "dbname=" in rec['dsn']: @@ -611,18 +630,15 @@ def check_commit_timestamp_and_advance_slot(self, src_node_name: str, src_dsn: s self.info(f"[QUERY] {sql}") current_lsn = self.run_psql(rec['dsn'], sql, fetch=True, return_single=True) - - # Get target LSN - sql = f"SELECT spock.get_lsn_from_commit_ts('{slot_name}', '{sync_timestamp}')" - if self.verbose: - self.info(f"[QUERY] {sql}") - - target_lsn = self.run_psql(rec['dsn'], sql, fetch=True, return_single=True) - + + target_lsn = commit_lsn + if current_lsn and target_lsn and current_lsn >= target_lsn: self.notice(f" - Slot {slot_name} already at or beyond target LSN (current: {current_lsn}, target: {target_lsn})") else: - self.advance_replication_slot(rec['dsn'], slot_name, sync_timestamp) + self.advance_replication_slot(rec['dsn'], slot_name, target_lsn) + else: + self.notice(f" - No commit LSN found for {rec['node_name']}->{new_node_name}") def enable_sub(self, node_dsn: str, sub_name: str, immediate: bool = True): """Enable a subscription on a remote node""" @@ -655,6 +671,64 @@ def enable_sub(self, node_dsn: str, sub_name: str, immediate: bool = True): if self.verbose: self.info(f"Subscription {sub_name} enabled successfully") + def verify_subscription_replicating(self, node_dsn: str, subscription_name: str, + verb: bool = True, max_attempts: int = 120): + """ + Verifies that a subscription is actively replicating after being enabled + + Arguments: + node_dsn: DSN of the node where subscription exists + subscription_name: Name of the subscription to verify + verb: Verbose output flag (default: True) + max_attempts: Maximum verification attempts in seconds (default: 120 = 2 minutes) + + Usage: + manager.verify_subscription_replicating(node_dsn, 'sub_name', True) + + Notes: + Raises exception if subscription fails to reach 'replicating' status within timeout + + Example: + # Verify subscription is replicating with 2 minute timeout + self.verify_subscription_replicating( + new_node_dsn, + 'sub_n1_n3', + verb=True, + max_attempts=120 + ) + """ + verify_count = 0 + + while True: + verify_count += 1 + + # Check subscription status on the target node + sql = f"SELECT status FROM spock.sub_show_status() WHERE subscription_name 
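(Aside, not part of the patch: the rewritten helper now advances the peer's slot with a single built-in call instead of the old spock.get_lsn_from_commit_ts() lookup. A sketch of the statement it issues; the slot name and LSN are illustrative:)

-- Move the slot forward to the stored commit LSN without decoding the intervening WAL.
SELECT pg_replication_slot_advance('spk_pgedge_n2_sub_n2_n3', '0/3000060'::pg_lsn);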
@@ -655,6 +671,64 @@ def enable_sub(self, node_dsn: str, sub_name: str, immediate: bool = True):
         if self.verbose:
             self.info(f"Subscription {sub_name} enabled successfully")
 
+    def verify_subscription_replicating(self, node_dsn: str, subscription_name: str,
+                                        verb: bool = True, max_attempts: int = 120):
+        """
+        Verifies that a subscription is actively replicating after being enabled
+
+        Arguments:
+            node_dsn: DSN of the node where the subscription exists
+            subscription_name: Name of the subscription to verify
+            verb: Verbose output flag (default: True)
+            max_attempts: Maximum number of verification attempts, one per second
+                          (default: 120, i.e. a 2 minute timeout)
+
+        Usage:
+            manager.verify_subscription_replicating(node_dsn, 'sub_name', True)
+
+        Notes:
+            Raises an exception if the subscription fails to reach 'replicating'
+            status within the timeout
+
+        Example:
+            # Verify subscription is replicating with a 2 minute timeout
+            self.verify_subscription_replicating(
+                new_node_dsn,
+                'sub_n1_n3',
+                verb=True,
+                max_attempts=120
+            )
+        """
+        verify_count = 0
+
+        while True:
+            verify_count += 1
+
+            # Check subscription status on the target node
+            sql = f"SELECT status FROM spock.sub_show_status() WHERE subscription_name = '{subscription_name}'"
+
+            if self.verbose and verb:
+                self.info(f"[QUERY] {sql}")
+
+            sub_status = self.run_psql(node_dsn, sql, fetch=True, return_single=True)
+
+            if sub_status and sub_status.strip() == 'replicating':
+                if verb:
+                    msg = f"Verified subscription {subscription_name} is replicating"
+                    self.notice(f"  SUCCESS: {msg.ljust(120)}")
+                break
+            elif verify_count >= max_attempts:
+                status_display = sub_status.strip() if sub_status else 'unknown'
+                raise Exception(
+                    f"Subscription {subscription_name} verification timeout after {max_attempts} seconds "
+                    f"(final status: {status_display})"
+                )
+            else:
+                if verb:
+                    status_display = sub_status.strip() if sub_status else 'unknown'
+                    msg = (f"Waiting for subscription {subscription_name} to start replicating "
+                           f"(status: {status_display}, attempt {verify_count}/{max_attempts})")
+                    self.notice(f"  ⏳ {msg.ljust(120)}")
+                time.sleep(1)
+
     def enable_disabled_subscriptions(self, src_node_name: str, src_dsn: str,
                                       new_node_name: str, new_node_dsn: str):
         """Phase 8: Enable disabled subscriptions and wait for stored sync events"""
@@ -675,7 +749,8 @@ def enable_disabled_subscriptions(self, src_node_name: str, src_dsn: str,
                 # Wait for the sync event that was captured when subscription was created
                 # This ensures the subscription starts replicating from the correct sync point
                 timeout_ms = 1200  # 20 minutes
-                sync_lsn = self.sync_lsns.get(rec['node_name'])  # Use stored sync LSN from Phase 3
+                sync_lsn = self.sync_lsns[rec['node_name']]['sync_lsn']  # Use stored sync LSN from Phase 3
+
                 if sync_lsn:
                     self.notice(f"  OK: Using stored sync event from origin node {rec['node_name']} (LSN: {sync_lsn})...")
@@ -688,6 +763,13 @@ def enable_disabled_subscriptions(self, src_node_name: str, src_dsn: str,
                     self.notice(f"  OK: Waiting for sync event from {rec['node_name']} on new node {new_node_name}...")
                 else:
                     self.notice(f"  ⚠ No stored sync LSN found for {rec['node_name']}, skipping sync wait")
+                # Verify it's replicating
+                self.verify_subscription_replicating(
+                    node_dsn=new_node_dsn,
+                    subscription_name=sub_name,
+                    verb=True,
+                    max_attempts=120  # 2 minutes
+                )
             except Exception as e:
                 self.notice(f"  ✗ Enabling subscription {sub_name}... (error: {e})")
@@ -1056,6 +1138,25 @@ def health_check(self, src_node_name: str, src_dsn: str, new_node_name: str = No
 
         # Check 5: Database prerequisites (for pre-check only)
         if check_type == "pre" and new_node_dsn:
+            # Check if they previously installed lolor on the destination.
+            # They should not have run CREATE EXTENSION lolor yet.
+            try:
+                sql = """
+                    SELECT count(*) FROM pg_tables
+                    WHERE schemaname = 'lolor'
+                """
+                user_table_count = self.run_psql(new_node_dsn, sql, fetch=True, return_single=True)
+
+                if user_table_count and int(user_table_count.strip()) == 0:
+                    self.format_notice("PASS:", "Destination database does not have signs of lolor being installed")
+                    checks_passed += 1
+                else:
+                    self.format_notice("FAIL:", "Destination database has the lolor extension installed or remaining lolor user data in the lolor schema.")
+                    checks_failed += 1
+            except Exception as e:
+                self.format_notice("FAIL:", f"lolor extension check - {str(e)}")
+                checks_failed += 1
+
             # Check database is empty
             try:
                 sql = """
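(Aside, not part of the patch: verify_subscription_replicating() polls the same view an operator can query by hand when a phase appears stuck; the subscription name below is illustrative.)

SELECT subscription_name, status
FROM spock.sub_show_status()
WHERE subscription_name = 'sub_n1_n3';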
diff --git a/samples/Z0DAN/zodan.sql b/samples/Z0DAN/zodan.sql
index e6e0e91a..02241d45 100644
--- a/samples/Z0DAN/zodan.sql
+++ b/samples/Z0DAN/zodan.sql
@@ -55,7 +55,7 @@ BEGIN
     IF src_version IS NULL THEN
         RAISE EXCEPTION 'Spock extension not found on source node';
     END IF;
-
+
     -- Check source node has required version (strip -devel suffix for comparison)
     IF regexp_replace(src_version, '-devel$', '') < min_required_version THEN
         RAISE EXCEPTION 'Spock version mismatch: source node has version %, but minimum required version is %. Please upgrade all nodes to at least %.',
@@ -71,7 +71,7 @@ BEGIN
     IF new_version IS NULL THEN
         RAISE EXCEPTION 'Spock extension not found on new node';
     END IF;
-
+
     -- Check new node has required version (strip -devel suffix for comparison)
     IF regexp_replace(new_version, '-devel$', '') < min_required_version THEN
         RAISE EXCEPTION 'Spock version mismatch: new node has version %, but minimum required version is %. Please upgrade all nodes to at least %.',
@@ -95,7 +95,7 @@ BEGIN
         IF node_version IS NULL THEN
             RAISE EXCEPTION 'Spock extension not found on node %', node_rec.node_name;
         END IF;
-
+
         IF regexp_replace(node_version, '-devel$', '') < min_required_version THEN
             version_mismatch := true;
             RAISE EXCEPTION 'Spock version mismatch: node % has version %, but required version is at least %. All nodes must have version % or later.',
@@ -570,131 +570,6 @@ BEGIN
 END;
 $$;
 
-
--- ============================================================================
-
-
--- ============================================================================
--- Procedure: get_commit_timestamp
--- Purpose  : Retrieves the commit timestamp for replication lag between two nodes.
--- Arguments:
---     node_dsn  - DSN string to connect to the remote node.
---     n1        - Origin node name.
---     n2        - Receiver node name.
---     verb      - Verbose output flag
---     commit_ts - OUT parameter to receive the commit timestamp
--- Usage    : CALL get_commit_timestamp(node_dsn, 'n1', 'n2', true, NULL);
--- ============================================================================
-
-CREATE OR REPLACE PROCEDURE spock.get_commit_timestamp(
-    node_dsn text,
-    n1 text,
-    n2 text,
-    verb boolean,
-    INOUT commit_ts timestamp DEFAULT NULL
-)
-LANGUAGE plpgsql
-AS
-$$
-DECLARE
-    ts_rec RECORD;
-    remotesql text;
-BEGIN
-    -- Build remote SQL to fetch commit timestamp from lag_tracker
-    remotesql := format(
-        'SELECT commit_timestamp FROM spock.lag_tracker WHERE origin_name = %L AND receiver_name = %L',
-        n1, n2
-    );
-
-    IF verb THEN
-        RAISE NOTICE '[QUERY] %', remotesql;
-    END IF;
-
-    -- Execute remote SQL and capture the commit timestamp
-    SELECT * FROM dblink(node_dsn, remotesql) AS t(commit_timestamp timestamp) INTO ts_rec;
-
-    IF verb THEN
-        RAISE NOTICE E'[STEP] Commit timestamp for lag between "%" and "%": %', n1, n2, ts_rec.commit_timestamp;
-    END IF;
-
-    commit_ts := ts_rec.commit_timestamp;
-END;
-$$;
-
-
--- ============================================================================
-
-
--- ============================================================================
--- Procedure: advance_replication_slot
--- Purpose  : Advances a logical replication slot to a specific commit timestamp on a remote node via dblink.
--- Arguments:
---     node_dsn       - DSN string to connect to the remote node.
---     slot_name      - Name of the replication slot to advance.
---     sync_timestamp - Commit timestamp to advance the slot to.
---     verb           - Verbose output flag
--- Usage    : CALL advance_replication_slot(node_dsn, slot_name, sync_timestamp, true);
--- ============================================================================
-
-CREATE OR REPLACE PROCEDURE spock.advance_replication_slot(
-    node_dsn text,
-    slot_name text,
-    sync_timestamp timestamp,
-    verb boolean
-)
-LANGUAGE plpgsql
-AS
-$$
-DECLARE
-    remotesql text;
-    slot_advance_result RECORD;
-BEGIN
-    -- ============================================================================
-    -- Step 1: Check if sync_timestamp is NULL
-    -- ============================================================================
-    IF sync_timestamp IS NULL THEN
-        IF verb THEN
-            RAISE NOTICE E'
-    [STEP 1] Commit timestamp is NULL, skipping slot advance for slot "%".
-            ', slot_name;
-        END IF;
-        RETURN;
-    END IF;
-
-    -- ============================================================================
-    -- Step 2: Build remote SQL for advancing replication slot
-    -- ============================================================================
-    remotesql := format(
-        'WITH lsn_cte AS (
-            SELECT spock.get_lsn_from_commit_ts(%L, %L::timestamp) AS lsn
-        )
-        SELECT pg_replication_slot_advance(%L, lsn) FROM lsn_cte;',
-        slot_name, sync_timestamp::text, slot_name
-    );
-
-    IF verb THEN
-        RAISE NOTICE '[QUERY] %', remotesql;
-    END IF;
-    IF verb THEN
-        RAISE NOTICE E'
-    [STEP 2] Remote node DSN: %
-            ', node_dsn;
-    END IF;
-
-    -- ============================================================================
-    -- Step 3: Execute slot advance on remote node using dblink
-    -- ============================================================================
-    SELECT * FROM dblink(node_dsn, remotesql) AS t(result text) INTO slot_advance_result;
-
-    IF verb THEN
-        RAISE NOTICE E'
-    [STEP 3] Replication slot "%" advanced to commit timestamp % on remote node: %',
-            slot_name, sync_timestamp, node_dsn;
-    END IF;
-END;
-$$;
-
-
 -- ============================================================================
@@ -976,361 +851,6 @@ BEGIN
 END;
 $$;
 
--- ============================================================================
--- Procedure: monitor_replication_lag
--- Purpose  : Monitors replication lag between nodes on a remote cluster via dblink.
--- Arguments:
---     node_dsn - DSN string to connect to the remote node.
---     verb     - Verbose output flag
--- Usage    : CALL monitor_replication_lag(node_dsn, true);
--- ============================================================================
-
-CREATE OR REPLACE PROCEDURE spock.monitor_replication_lag(node_dsn text, verb boolean)
-LANGUAGE plpgsql
-AS
-$$
-DECLARE
-    remotesql text;
-    node_query text;
-    node_list text := '';
-    lag_vars text := '';
-    lag_assignments text := '';
-    lag_log text := '';
-    lag_conditions text := '';
-    node_rec record;
-    node_count integer := 0;
-BEGIN
-    -- ============================================================================
-    -- Step 1: Get all nodes from the remote cluster
-    -- ============================================================================
-    IF verb THEN
-        RAISE NOTICE '[STEP] Getting nodes from remote cluster: %', node_dsn;
-    END IF;
-
-    -- Get all nodes except the newest one (assuming it's the receiver)
-    FOR node_rec IN
-        SELECT * FROM dblink(node_dsn, 'SELECT node_name FROM spock.node ORDER BY node_id')
-            AS t(node_name text)
-    LOOP
-        node_count := node_count + 1;
-        IF node_count > 1 THEN
-            node_list := node_list || ', ';
-        END IF;
-        node_list := node_list || '''' || node_rec.node_name || '''';
-
-        -- Build lag variable declarations
-        IF node_count > 1 THEN
-            lag_vars := lag_vars || E'\n        ';
-        END IF;
-        lag_vars := lag_vars || 'lag_' || node_rec.node_name || ' interval;';
-        lag_vars := lag_vars || E'\n        lag_' || node_rec.node_name || '_bytes bigint;';
-
-        -- Build lag assignments
-        IF node_count > 1 THEN
-            lag_assignments := lag_assignments || E'\n\n            ';
-        END IF;
-        lag_assignments := lag_assignments || '-- Calculate lag from ' || node_rec.node_name || ' to newest node';
-        lag_assignments := lag_assignments || E'\n            SELECT now() - commit_timestamp, replication_lag_bytes INTO lag_' || node_rec.node_name || ', lag_' || node_rec.node_name || '_bytes';
-        lag_assignments := lag_assignments || E'\n            FROM spock.lag_tracker';
-        lag_assignments := lag_assignments || E'\n            WHERE origin_name = ''''' || node_rec.node_name || ''''' AND receiver_name = (SELECT node_name FROM spock.node ORDER BY node_id DESC LIMIT 1);';
-
-        -- Build lag log message
-        IF node_count > 1 THEN
-            lag_log := lag_log || ', ';
-        END IF;
-        lag_log := lag_log || node_rec.node_name || ' → newest lag: % (bytes: %)';
-
-        -- Build lag conditions
-        IF node_count > 1 THEN
-            lag_conditions := lag_conditions || E'\n               AND ';
-        END IF;
-        lag_conditions := lag_conditions || 'lag_' || node_rec.node_name || ' IS NOT NULL';
-        lag_conditions := lag_conditions || E'\n               AND (extract(epoch FROM lag_' || node_rec.node_name || ') < 59 OR lag_' || node_rec.node_name || '_bytes = 0)';
-    END LOOP;
-
-    IF node_count <= 1 THEN
-        RAISE NOTICE '[STEP] Only one node found, skipping lag monitoring';
-        RETURN;
-    END IF;
-
-    -- ============================================================================
-    -- Step 2: Build dynamic remote SQL for monitoring replication lag
-    -- ============================================================================
-    -- Build COALESCE parameters for the log message
-    DECLARE
-        coalesce_params text := '';
-        node_rec2 record;
-    BEGIN
-        FOR node_rec2 IN
-            SELECT * FROM dblink(node_dsn, 'SELECT node_name FROM spock.node ORDER BY node_id')
-                AS t(node_name text)
-        LOOP
-            IF coalesce_params != '' THEN
-                coalesce_params := coalesce_params || ', ';
-            END IF;
-            coalesce_params := coalesce_params || 'COALESCE(lag_' || node_rec2.node_name || '::text, ''NULL''), COALESCE(lag_' || node_rec2.node_name || '_bytes::text, ''NULL'')';
-        END LOOP;
-
-        remotesql := format($sql$
-            DO '
-            DECLARE%s
-            BEGIN
-                LOOP%s
-
-                    -- Log current lag values
-                    RAISE NOTICE ''[MONITOR] %s'',
-                        %s;
-
-                    -- Exit loop when all lags are below 59 seconds
-                    EXIT WHEN %s;
-
-                    -- Sleep for 1 second before next check
-                    PERFORM pg_sleep(1);
-                END LOOP;
-            END
-            ';
-        $sql$,
-        lag_vars,
-        lag_assignments,
-        lag_log,
-        coalesce_params,
-        lag_conditions
-        );
-    END;
-
-    IF verb THEN
-        RAISE NOTICE '[STEP] Generated monitoring SQL for % nodes: %', node_count, node_list;
-        RAISE NOTICE '[QUERY] %', remotesql;
-    END IF;
-
-    -- ============================================================================
-    -- Step 3: Execute remote monitoring SQL via dblink
-    -- ============================================================================
-    IF verb THEN
-        RAISE NOTICE E'[STEP] monitor_replication_lag: Executing remote monitoring SQL on node: %', node_dsn;
-    END IF;
-    PERFORM dblink(node_dsn, remotesql);
-
-    -- ============================================================================
-    -- Step 4: Log completion of monitoring
-    -- ============================================================================
-    IF verb THEN
-        RAISE NOTICE E'[STEP] monitor_replication_lag: Monitoring replication lag completed on remote node: %', node_dsn;
-    END IF;
-END;
-$$;
-
--- ============================================================================
--- Procedure to monitor replication lag between nodes
--- ============================================================================
-
-CREATE OR REPLACE PROCEDURE spock.monitor_replication_lag_wait(
-    origin_node text,
-    receiver_node text,
-    max_lag_seconds integer DEFAULT 59,
-    check_interval_seconds integer DEFAULT 1,
-    verb boolean DEFAULT true
-)
-LANGUAGE plpgsql
-AS $$
-DECLARE
-    lag_interval interval;
-    lag_bytes bigint;
-    start_time timestamp := now();
-    elapsed_interval interval;
-BEGIN
-    IF verb THEN
-        RAISE NOTICE 'Monitoring replication lag from % to % (max lag: % seconds)',
-            origin_node, receiver_node, max_lag_seconds;
-    END IF;
-
-    LOOP
-        -- Get current lag time and bytes
-        SELECT now() - commit_timestamp, replication_lag_bytes
-        INTO lag_interval, lag_bytes
-        FROM spock.lag_tracker
-        WHERE origin_name = origin_node AND receiver_name = receiver_node;
-
-        -- Calculate elapsed time
-        elapsed_interval := now() - start_time;
-
-        IF verb THEN
-            RAISE NOTICE '% → % lag: % (bytes: %, elapsed: %)',
-                origin_node, receiver_node,
-                COALESCE(lag_interval::text, 'NULL'),
-                COALESCE(lag_bytes::text, 'NULL'),
-                elapsed_interval::text;
-        END IF;
-
-        -- Exit when lag is within acceptable limits OR when lag_bytes is zero
-        EXIT WHEN lag_interval IS NOT NULL
-            AND (extract(epoch FROM lag_interval) < max_lag_seconds OR lag_bytes = 0);
-
-        -- Sleep before next check
-        PERFORM pg_sleep(check_interval_seconds);
-    END LOOP;
-
-    IF verb THEN
-        IF lag_bytes = 0 THEN
-            RAISE NOTICE 'Replication lag from % to % completed (lag_bytes = 0)',
-                origin_node, receiver_node;
-        ELSE
-            RAISE NOTICE 'Replication lag from % to % is now within acceptable limits (% seconds)',
-                origin_node, receiver_node, max_lag_seconds;
-        END IF;
-    END IF;
-END;
-$$;
-
--- ============================================================================
--- Procedure to monitor multiple replication paths simultaneously
--- ============================================================================
-
-CREATE OR REPLACE PROCEDURE spock.monitor_multiple_replication_lags(
-    lag_configs jsonb,
-    max_lag_seconds integer DEFAULT 59,
-    check_interval_seconds integer DEFAULT 1,
-    verb boolean DEFAULT true
-)
-LANGUAGE plpgsql
-AS $$
-DECLARE
-    lag_record record;
-    lag_interval interval;
-    all_within_limits boolean;
-    start_time timestamp := now();
-    elapsed_interval interval;
-    config jsonb;
-    max_wait_seconds integer := 60;
-    wait_count integer := 0;
-    lag_data record;
-BEGIN
-    IF verb THEN
-        RAISE NOTICE 'Monitoring multiple replication lags (max lag: % seconds, timeout: % seconds)',
-            max_lag_seconds, max_wait_seconds;
-        RAISE NOTICE 'Monitoring % paths', jsonb_array_length(lag_configs);
-    END IF;
-
-    -- Wait for initial data to appear
-    WHILE NOT EXISTS (SELECT 1 FROM spock.lag_tracker LIMIT 1) AND wait_count < 10 LOOP
-        IF verb THEN
-            RAISE NOTICE 'Waiting for lag_tracker data to appear... (attempt %/10)', wait_count + 1;
-        END IF;
-        PERFORM pg_sleep(2);
-        wait_count := wait_count + 1;
-    END LOOP;
-
-    IF NOT EXISTS (SELECT 1 FROM spock.lag_tracker LIMIT 1) THEN
-        RAISE NOTICE 'No lag_tracker data available after waiting - skipping lag monitoring';
-        RETURN;
-    END IF;
-
-    LOOP
-        all_within_limits := true;
-
-        IF verb THEN
-            RAISE NOTICE 'Checking lag for % paths...', jsonb_array_length(lag_configs);
-        END IF;
-
-        -- Check each replication path
-        FOR config IN SELECT * FROM jsonb_array_elements(lag_configs)
-        LOOP
-            IF verb THEN
-                RAISE NOTICE 'Checking path: % → %', config->>'origin', config->>'receiver';
-            END IF;
-
-            SELECT now() - commit_timestamp INTO lag_interval
-            FROM spock.lag_tracker
-            WHERE origin_name = config->>'origin'
-              AND receiver_name = config->>'receiver';
-
-            IF verb THEN
-                RAISE NOTICE '% → % lag: %',
-                    config->>'origin', config->>'receiver',
-                    COALESCE(lag_interval::text, 'NULL');
-            END IF;
-
-            -- Check if this path is within limits
-            IF lag_interval IS NULL OR extract(epoch FROM lag_interval) >= max_lag_seconds THEN
-                all_within_limits := false;
-            END IF;
-        END LOOP;
-
-        -- Also show all available lag data for debugging
-        IF verb THEN
-            RAISE NOTICE 'All available lag data:';
-            FOR lag_data IN SELECT origin_name, receiver_name, commit_timestamp, replication_lag FROM spock.lag_tracker LOOP
-                RAISE NOTICE '  % → %: commit_ts=%s, lag=%s',
-                    lag_data.origin_name, lag_data.receiver_name,
-                    lag_data.commit_timestamp, lag_data.replication_lag;
-            END LOOP;
-        END IF;
-
-        -- Calculate elapsed time
-        elapsed_interval := now() - start_time;
-
-        IF verb THEN
-            RAISE NOTICE 'All paths within limits: % (elapsed: %)',
-                all_within_limits, elapsed_interval::text;
-        END IF;
-
-        -- Exit when all paths are within acceptable limits
-        EXIT WHEN all_within_limits;
-
-        -- Exit if we've been waiting too long
-        IF extract(epoch FROM elapsed_interval) > max_wait_seconds THEN
-            IF verb THEN
-                RAISE NOTICE 'Timeout reached (% seconds) - exiting lag monitoring', max_wait_seconds;
-            END IF;
-            EXIT;
-        END IF;
-
-        -- Sleep before next check
-        PERFORM pg_sleep(check_interval_seconds);
-    END LOOP;
-
-    IF verb THEN
-        IF all_within_limits THEN
-            RAISE NOTICE 'All replication lags are now within acceptable limits (% seconds)', max_lag_seconds;
-        ELSE
-            RAISE NOTICE 'Lag monitoring completed with timeout - some paths may still have high lag';
-        END IF;
-    END IF;
-END;
-$$;
-
--- ============================================================================
--- Example usage procedure (equivalent to the workflow logic)
--- ============================================================================
-
-CREATE OR REPLACE PROCEDURE spock.wait_for_n3_sync(
-    max_lag_seconds integer DEFAULT 59,
-    check_interval_seconds integer DEFAULT 1,
-    verb boolean DEFAULT true
-)
-LANGUAGE plpgsql
-AS $$
-DECLARE
-    lag_configs jsonb;
-BEGIN
-    -- Define the replication paths to monitor
-    lag_configs := '[
-        {"origin": "n1", "receiver": "n3"},
-        {"origin": "n2", "receiver": "n3"}
-    ]'::jsonb;
-
-    -- Monitor both paths
-    CALL spock.monitor_multiple_replication_lags(
-        lag_configs,
-        max_lag_seconds,
-        check_interval_seconds,
-        verb
-    );
-END;
-$$;
-
--- ============================================================================
-
 -- ============================================================================
 -- Procedure to monitor lag using dblink
 -- ============================================================================
@@ -1347,7 +867,7 @@ DECLARE
     lag_interval interval;
     lag_bytes bigint;
     max_wait_seconds integer := 60;
-    start_time timestamp := now();
+    start_time timestamp := clock_timestamp();
     elapsed_interval interval;
     loop_count integer := 0;
     lag_sql text;
@@ -1368,7 +888,7 @@ BEGIN
         lag_interval := lag_result.lag_interval;
         lag_bytes := lag_result.lag_bytes;
 
-        elapsed_interval := now() - start_time;
+        elapsed_interval := clock_timestamp() - start_time;
 
         RAISE NOTICE '% → % lag: % (bytes: %, elapsed: %, loop: %)',
             src_node_name, new_node_name,
@@ -1402,8 +922,32 @@ $$;
 
 -- ============================================================================
+--
+-- Utility routine to correctly extract the database name from a DSN string.
+--
+-- The purpose here is to centralise this specific logic: people may ask for
+-- more flexibility in how the DSN is written (upper-case letters in keywords,
+-- for example).
+--
+CREATE OR REPLACE FUNCTION spock.extract_dbname_from_dsn(dsn text)
+RETURNS text AS $$
+DECLARE
+    dbname text;
+BEGIN
+    dbname := substring(dsn from 'dbname=([^\s]+)');
+    IF dbname IS NOT NULL THEN
+        dbname := TRIM(BOTH '''' FROM dbname);
+    END IF;
+    IF dbname IS NULL THEN
+        -- We can't rely on the PGDATABASE environment variable here.
+        -- Also, it seems unreliable to guess or use a default name.
+        -- So, complain.
+        RAISE EXCEPTION 'Exiting add_node: Database name must be explicitly included in the DSN string %', dsn;
+    END IF;
-
+    RETURN dbname;
+END;
+$$ LANGUAGE plpgsql IMMUTABLE;
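(Aside, not part of the patch: a quick sanity check of the new helper. The DSN is illustrative; a DSN without dbname= raises the exception above.)

SELECT spock.extract_dbname_from_dsn('host=127.0.0.1 port=5432 user=pgedge dbname=pgedge');
-- returns: pgedge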
 -- ============================================================================
 -- Procedure to verify prerequisites for adding a new node
@@ -1429,14 +973,8 @@ BEGIN
     RAISE NOTICE 'Phase 1: Validating source and new node prerequisites';
 
     -- Check if database specified in new_node_dsn exists on new node
-    new_db_name := substring(new_node_dsn from 'dbname=([^\s]+)');
-    IF new_db_name IS NOT NULL THEN
-        new_db_name := TRIM(BOTH '''' FROM new_db_name);
-    END IF;
-    IF new_db_name IS NULL THEN
-        new_db_name := 'pgedge';
-    END IF;
+    SELECT spock.extract_dbname_from_dsn(new_node_dsn) INTO new_db_name;
 
     BEGIN
         SELECT EXISTS(SELECT 1 FROM dblink(new_node_dsn, 'SELECT 1') AS t(dummy int)) INTO new_db_exists;
         RAISE NOTICE '  OK: %', rpad('Checking database ' || new_db_name || ' exists on new node', 120, ' ');
@@ -1446,6 +984,23 @@ BEGIN
         RAISE EXCEPTION 'Exiting add_node: Database % does not exist on new node. Please create it first.', new_db_name;
     END;
 
+    -- Check if they previously installed lolor on the destination.
+    -- They should not have run CREATE EXTENSION yet.
+    DECLARE
+        user_table_count integer;
+        remotesql text;
+    BEGIN
+        remotesql := 'SELECT count(*) FROM pg_tables WHERE schemaname = ''lolor''';
+        SELECT * FROM dblink(new_node_dsn, remotesql) AS t(count integer) INTO user_table_count;
+
+        IF user_table_count > 0 THEN
+            RAISE NOTICE '  [FAILED] %', rpad('Database ' || new_db_name || ' has the lolor extension installed or remaining lolor data.', 120, ' ');
+            RAISE EXCEPTION 'Exiting add_node: Database % has the lolor extension installed or remaining lolor user data.', new_db_name;
+        ELSE
+            RAISE NOTICE '  OK: %', rpad('Checking database ' || new_db_name || ' to ensure lolor is not installed', 120, ' ');
+        END IF;
+    END;
+
     -- Check if database has user-created tables in user-created schemas
     DECLARE
         user_table_count integer;
@@ -1496,6 +1051,40 @@ BEGIN
         END IF;
     END;
 
+    -- Check: all nodes included in the cluster have only enabled subscriptions.
+    --
+    -- Connect to each node in the cluster and scan the spock.subscription
+    -- table to check subscription statuses. This is how we try to avoid cases
+    -- where a crash or disconnection somewhere in the middle would be
+    -- aggravated by add_node.
+    DECLARE
+        status_rec record;
+        dsn_rec record;
+        dsns_sql text;
+        sub_status_sql text;
+    BEGIN
+        dsns_sql := 'SELECT if_dsn,node_name
+                     FROM spock.node JOIN spock.node_interface
+                     ON (if_nodeid = node_id)
+                     WHERE node_id NOT IN (SELECT node_id FROM spock.local_node)';
+        sub_status_sql := 'SELECT sub_name, sub_enabled FROM spock.subscription';
+
+        FOR dsn_rec IN SELECT * FROM dblink(src_dsn, dsns_sql)
+                       AS t(dsn text, node name)
+        LOOP
+            FOR status_rec IN SELECT * FROM dblink(dsn_rec.dsn, sub_status_sql)
+                              AS t(name text, status text)
+            LOOP
+                IF status_rec.status != 't' THEN
+                    RAISE EXCEPTION '  [FAILED] %', rpad('Node ' || dsn_rec.node || ' has disabled subscription ' || status_rec.name, 60, ' ');
+                ELSIF verb THEN
+                    RAISE NOTICE '  OK: %', rpad('Node with DSN ' || dsn_rec.dsn || ' has enabled subscription ' || status_rec.name, 120, ' ');
+                END IF;
+            END LOOP;
+        END LOOP;
+        RAISE NOTICE '  OK: %', rpad('Checking each Spock node has only active subscriptions', 120, ' ');
+    END;
+
     -- Validating new node prerequisites
     SELECT count(*) INTO new_exists FROM spock.node WHERE node_name = new_node_name;
     IF new_exists > 0 THEN
@@ -1587,7 +1176,7 @@ BEGIN
     END IF;
 
     -- Extract dbname and handle both quoted and unquoted values
-    dbname := TRIM(BOTH '''' FROM substring(rec.dsn from 'dbname=([^\s]+)'));
+    SELECT spock.extract_dbname_from_dsn(rec.dsn) INTO dbname;
 
     -- Remove single quotes if present
     IF dbname IS NOT NULL THEN
@@ -1640,6 +1229,7 @@ DECLARE
     dbname text;
     slot_name text;
     sub_name text;
+    _commit_lsn pg_lsn;
 BEGIN
     RAISE NOTICE 'Phase 3: Creating disabled subscriptions and slots';
 
@@ -1649,7 +1239,8 @@ BEGIN
     -- Create temporary table to store sync LSNs
     CREATE TEMP TABLE IF NOT EXISTS temp_sync_lsns (
         origin_node text PRIMARY KEY,
-        sync_lsn text NOT NULL
+        sync_lsn text NOT NULL,
+        commit_lsn pg_lsn
     );
 
     -- Check if there are any "other" nodes (not source, not new)
@@ -1702,7 +1293,7 @@ BEGIN
             -- Create replication slot on the "other" node
             BEGIN
                 -- Extract dbname and handle both quoted and unquoted values
-                dbname := TRIM(BOTH '''' FROM substring(rec.dsn from 'dbname=([^\s]+)'));
+                SELECT spock.extract_dbname_from_dsn(rec.dsn) INTO dbname;
 
                 -- Remove single quotes if present
                 IF dbname IS NOT NULL THEN
@@ -1714,15 +1305,21 @@ BEGIN
                     dbname := TRIM(BOTH '''' FROM dbname);
                 END IF;
                 IF dbname IS NULL THEN dbname := 'pgedge'; END IF;
 
-                slot_name := left('spk_' || dbname || '_' || rec.node_name || '_sub_' || rec.node_name || '_' || new_node_name, 64);
+
+                slot_name := spock.spock_gen_slot_name(
+                    dbname, rec.node_name,
+                    'sub_' || rec.node_name || '_' || new_node_name);
 
                 remotesql := format('SELECT slot_name, lsn FROM pg_create_logical_replication_slot(%L, ''spock_output'');', slot_name);
                 IF verb THEN
                     RAISE NOTICE '  Remote SQL for slot creation: %', remotesql;
                 END IF;
-                PERFORM * FROM dblink(rec.dsn, remotesql) AS t(slot_name text, lsn pg_lsn);
-                RAISE NOTICE '  OK: %', rpad('Creating replication slot ' || slot_name || ' on node ' || rec.node_name, 120, ' ');
+                SELECT lsn INTO _commit_lsn
+                FROM dblink(rec.dsn, remotesql) AS t(slot_name text, lsn pg_lsn);
+                UPDATE temp_sync_lsns SET commit_lsn = _commit_lsn
+                WHERE origin_node = rec.node_name;
+                RAISE NOTICE '  OK: %', rpad('Creating replication slot ' || slot_name || ' (LSN: ' || _commit_lsn || ')' || ' on node ' || rec.node_name, 120, ' ');
             EXCEPTION WHEN OTHERS THEN
                 RAISE NOTICE '  ✗ %', rpad('Creating replication slot ' || slot_name || ' on node ' || rec.node_name || ' (error: ' || SQLERRM || ')', 120, ' ');
@@ -2107,7 +1704,7 @@ CREATE OR REPLACE PROCEDURE spock.check_commit_timestamp_and_advance_slot(
 ) LANGUAGE plpgsql AS $$
 DECLARE
     rec RECORD;
-    commit_ts timestamp;
+    commit_lsn pg_lsn;
     slot_name text;
     dbname text;
     remotesql text;
@@ -2122,32 +1719,30 @@ BEGIN
     -- Multi-node scenario: check commit timestamp for "other" nodes to new node
     FOR rec IN SELECT * FROM temp_spock_nodes WHERE node_name != src_node_name AND node_name != new_node_name LOOP
-        -- Check commit timestamp for lag from "other" node to new node
         BEGIN
-            remotesql := format('SELECT commit_timestamp FROM spock.lag_tracker WHERE origin_name = %L AND receiver_name = %L',
-                rec.node_name, new_node_name);
-            IF verb THEN
-                RAISE NOTICE '  Remote SQL for commit timestamp check: %', remotesql;
-            END IF;
-
-            SELECT * FROM dblink(new_node_dsn, remotesql) AS t(ts timestamp) INTO commit_ts;
-
-            IF commit_ts IS NOT NULL THEN
-                RAISE NOTICE '  OK: %', rpad('Found commit timestamp for ' || rec.node_name || '->' || new_node_name || ': ' || commit_ts, 120, ' ');
-            ELSE
-                RAISE NOTICE '  - %', rpad('No commit timestamp found for ' || rec.node_name || '->' || new_node_name, 120, ' ');
-                CONTINUE;
+            IF EXISTS (SELECT 1 FROM pg_class WHERE relname = 'temp_sync_lsns' AND relpersistence = 't') THEN
+                -- Get the stored sync LSN from when the subscription was created
+                SELECT tsl.commit_lsn INTO commit_lsn
+                FROM temp_sync_lsns tsl
+                WHERE tsl.origin_node = rec.node_name;
+
+                IF commit_lsn IS NOT NULL THEN
+                    RAISE NOTICE '  OK: %', rpad('Found commit LSN for ' || rec.node_name || ' (LSN: ' || commit_lsn || ')...', 120, ' ');
+                ELSE
+                    RAISE NOTICE '  - %', rpad('No commit LSN found for ' || rec.node_name || '->' || new_node_name, 120, ' ');
+                    CONTINUE;
+                END IF;
             END IF;
         EXCEPTION WHEN OTHERS THEN
-            RAISE NOTICE '  ✗ %', rpad('Checking commit timestamp for ' || rec.node_name || '->' || new_node_name || ' (error: ' || SQLERRM || ')', 120, ' ');
+            RAISE NOTICE '  ✗ %', rpad('Checking commit LSN for ' || rec.node_name || '->' || new_node_name || ' (error: ' || SQLERRM || ')', 120, ' ');
             CONTINUE;
         END;
 
         -- Advance replication slot based on commit timestamp
         BEGIN
             -- Extract dbname and handle both quoted and unquoted values
-            dbname := TRIM(BOTH '''' FROM substring(rec.dsn from 'dbname=([^\s]+)'));
+            SELECT spock.extract_dbname_from_dsn(rec.dsn) INTO dbname;
 
             -- Remove single quotes if present
             IF dbname IS NOT NULL THEN
@@ -2179,14 +1774,7 @@ BEGIN
                 CONTINUE;
             END IF;
 
-            -- Get target LSN from commit timestamp
-            remotesql := format('SELECT spock.get_lsn_from_commit_ts(%L, %L::timestamp)', slot_name, commit_ts);
-            IF verb THEN
-                RAISE NOTICE '  Remote SQL for LSN lookup: %', remotesql;
-            END IF;
-
-            SELECT * FROM dblink(rec.dsn, remotesql) AS t(lsn pg_lsn) INTO target_lsn;
-
+            target_lsn := commit_lsn;
             IF target_lsn IS NULL OR target_lsn <= current_lsn THEN
                 RAISE NOTICE '  - Slot % already at or beyond target LSN (current: %, target: %)', slot_name, current_lsn, target_lsn;
                 CONTINUE;
@@ -2203,43 +1791,13 @@ BEGIN
             END;
         EXCEPTION WHEN OTHERS THEN
-            RAISE NOTICE '  ✗ %', rpad('Advancing slot ' || slot_name || ' to timestamp ' || commit_ts || ' (error: ' || SQLERRM || ')', 120, ' ');
+            RAISE NOTICE '  ✗ %', rpad('Advancing slot ' || slot_name || ' to LSN ' || commit_lsn || ' (error: ' || SQLERRM || ')', 120, ' ');
             -- Continue with other nodes even if this one fails
         END;
     END LOOP;
 END;
 $$;
 
--- ============================================================================
--- Simple procedure to check lag between specific nodes (simplified)
--- ============================================================================
-
-CREATE OR REPLACE PROCEDURE spock.check_node_lag(
-    origin_node text,
-    receiver_node text,
-    verb boolean DEFAULT true,
-    INOUT lag_interval interval DEFAULT NULL
-)
-LANGUAGE plpgsql
-AS $$
-BEGIN
-    -- Check for the specific path
-    SELECT now() - commit_timestamp INTO lag_interval
-    FROM spock.lag_tracker
-    WHERE origin_name = origin_node AND receiver_name = receiver_node;
-
-    IF lag_interval IS NOT NULL THEN
-        IF verb THEN
-            RAISE NOTICE '% → % lag: %', origin_node, receiver_node, lag_interval;
-        END IF;
-    ELSE
-        IF verb THEN
-            RAISE NOTICE '% → % lag: NULL (no data)', origin_node, receiver_node;
-        END IF;
-    END IF;
-END;
-$$;
-
 -- ============================================================================
 -- Procedure to trigger sync on source node and wait for it on new node using sync_event and wait_for_sync_event
 -- ============================================================================
@@ -2282,8 +1840,7 @@ BEGIN
         RAISE NOTICE '  OK: %', rpad('Waiting for sync event from ' || src_node_name || ' on new node ' || new_node_name || '...', 120, ' ');
     EXCEPTION WHEN OTHERS THEN
-        RAISE NOTICE '  ✗ %', rpad('Unable to wait for sync event from ' || src_node_name || ' on new node ' || new_node_name || ' (error: ' || SQLERRM || ')', 120, ' ');
-        RAISE;
+        RAISE EXCEPTION '  ✗ %', rpad('Unable to wait for sync event from ' || src_node_name || ' on new node ' || new_node_name || ' (error: ' || SQLERRM || ')', 120, ' ');
     END;
 END;
 $$;
@@ -2478,3 +2035,260 @@ BEGIN
 END;
 $$;
 
+-- ============================================================================
+-- Procedure: health_check
+-- Purpose  : Validate cluster health before or after ZODAN node addition
+--            Similar to the pg_upgrade -c (check) option
+-- Arguments:
+--     src_node_name - Source node name
+--     src_dsn       - Source node DSN
+--     new_node_name - New node name (optional for cluster-wide check)
+--     new_node_dsn  - New node DSN (optional for cluster-wide check)
+--     check_type    - Check type: 'pre' (before add_node) or 'post' (after add_node)
+--     verb          - Verbose output flag
+-- Usage    :
+--     -- Pre-check before adding a node
+--     CALL spock.health_check('n1', 'host=localhost dbname=pgedge port=5431 user=pgedge password=pgedge',
+--                             'n3', 'host=localhost dbname=pgedge port=5433 user=pgedge password=pgedge',
+--                             'pre', true);
+--
+--     -- Post-check after adding a node
+--     CALL spock.health_check('n1', 'host=localhost dbname=pgedge port=5431 user=pgedge password=pgedge',
+--                             'n3', 'host=localhost dbname=pgedge port=5433 user=pgedge password=pgedge',
+--                             'post', true);
+--
+--     -- Cluster-wide check (no new node)
+--     CALL spock.health_check('n1', 'host=localhost dbname=pgedge port=5431 user=pgedge password=pgedge',
+--                             NULL, NULL, 'pre', true);
+-- ============================================================================
+CREATE OR REPLACE PROCEDURE spock.health_check(
+    src_node_name text,
+    src_dsn text,
+    new_node_name text DEFAULT NULL,
+    new_node_dsn text DEFAULT NULL,
+    check_type text DEFAULT 'pre',
+    verb boolean DEFAULT false
+) LANGUAGE plpgsql AS $$
+DECLARE
+    checks_passed integer := 0;
+    checks_failed integer := 0;
+    check_result text;
+    node_rec RECORD;
+    remotesql text;
+    result_value text;
+    result_count integer;
+    src_version text;
+    new_version text;
+    sub_count text;
+    user_table_count text;
+BEGIN
+    RAISE NOTICE '';
+    RAISE NOTICE '================================================================================';
RAISE NOTICE 'ZODAN CLUSTER HEALTH CHECK (%-CHECK)', upper(check_type); + RAISE NOTICE '================================================================================'; + + -- ======================================================================== + -- Check 1: Spock version compatibility (if new node provided) + -- ======================================================================== + IF new_node_dsn IS NOT NULL THEN + BEGIN + -- Call existing check_spock_version_compatibility procedure + CALL spock.check_spock_version_compatibility(src_dsn, new_node_dsn, false); + RAISE NOTICE 'PASS: Spock version compatibility check'; + checks_passed := checks_passed + 1; + EXCEPTION WHEN OTHERS THEN + RAISE NOTICE 'FAIL: Spock version compatibility - %', SQLERRM; + checks_failed := checks_failed + 1; + END; + END IF; + + -- ======================================================================== + -- Check 2: Node connectivity - Source node + -- ======================================================================== + BEGIN + remotesql := 'SELECT 1'; + SELECT * FROM dblink(src_dsn, remotesql) AS t(result integer) INTO result_value; + RAISE NOTICE 'PASS: Source node % connectivity', src_node_name; + checks_passed := checks_passed + 1; + EXCEPTION WHEN OTHERS THEN + RAISE NOTICE 'FAIL: Source node % connectivity - %', src_node_name, SQLERRM; + checks_failed := checks_failed + 1; + END; + + -- ======================================================================== + -- Check 3: Node connectivity - New node (if provided) + -- ======================================================================== + IF new_node_dsn IS NOT NULL THEN + BEGIN + remotesql := 'SELECT 1'; + SELECT * FROM dblink(new_node_dsn, remotesql) AS t(result integer) INTO result_value; + RAISE NOTICE 'PASS: New node % connectivity', new_node_name; + checks_passed := checks_passed + 1; + EXCEPTION WHEN OTHERS THEN + RAISE NOTICE 'FAIL: New node % connectivity - %', new_node_name, SQLERRM; + checks_failed := checks_failed + 1; + END; + END IF; + + -- ======================================================================== + -- Check 4: Spock extension installation - Source node + -- ======================================================================== + BEGIN + remotesql := 'SELECT extversion FROM pg_extension WHERE extname = ''spock'''; + SELECT * FROM dblink(src_dsn, remotesql) AS t(version text) INTO src_version; + IF src_version IS NOT NULL THEN + RAISE NOTICE 'PASS: Spock extension on source node (version %)', src_version; + checks_passed := checks_passed + 1; + ELSE + RAISE NOTICE 'FAIL: Spock extension not installed on source node'; + checks_failed := checks_failed + 1; + END IF; + EXCEPTION WHEN OTHERS THEN + RAISE NOTICE 'FAIL: Spock extension check on source - %', SQLERRM; + checks_failed := checks_failed + 1; + END; + + -- ======================================================================== + -- Check 5: Spock extension installation - New node (if provided) + -- ======================================================================== + IF new_node_dsn IS NOT NULL THEN + BEGIN + remotesql := 'SELECT extversion FROM pg_extension WHERE extname = ''spock'''; + SELECT * FROM dblink(new_node_dsn, remotesql) AS t(version text) INTO new_version; + IF new_version IS NOT NULL THEN + RAISE NOTICE 'PASS: Spock extension on new node (version %)', new_version; + checks_passed := checks_passed + 1; + ELSE + RAISE NOTICE 'FAIL: Spock extension not installed on new node'; + checks_failed := checks_failed + 1; + END IF; + EXCEPTION 
WHEN OTHERS THEN + RAISE NOTICE 'FAIL: Spock extension check on new node - %', SQLERRM; + checks_failed := checks_failed + 1; + END; + END IF; + + -- ======================================================================== + -- Check 6: Cluster node enumeration + -- ======================================================================== + BEGIN + remotesql := 'SELECT count(*) FROM spock.node'; + SELECT * FROM dblink(src_dsn, remotesql) AS t(node_count integer) INTO result_count; + RAISE NOTICE 'PASS: Cluster has % nodes', result_count; + checks_passed := checks_passed + 1; + EXCEPTION WHEN OTHERS THEN + RAISE NOTICE 'FAIL: Cluster node enumeration - %', SQLERRM; + checks_failed := checks_failed + 1; + END; + + -- ======================================================================== + -- Check 7: Active subscriptions on each node + -- ======================================================================== + BEGIN + FOR node_rec IN + SELECT node_name, if_dsn + FROM dblink(src_dsn, + 'SELECT n.node_name, i.if_dsn FROM spock.node n JOIN spock.node_interface i ON n.node_id = i.if_nodeid' + ) AS t(node_name text, if_dsn text) + LOOP + BEGIN + remotesql := 'SELECT count(*) FROM spock.subscription WHERE sub_enabled = true'; + SELECT * FROM dblink(node_rec.if_dsn, remotesql) AS t(sub_count text) INTO sub_count; + RAISE NOTICE 'PASS: Node % has % active subscriptions', node_rec.node_name, COALESCE(sub_count, '0'); + checks_passed := checks_passed + 1; + EXCEPTION WHEN OTHERS THEN + RAISE NOTICE 'FAIL: Node % subscription check - %', node_rec.node_name, SQLERRM; + checks_failed := checks_failed + 1; + END; + END LOOP; + EXCEPTION WHEN OTHERS THEN + RAISE NOTICE 'FAIL: Cluster subscription check - %', SQLERRM; + checks_failed := checks_failed + 1; + END; + + -- ======================================================================== + -- Check 8: Database prerequisites (pre-check only) + -- ======================================================================== + IF check_type = 'pre' AND new_node_dsn IS NOT NULL THEN + -- Check 8a: Verify no lolor tables exist (lolor not installed, no leftover data) + BEGIN + remotesql := 'SELECT count(*) FROM pg_tables WHERE schemaname = ''lolor'''; + SELECT * FROM dblink(new_node_dsn, remotesql) AS t(table_count text) INTO user_table_count; + + IF user_table_count IS NOT NULL AND user_table_count::integer = 0 THEN + RAISE NOTICE 'PASS: Destination database shows no trace of a lolor installation'; + checks_passed := checks_passed + 1; + ELSE + RAISE NOTICE 'FAIL: Destination database has the lolor extension installed or leftover lolor data in the lolor schema'; + checks_failed := checks_failed + 1; + END IF; + EXCEPTION WHEN OTHERS THEN + RAISE NOTICE 'FAIL: lolor extension check - %', SQLERRM; + checks_failed := checks_failed + 1; + END; + + -- Check 8b: Verify database is empty (no user tables) + BEGIN + remotesql := $pg_tables$ + SELECT count(*) FROM pg_tables + WHERE schemaname NOT IN ('information_schema', 'pg_catalog', 'pg_toast', 'spock') + AND schemaname NOT LIKE 'pg_temp_%' + AND schemaname NOT LIKE 'pg_toast_temp_%' + $pg_tables$; + SELECT * FROM dblink(new_node_dsn, remotesql) AS t(table_count text) INTO user_table_count; + + IF user_table_count IS NOT NULL AND user_table_count::integer = 0 THEN + RAISE NOTICE 'PASS: New node database is empty (fresh database)'; + checks_passed := checks_passed + 1; + ELSE + RAISE NOTICE 'FAIL: New node database has % user-created tables', user_table_count; + checks_failed := checks_failed + 1; + END IF; + EXCEPTION WHEN OTHERS THEN
+ RAISE NOTICE 'FAIL: Database emptiness check - %', SQLERRM; + checks_failed := checks_failed + 1; + END; + END IF; + + -- ======================================================================== + -- Check 9: Replication status (post-check only) + -- ======================================================================== + IF check_type = 'post' AND new_node_name IS NOT NULL AND new_node_dsn IS NOT NULL THEN + BEGIN + remotesql := 'SELECT count(*) FROM spock.subscription WHERE sub_enabled = true'; + SELECT * FROM dblink(new_node_dsn, remotesql) AS t(sub_count text) INTO sub_count; + + IF sub_count IS NOT NULL AND sub_count::integer > 0 THEN + RAISE NOTICE 'PASS: New node % has active subscriptions', new_node_name; + checks_passed := checks_passed + 1; + ELSE + RAISE NOTICE 'FAIL: New node % has no active subscriptions', new_node_name; + checks_failed := checks_failed + 1; + END IF; + EXCEPTION WHEN OTHERS THEN + RAISE NOTICE 'FAIL: Post-addition replication check - %', SQLERRM; + checks_failed := checks_failed + 1; + END; + END IF; + + -- ======================================================================== + -- Summary + -- ======================================================================== + RAISE NOTICE ''; + RAISE NOTICE '================================================================================'; + RAISE NOTICE 'HEALTH CHECK SUMMARY'; + RAISE NOTICE '================================================================================'; + RAISE NOTICE 'Checks Passed: %', checks_passed; + RAISE NOTICE 'Checks Failed: %', checks_failed; + RAISE NOTICE 'Total Checks: %', checks_passed + checks_failed; + RAISE NOTICE ''; + + IF checks_failed > 0 THEN + RAISE NOTICE 'RESULT: FAILED - Please resolve issues before proceeding'; + RAISE EXCEPTION 'Health check failed with % failed checks', checks_failed; + ELSE + RAISE NOTICE 'RESULT: PASSED - Cluster is ready for ZODAN node addition'; + END IF; +END; +$$; + diff --git a/samples/Z0DAN/zodan_cleanup.sql b/samples/Z0DAN/zodan_cleanup.sql index c92416b1..bd20d072 100644 --- a/samples/Z0DAN/zodan_cleanup.sql +++ b/samples/Z0DAN/zodan_cleanup.sql @@ -1,7 +1,7 @@ -- ============================================================================ -- ZODAN Cleanup Script -- Purpose: Drops everything created by zodan.sql --- +-- -- Note: Updated to match the corrected zodan.sql file with proper phase numbering -- and all duplicate definitions removed. 
-- ============================================================================ @@ -12,14 +12,8 @@ DROP PROCEDURE IF EXISTS spock.create_sub(text, text, text, text, boolean, boole DROP PROCEDURE IF EXISTS spock.create_replication_slot(text, text, boolean, text); DROP PROCEDURE IF EXISTS spock.sync_event(text, boolean, pg_lsn); DROP PROCEDURE IF EXISTS spock.create_node(text, text, boolean, text, text, jsonb); -DROP PROCEDURE IF EXISTS spock.get_commit_timestamp(text, text, text, boolean, timestamp); -DROP PROCEDURE IF EXISTS spock.advance_replication_slot(text, text, timestamp, boolean); DROP PROCEDURE IF EXISTS spock.enable_sub(text, text, boolean, boolean); -DROP PROCEDURE IF EXISTS spock.monitor_replication_lag(text, boolean); DROP PROCEDURE IF EXISTS spock.monitor_replication_lag(text, text, text, boolean); -DROP PROCEDURE IF EXISTS spock.monitor_replication_lag_wait(text, text, integer, integer, boolean); -DROP PROCEDURE IF EXISTS spock.monitor_multiple_replication_lags(jsonb, integer, integer, boolean); -DROP PROCEDURE IF EXISTS spock.wait_for_n3_sync(integer, integer, boolean); DROP PROCEDURE IF EXISTS spock.monitor_lag_with_dblink(text, text, text, boolean); DROP PROCEDURE IF EXISTS spock.verify_node_prerequisites(text, text, text, text, boolean); DROP PROCEDURE IF EXISTS spock.create_nodes_only(text, text, text, text, boolean, text, text, jsonb, integer); @@ -31,7 +25,6 @@ DROP PROCEDURE IF EXISTS spock.create_new_to_source_subscription(text, text, tex DROP PROCEDURE IF EXISTS spock.create_source_to_new_subscription(text, text, text, text, boolean); DROP PROCEDURE IF EXISTS spock.trigger_sync_on_other_nodes_and_wait_on_source(text, text, text, text, boolean); DROP PROCEDURE IF EXISTS spock.check_commit_timestamp_and_advance_slot(text, text, text, text, boolean); -DROP PROCEDURE IF EXISTS spock.check_node_lag(text, text, boolean, interval); DROP PROCEDURE IF EXISTS spock.present_final_cluster_state(integer, boolean); DROP PROCEDURE IF EXISTS spock.add_node(text, text, text, text, boolean, text, text, jsonb); @@ -43,10 +36,10 @@ DO $$ BEGIN -- Drop any remaining temporary tables that might have been created DROP TABLE IF EXISTS temp_spock_nodes CASCADE; - + -- Clean up any temporary schemas or objects -- (This is a safety measure in case any temporary objects were created) - + RAISE NOTICE 'ZODAN cleanup completed successfully'; EXCEPTION WHEN OTHERS THEN diff --git a/sql/spock--5.0.4--5.0.5.sql b/sql/spock--5.0.4--5.0.5.sql new file mode 100644 index 00000000..3ec3dc94 --- /dev/null +++ b/sql/spock--5.0.4--5.0.5.sql @@ -0,0 +1,98 @@ +/* spock--5.0.4--5.0.5.sql */ + +-- complain if script is sourced in psql, rather than via ALTER EXTENSION +\echo Use "ALTER EXTENSION spock UPDATE TO '5.0.5'" to load this file. \quit + +DROP PROCEDURE IF EXISTS spock.wait_for_sync_event(oid, pg_lsn, int); + +CREATE PROCEDURE spock.wait_for_sync_event(OUT result bool, origin_id oid, lsn pg_lsn, timeout int DEFAULT 0) +AS $$ +DECLARE + target_id oid; + elapsed_time numeric := 0; + progress_lsn pg_lsn; +BEGIN + IF origin_id IS NULL THEN + RAISE EXCEPTION 'Origin node ''%'' not found', origin_id; + END IF; + target_id := node_id FROM spock.node_info(); + + WHILE true LOOP + -- If an unresolvable issue occurs with the apply worker, the LR + -- progress gets stuck, and we need to check the subscription's state + -- carefully.
+ IF NOT EXISTS (SELECT * FROM spock.subscription + WHERE sub_origin = origin_id AND + sub_target = target_id AND + sub_enabled = true) THEN + RAISE EXCEPTION 'Replication % => % does not have any enabled subscription yet', + origin_id, target_id; + END IF; + + SELECT INTO progress_lsn remote_lsn + FROM spock.progress + WHERE node_id = target_id AND remote_node_id = origin_id; + IF progress_lsn >= lsn THEN + result = true; + RETURN; + END IF; + elapsed_time := elapsed_time + .2; + IF timeout <> 0 AND elapsed_time >= timeout THEN + result := false; + RETURN; + END IF; + + ROLLBACK; + PERFORM pg_sleep(0.2); + END LOOP; +END; +$$ LANGUAGE plpgsql; + +DROP PROCEDURE IF EXISTS spock.wait_for_sync_event(name, pg_lsn, int); + +CREATE PROCEDURE spock.wait_for_sync_event(OUT result bool, origin name, lsn pg_lsn, timeout int DEFAULT 0) +AS $$ +DECLARE + origin_id oid; + target_id oid; + elapsed_time numeric := 0; + progress_lsn pg_lsn; +BEGIN + origin_id := node_id FROM spock.node WHERE node_name = origin; + IF origin_id IS NULL THEN + RAISE EXCEPTION 'Origin node ''%'' not found', origin; + END IF; + target_id := node_id FROM spock.node_info(); + + WHILE true LOOP + -- If an unresolvable issue occurs with the apply worker, the LR + -- progress gets stuck, and we need to check the subscription's state + -- carefully. + IF NOT EXISTS (SELECT * FROM spock.subscription + WHERE sub_origin = origin_id AND + sub_target = target_id AND + sub_enabled = true) THEN + RAISE EXCEPTION 'Replication % => % does not have any enabled subscription yet', + origin_id, target_id; + END IF; + + SELECT INTO progress_lsn remote_lsn + FROM spock.progress + WHERE node_id = target_id AND remote_node_id = origin_id; + IF progress_lsn >= lsn THEN + result = true; + RETURN; + END IF; + elapsed_time := elapsed_time + .2; + IF timeout <> 0 AND elapsed_time >= timeout THEN + result := false; + RETURN; + END IF; + + ROLLBACK; + PERFORM pg_sleep(0.2); + END LOOP; +END; +$$ LANGUAGE plpgsql; + diff --git a/src/spock_conflict.c b/src/spock_conflict.c index 848333d0..3bd71f4b 100644 --- a/src/spock_conflict.c +++ b/src/spock_conflict.c @@ -685,6 +685,7 @@ spock_report_conflict(SpockConflictType conflict_type, Oid conflict_idx_oid) { char local_tup_ts_str[MAXDATELEN] = "(unset)"; + char local_origin_str[32]; StringInfoData localtup, remotetup; TupleDesc desc = RelationGetDescr(rel->rel); @@ -720,10 +721,16 @@ spock_report_conflict(SpockConflictType conflict_type, conflict_idx_oid); memset(local_tup_ts_str, 0, MAXDATELEN); + strlcpy(local_origin_str, "unknown", sizeof(local_origin_str)); if (found_local_origin) + { strlcpy(local_tup_ts_str, timestamptz_to_str(local_tuple_commit_ts), MAXDATELEN); + if (local_tuple_origin != InvalidRepOriginId) + snprintf(local_origin_str, sizeof(local_origin_str), "%u", + (unsigned int) local_tuple_origin); + } initStringInfo(&remotetup); tuple_to_stringinfo(&remotetup, desc, remotetuple); @@ -762,9 +769,9 @@ spock_report_conflict(SpockConflictType conflict_type, conflict_type == CONFLICT_INSERT_EXISTS ? "INSERT EXISTS" : "UPDATE", qualrelname, idxname, conflict_resolution_to_string(resolution)), - errdetail("existing local tuple {%s} xid=%u,origin=%d,timestamp=%s; remote tuple {%s} in xact origin=%u,timestamp=%s,commit_lsn=%X/%X", + errdetail("existing local tuple {%s} xid=%u,origin=%s,timestamp=%s; remote tuple {%s} in xact origin=%u,timestamp=%s,commit_lsn=%X/%X", localtup.data, local_tuple_xid, - found_local_origin ? 
(int) local_tuple_origin : -1, + local_origin_str, local_tup_ts_str, remotetup.data, replorigin_session_origin, @@ -878,7 +885,10 @@ spock_conflict_log_table(SpockConflictType conflict_type, /* conflict_resolution */ values[6] = CStringGetTextDatum(conflict_resolution_to_string(resolution)); /* local_origin */ - values[7] = Int32GetDatum(found_local_origin ? (int) local_tuple_origin : -1); + if (found_local_origin && local_tuple_origin != InvalidRepOriginId) + values[7] = Int32GetDatum((int) local_tuple_origin); + else + nulls[7] = true; /* local_tuple */ if (localtuple != NULL) @@ -886,7 +896,7 @@ spock_conflict_log_table(SpockConflictType conflict_type, Datum datum; datum = heap_copy_tuple_as_datum(localtuple, desc); - values[8] = spock_conflict_row_to_json(datum, false, &nulls[7]); + values[8] = spock_conflict_row_to_json(datum, false, &nulls[8]); } else nulls[8] = true; @@ -913,7 +923,7 @@ spock_conflict_log_table(SpockConflictType conflict_type, Datum datum; datum = heap_copy_tuple_as_datum(remotetuple, desc); - values[12] = spock_conflict_row_to_json(datum, false, &nulls[11]); + values[12] = spock_conflict_row_to_json(datum, false, &nulls[12]); } else nulls[12] = true; diff --git a/src/spock_sync.c b/src/spock_sync.c index 60c62222..cb3f2737 100644 --- a/src/spock_sync.c +++ b/src/spock_sync.c @@ -395,14 +395,18 @@ adjust_progress_info(PGconn *origin_conn, PGconn *target_conn) " last_updated_ts, updated_by_decode " "FROM spock.progress " "WHERE node_id = %u AND remote_node_id <> %u"; - const char *updateQuery = - "UPDATE spock.progress SET " - " remote_commit_ts = %s, " - " remote_lsn = %s, " - " remote_insert_lsn = %s, " - " last_updated_ts = %s, " - " updated_by_decode = %s " - "WHERE node_id = '%d' AND remote_node_id = '%d'"; + const char *upsertQuery = + "INSERT INTO spock.progress " + " (node_id, remote_node_id, remote_commit_ts, " + " remote_lsn, remote_insert_lsn, " + " last_updated_ts, updated_by_decode) " + "VALUES (%u, %s, %s, %s, %s, %s, %s) " + "ON CONFLICT (node_id, remote_node_id) DO UPDATE SET " + " remote_commit_ts = EXCLUDED.remote_commit_ts, " + " remote_lsn = EXCLUDED.remote_lsn, " + " remote_insert_lsn = EXCLUDED.remote_insert_lsn, " + " last_updated_ts = EXCLUDED.last_updated_ts, " + " updated_by_decode = EXCLUDED.updated_by_decode"; StringInfoData query; PGresult *originRes; @@ -428,12 +432,8 @@ adjust_progress_info(PGconn *origin_conn, PGconn *target_conn) for (rno = 0; rno < PQntuples(originRes); rno++) { /* - * Update the remote node's progress entry to what our + * Upsert the remote node's progress entry to what our * sync provider has included in the COPY snapshot. - * - * We assume here that the progress table entry already - * exists. Turning this into an INSERT if not should be - * easy. 
*/ char *remote_node_id = PQgetvalue(originRes, rno, 1); char *remote_commit_ts = PQgetvalue(originRes, rno, 2); @@ -443,7 +443,9 @@ adjust_progress_info(PGconn *origin_conn, PGconn *target_conn) char *updated_by_decode = PQgetvalue(originRes, rno, 6); resetStringInfo(&query); - appendStringInfo(&query, updateQuery, + appendStringInfo(&query, upsertQuery, + MySubscription->target->id, + remote_node_id, PQescapeLiteral(target_conn, remote_commit_ts, strlen(remote_commit_ts)), PQescapeLiteral(target_conn, remote_lsn, @@ -453,9 +455,7 @@ adjust_progress_info(PGconn *origin_conn, PGconn *target_conn) PQescapeLiteral(target_conn, last_updated_ts, strlen(last_updated_ts)), PQescapeLiteral(target_conn, updated_by_decode, - strlen(updated_by_decode)), - MySubscription->target->id, - MySubscription->origin->id); + strnlen(updated_by_decode, 64))); updateRes = PQexec(target_conn, query.data); if (PQresultStatus(updateRes) != PGRES_COMMAND_OK) diff --git a/src/spock_worker.c b/src/spock_worker.c index 2673d88b..75ba215f 100644 --- a/src/spock_worker.c +++ b/src/spock_worker.c @@ -343,8 +343,6 @@ spock_worker_attach(int slot, SpockWorkerType type) /* Now safe to process signals */ BackgroundWorkerUnblockSignals(); - MyProcPort = (Port *) calloc(1, sizeof(Port)); - LWLockAcquire(SpockCtx->lock, LW_EXCLUSIVE); before_shmem_exit(spock_worker_on_exit, (Datum) 0); @@ -386,6 +384,12 @@ spock_worker_attach(int slot, SpockWorkerType type) ,0 /* flags */ ); + /* + * Allocate MyProcPort after BackgroundWorkerInitializeConnectionByOid + * so that InitPostgres doesn't see a non-NULL MyProcPort and try to + * inspect SSL/GSS state on our fake Port (which would segfault). + */ + MyProcPort = (Port *) calloc(1, sizeof(Port)); StartTransactionCommand(); oldcontext = MemoryContextSwitchTo(TopMemoryContext); diff --git a/tests/regress/expected/init.out b/tests/regress/expected/init.out index c4720560..4d4757e8 100644 --- a/tests/regress/expected/init.out +++ b/tests/regress/expected/init.out @@ -49,7 +49,7 @@ CREATE EXTENSION IF NOT EXISTS spock; List of installed extensions Name | Version | Schema | Description -------+---------+--------+-------------------------------- - spock | 5.0.4 | spock | PostgreSQL Logical Replication + spock | 5.0.5 | spock | PostgreSQL Logical Replication (1 row) SELECT * FROM spock.node_create(node_name := 'test_provider', dsn := (SELECT provider_dsn FROM spock_regress_variables()) || ' user=super'); diff --git a/tests/regress/expected/init_1.out b/tests/regress/expected/init_1.out index 98f0873c..b419010c 100644 --- a/tests/regress/expected/init_1.out +++ b/tests/regress/expected/init_1.out @@ -49,7 +49,7 @@ CREATE EXTENSION IF NOT EXISTS spock; List of installed extensions Name | Version | Default version | Schema | Description -------+---------+-----------------+--------+-------------------------------- - spock | 5.0.4 | 5.0.4 | spock | PostgreSQL Logical Replication + spock | 5.0.5 | 5.0.5 | spock | PostgreSQL Logical Replication (1 row) SELECT * FROM spock.node_create(node_name := 'test_provider', dsn := (SELECT provider_dsn FROM spock_regress_variables()) || ' user=super'); diff --git a/tests/regress/expected/progress_tracking.out b/tests/regress/expected/progress_tracking.out new file mode 100644 index 00000000..71bbeffe --- /dev/null +++ b/tests/regress/expected/progress_tracking.out @@ -0,0 +1,351 @@ +-- +-- Test that spock.progress is properly maintained during sync, +-- specifically that adjust_progress_info() correctly upserts +-- forwarding entries in a 3-node cascade scenario. 
+-- +-- Topology: orig_provider (sourcedb) -> provider (regression) -> subscriber (postgres) +-- +-- The key behavior under test: when subscriber resyncs a table with provider, +-- adjust_progress_info() copies progress entries from provider's progress +-- table into subscriber's progress table for all remote_node_ids other than +-- subscriber itself. With the old UPDATE-only code, this would fail when +-- the subscriber had no pre-existing entry for orig_provider. The new +-- UPSERT code correctly INSERTs the forwarding entry. +-- +SELECT * FROM spock_regress_variables() +\gset +-- ============================================================ +-- Record subscriber's initial progress state (before cascade) +-- ============================================================ +\c :subscriber_dsn +SELECT count(*) AS initial_progress_count +FROM spock.progress +WHERE node_id = (SELECT node_id FROM spock.local_node); + initial_progress_count +------------------------ + 1 +(1 row) + +-- ============================================================ +-- Set up the orig_provider node (top of the cascade) +-- ============================================================ +\c :orig_provider_dsn +SET client_min_messages = 'warning'; +GRANT ALL ON SCHEMA public TO nonsuper; +DO $$ +BEGIN + IF (SELECT setting::integer/100 FROM pg_settings WHERE name = 'server_version_num') = 904 THEN + CREATE EXTENSION IF NOT EXISTS spock_origin; + END IF; +END;$$; +DO $$ +BEGIN + CREATE EXTENSION IF NOT EXISTS spock; +END; +$$; +ALTER EXTENSION spock UPDATE; +-- Suppress the OID output by wrapping in a DO block +DO $$ +BEGIN + PERFORM spock.node_create(node_name := 'test_orig_provider', + dsn := (SELECT orig_provider_dsn FROM spock_regress_variables()) || ' user=super'); +END; +$$; +-- ============================================================ +-- Subscribe provider to orig_provider +-- ============================================================ +\c :provider_dsn +SET client_min_messages = 'warning'; +BEGIN; +-- Suppress OID output +DO $$ +BEGIN + PERFORM spock.sub_create( + subscription_name := 'test_orig_subscription', + provider_dsn := (SELECT orig_provider_dsn FROM spock_regress_variables()) || ' user=super', + synchronize_structure := false, + synchronize_data := true, + forward_origins := '{}'); +END; +$$; +COMMIT; +BEGIN; +SET LOCAL statement_timeout = '10s'; +SELECT spock.sub_wait_for_sync('test_orig_subscription'); + sub_wait_for_sync +------------------- + +(1 row) + +COMMIT; +SELECT subscription_name, status, provider_node FROM spock.sub_show_status(); + subscription_name | status | provider_node +------------------------+-------------+-------------------- + test_orig_subscription | replicating | test_orig_provider +(1 row) + +-- ============================================================ +-- Create a table on orig_provider and replicate data to provider. +-- This ensures the provider has a non-trivial progress entry for +-- orig_provider (with real LSN and timestamp values). +-- Note: subscriber won't get this data due to forward_origins='{}', +-- but that's fine -- we just need the progress entry on the provider. 
+-- ============================================================ +\c :orig_provider_dsn +SELECT spock.replicate_ddl($$ + CREATE TABLE public.orig_data_tbl ( + id serial primary key, + data text + ); +$$); + replicate_ddl +--------------- + t +(1 row) + +SELECT * FROM spock.repset_add_table('default', 'orig_data_tbl'); + repset_add_table +------------------ + t +(1 row) + +INSERT INTO orig_data_tbl(data) VALUES ('from_orig_1'), ('from_orig_2'), ('from_orig_3'); +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + wait_slot_confirm_lsn +----------------------- + +(1 row) + +-- Verify provider received the data from orig_provider +\c :provider_dsn +SELECT id, data FROM orig_data_tbl ORDER BY id; + id | data +----+------------- + 1 | from_orig_1 + 2 | from_orig_2 + 3 | from_orig_3 +(3 rows) + +-- Verify provider now has a progress entry for orig_provider +-- with a real (non-zero) LSN and a valid timestamp +SELECT + (p.remote_lsn <> '0/0') AS has_nonzero_lsn, + (p.remote_commit_ts > 'epoch'::timestamptz) AS has_valid_timestamp +FROM spock.progress p +JOIN spock.node_interface n ON n.if_nodeid = p.remote_node_id +WHERE p.node_id = (SELECT node_id FROM spock.local_node) + AND n.if_name = 'test_orig_provider'; + has_nonzero_lsn | has_valid_timestamp +-----------------+--------------------- + t | t +(1 row) + +-- ============================================================ +-- Create a table on the PROVIDER and add it to the default repset. +-- This table will replicate to the subscriber, giving us something +-- to resync that triggers adjust_progress_info(). +-- ============================================================ +SELECT spock.replicate_ddl($$ + CREATE TABLE public.resync_target_tbl ( + id serial primary key, + data text + ); +$$); + replicate_ddl +--------------- + t +(1 row) + +SELECT * FROM spock.repset_add_table('default', 'resync_target_tbl'); + repset_add_table +------------------ + t +(1 row) + +INSERT INTO resync_target_tbl(data) VALUES ('prov1'), ('prov2'), ('prov3'); +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + wait_slot_confirm_lsn +----------------------- + +(1 row) + +-- Verify subscriber received the data +\c :subscriber_dsn +DO $$ +BEGIN + FOR i IN 1..100 LOOP + IF EXISTS ( + SELECT 1 FROM spock.local_sync_status + WHERE sync_relname = 'resync_target_tbl' + AND sync_status IN ('y', 'r') + ) THEN + RETURN; + END IF; + PERFORM pg_sleep(0.1); + END LOOP; +END; +$$; +SELECT id, data FROM resync_target_tbl ORDER BY id; + id | data +----+------- + 1 | prov1 + 2 | prov2 + 3 | prov3 +(3 rows) + +-- ============================================================ +-- Check subscriber's progress BEFORE table resync. +-- The initial sync of resync_target_tbl may have already +-- triggered adjust_progress_info. Check that we have at least +-- the basic entry for the direct subscription. +-- ============================================================ +SELECT count(*) >= 1 AS has_progress_entries +FROM spock.progress +WHERE node_id = (SELECT node_id FROM spock.local_node); + has_progress_entries +---------------------- + t +(1 row) + +-- ============================================================ +-- Now trigger a table resync on subscriber. This calls +-- adjust_progress_info() which should UPSERT the forwarding +-- entry for orig_provider into subscriber's progress table. 
+-- ============================================================ +SELECT * FROM spock.sub_resync_table('test_subscription', 'resync_target_tbl'); + sub_resync_table +------------------ + t +(1 row) + +-- Wait for resync to complete +DO $$ +BEGIN + FOR i IN 1..200 LOOP + IF EXISTS ( + SELECT 1 FROM spock.local_sync_status + WHERE sync_relname = 'resync_target_tbl' + AND sync_status IN ('y', 'r') + ) THEN + RETURN; + END IF; + PERFORM pg_sleep(0.1); + END LOOP; +END; +$$; +-- Verify the data is intact after resync +SELECT id, data FROM resync_target_tbl ORDER BY id; + id | data +----+------- + 1 | prov1 + 2 | prov2 + 3 | prov3 +(3 rows) + +-- ============================================================ +-- KEY ASSERTIONS: verify subscriber's progress table after resync. +-- +-- After adjust_progress_info() runs during resync, subscriber +-- should have a forwarding entry for orig_provider in addition +-- to its direct subscription entry for provider. +-- ============================================================ +-- The subscriber should have at least 2 progress entries: +-- 1. (subscriber_id, provider_id) - direct subscription +-- 2. (subscriber_id, orig_provider_id) - forwarding entry from resync +SELECT count(*) >= 2 AS has_forwarding_entries +FROM spock.progress +WHERE node_id = (SELECT node_id FROM spock.local_node); + has_forwarding_entries +------------------------ + t +(1 row) + +-- All progress entries should have node_id = local node id +SELECT count(*) = 0 AS all_entries_local +FROM spock.progress +WHERE node_id <> (SELECT node_id FROM spock.local_node); + all_entries_local +------------------- + t +(1 row) + +-- Verify we have at least one entry where remote_node_id is NOT the +-- direct subscription origin (provider). This is the forwarding entry. 
+SELECT count(*) >= 1 AS has_non_provider_entry +FROM spock.progress p +WHERE p.node_id = (SELECT node_id FROM spock.local_node) + AND p.remote_node_id NOT IN ( + SELECT sub_origin FROM spock.subscription + ); + has_non_provider_entry +------------------------ + t +(1 row) + +-- ============================================================ +-- Cleanup +-- ============================================================ +\c :provider_dsn +DO $$ +BEGIN + FOR i IN 1..100 LOOP + IF EXISTS (SELECT 1 FROM pg_stat_replication) THEN + RETURN; + END IF; + PERFORM pg_sleep(0.1); + END LOOP; +END; +$$; +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + wait_slot_confirm_lsn +----------------------- + +(1 row) + +\set VERBOSITY terse +SELECT spock.replicate_ddl($$ + DROP TABLE public.resync_target_tbl CASCADE; +$$); +NOTICE: drop cascades to table resync_target_tbl membership in replication set default + replicate_ddl +--------------- + t +(1 row) + +\c :orig_provider_dsn +\set VERBOSITY terse +SELECT spock.replicate_ddl($$ + DROP TABLE public.orig_data_tbl CASCADE; +$$); +NOTICE: drop cascades to table orig_data_tbl membership in replication set default + replicate_ddl +--------------- + t +(1 row) + +\c :provider_dsn +SELECT * FROM spock.sub_drop('test_orig_subscription'); + sub_drop +---------- + 1 +(1 row) + +\c :orig_provider_dsn +SELECT * FROM spock.node_drop(node_name := 'test_orig_provider'); + node_drop +----------- + t +(1 row) + +SELECT plugin, slot_type, active FROM pg_replication_slots; + plugin | slot_type | active +--------------+-----------+-------- + spock_output | logical | t +(1 row) + +SELECT count(*) FROM pg_stat_replication; + count +------- + 1 +(1 row) + diff --git a/tests/regress/sql/progress_tracking.sql b/tests/regress/sql/progress_tracking.sql new file mode 100644 index 00000000..76d12a59 --- /dev/null +++ b/tests/regress/sql/progress_tracking.sql @@ -0,0 +1,264 @@ +-- +-- Test that spock.progress is properly maintained during sync, +-- specifically that adjust_progress_info() correctly upserts +-- forwarding entries in a 3-node cascade scenario. +-- +-- Topology: orig_provider (sourcedb) -> provider (regression) -> subscriber (postgres) +-- +-- The key behavior under test: when subscriber resyncs a table with provider, +-- adjust_progress_info() copies progress entries from provider's progress +-- table into subscriber's progress table for all remote_node_ids other than +-- subscriber itself. With the old UPDATE-only code, this would fail when +-- the subscriber had no pre-existing entry for orig_provider. The new +-- UPSERT code correctly INSERTs the forwarding entry. 
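+--
+-- For reference, the statement adjust_progress_info() now issues has roughly
+-- this shape (mirroring the upsertQuery in src/spock_sync.c; the VALUES are
+-- placeholders, not literal test input):
+--
+--   INSERT INTO spock.progress
+--     (node_id, remote_node_id, remote_commit_ts, remote_lsn,
+--      remote_insert_lsn, last_updated_ts, updated_by_decode)
+--   VALUES (<subscriber_id>, <origin_id>, <ts>, <lsn>, <lsn>, <ts>, <bool>)
+--   ON CONFLICT (node_id, remote_node_id) DO UPDATE SET
+--     remote_commit_ts  = EXCLUDED.remote_commit_ts,
+--     remote_lsn        = EXCLUDED.remote_lsn,
+--     remote_insert_lsn = EXCLUDED.remote_insert_lsn,
+--     last_updated_ts   = EXCLUDED.last_updated_ts,
+--     updated_by_decode = EXCLUDED.updated_by_decode;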
+-- + +SELECT * FROM spock_regress_variables() +\gset + +-- ============================================================ +-- Record subscriber's initial progress state (before cascade) +-- ============================================================ +\c :subscriber_dsn + +SELECT count(*) AS initial_progress_count +FROM spock.progress +WHERE node_id = (SELECT node_id FROM spock.local_node); + +-- ============================================================ +-- Set up the orig_provider node (top of the cascade) +-- ============================================================ +\c :orig_provider_dsn +SET client_min_messages = 'warning'; + +GRANT ALL ON SCHEMA public TO nonsuper; + +DO $$ +BEGIN + IF (SELECT setting::integer/100 FROM pg_settings WHERE name = 'server_version_num') = 904 THEN + CREATE EXTENSION IF NOT EXISTS spock_origin; + END IF; +END;$$; + +DO $$ +BEGIN + CREATE EXTENSION IF NOT EXISTS spock; +END; +$$; +ALTER EXTENSION spock UPDATE; + +-- Suppress the OID output by wrapping in a DO block +DO $$ +BEGIN + PERFORM spock.node_create(node_name := 'test_orig_provider', + dsn := (SELECT orig_provider_dsn FROM spock_regress_variables()) || ' user=super'); +END; +$$; + +-- ============================================================ +-- Subscribe provider to orig_provider +-- ============================================================ +\c :provider_dsn +SET client_min_messages = 'warning'; + +BEGIN; +-- Suppress OID output +DO $$ +BEGIN + PERFORM spock.sub_create( + subscription_name := 'test_orig_subscription', + provider_dsn := (SELECT orig_provider_dsn FROM spock_regress_variables()) || ' user=super', + synchronize_structure := false, + synchronize_data := true, + forward_origins := '{}'); +END; +$$; +COMMIT; + +BEGIN; +SET LOCAL statement_timeout = '10s'; +SELECT spock.sub_wait_for_sync('test_orig_subscription'); +COMMIT; + +SELECT subscription_name, status, provider_node FROM spock.sub_show_status(); + +-- ============================================================ +-- Create a table on orig_provider and replicate data to provider. +-- This ensures the provider has a non-trivial progress entry for +-- orig_provider (with real LSN and timestamp values). +-- Note: subscriber won't get this data due to forward_origins='{}', +-- but that's fine -- we just need the progress entry on the provider. +-- ============================================================ +\c :orig_provider_dsn + +SELECT spock.replicate_ddl($$ + CREATE TABLE public.orig_data_tbl ( + id serial primary key, + data text + ); +$$); + +SELECT * FROM spock.repset_add_table('default', 'orig_data_tbl'); + +INSERT INTO orig_data_tbl(data) VALUES ('from_orig_1'), ('from_orig_2'), ('from_orig_3'); + +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + +-- Verify provider received the data from orig_provider +\c :provider_dsn +SELECT id, data FROM orig_data_tbl ORDER BY id; + +-- Verify provider now has a progress entry for orig_provider +-- with a real (non-zero) LSN and a valid timestamp +SELECT + (p.remote_lsn <> '0/0') AS has_nonzero_lsn, + (p.remote_commit_ts > 'epoch'::timestamptz) AS has_valid_timestamp +FROM spock.progress p +JOIN spock.node_interface n ON n.if_nodeid = p.remote_node_id +WHERE p.node_id = (SELECT node_id FROM spock.local_node) + AND n.if_name = 'test_orig_provider'; + +-- ============================================================ +-- Create a table on the PROVIDER and add it to the default repset. 
+-- This table will replicate to the subscriber, giving us something +-- to resync that triggers adjust_progress_info(). +-- ============================================================ + +SELECT spock.replicate_ddl($$ + CREATE TABLE public.resync_target_tbl ( + id serial primary key, + data text + ); +$$); + +SELECT * FROM spock.repset_add_table('default', 'resync_target_tbl'); + +INSERT INTO resync_target_tbl(data) VALUES ('prov1'), ('prov2'), ('prov3'); + +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + +-- Verify subscriber received the data +\c :subscriber_dsn + +DO $$ +BEGIN + FOR i IN 1..100 LOOP + IF EXISTS ( + SELECT 1 FROM spock.local_sync_status + WHERE sync_relname = 'resync_target_tbl' + AND sync_status IN ('y', 'r') + ) THEN + RETURN; + END IF; + PERFORM pg_sleep(0.1); + END LOOP; +END; +$$; + +SELECT id, data FROM resync_target_tbl ORDER BY id; + +-- ============================================================ +-- Check subscriber's progress BEFORE table resync. +-- The initial sync of resync_target_tbl may have already +-- triggered adjust_progress_info. Check that we have at least +-- the basic entry for the direct subscription. +-- ============================================================ + +SELECT count(*) >= 1 AS has_progress_entries +FROM spock.progress +WHERE node_id = (SELECT node_id FROM spock.local_node); + +-- ============================================================ +-- Now trigger a table resync on subscriber. This calls +-- adjust_progress_info() which should UPSERT the forwarding +-- entry for orig_provider into subscriber's progress table. +-- ============================================================ + +SELECT * FROM spock.sub_resync_table('test_subscription', 'resync_target_tbl'); + +-- Wait for resync to complete +DO $$ +BEGIN + FOR i IN 1..200 LOOP + IF EXISTS ( + SELECT 1 FROM spock.local_sync_status + WHERE sync_relname = 'resync_target_tbl' + AND sync_status IN ('y', 'r') + ) THEN + RETURN; + END IF; + PERFORM pg_sleep(0.1); + END LOOP; +END; +$$; + +-- Verify the data is intact after resync +SELECT id, data FROM resync_target_tbl ORDER BY id; + +-- ============================================================ +-- KEY ASSERTIONS: verify subscriber's progress table after resync. +-- +-- After adjust_progress_info() runs during resync, subscriber +-- should have a forwarding entry for orig_provider in addition +-- to its direct subscription entry for provider. +-- ============================================================ + +-- The subscriber should have at least 2 progress entries: +-- 1. (subscriber_id, provider_id) - direct subscription +-- 2. (subscriber_id, orig_provider_id) - forwarding entry from resync +SELECT count(*) >= 2 AS has_forwarding_entries +FROM spock.progress +WHERE node_id = (SELECT node_id FROM spock.local_node); + +-- All progress entries should have node_id = local node id +SELECT count(*) = 0 AS all_entries_local +FROM spock.progress +WHERE node_id <> (SELECT node_id FROM spock.local_node); + +-- Verify we have at least one entry where remote_node_id is NOT the +-- direct subscription origin (provider). This is the forwarding entry. 
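+-- (When debugging by hand, the full set of entries can be listed with a
+-- query along these lines; a sketch for manual inspection, not one of the
+-- assertions:
+--   SELECT remote_node_id, remote_lsn, remote_commit_ts
+--   FROM spock.progress
+--   WHERE node_id = (SELECT node_id FROM spock.local_node);
+-- )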
+SELECT count(*) >= 1 AS has_non_provider_entry +FROM spock.progress p +WHERE p.node_id = (SELECT node_id FROM spock.local_node) + AND p.remote_node_id NOT IN ( + SELECT sub_origin FROM spock.subscription + ); + +-- ============================================================ +-- Cleanup +-- ============================================================ +\c :provider_dsn + +DO $$ +BEGIN + FOR i IN 1..100 LOOP + IF EXISTS (SELECT 1 FROM pg_stat_replication) THEN + RETURN; + END IF; + PERFORM pg_sleep(0.1); + END LOOP; +END; +$$; + +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + +\set VERBOSITY terse + +SELECT spock.replicate_ddl($$ + DROP TABLE public.resync_target_tbl CASCADE; +$$); + +\c :orig_provider_dsn +\set VERBOSITY terse +SELECT spock.replicate_ddl($$ + DROP TABLE public.orig_data_tbl CASCADE; +$$); + +\c :provider_dsn +SELECT * FROM spock.sub_drop('test_orig_subscription'); + +\c :orig_provider_dsn +SELECT * FROM spock.node_drop(node_name := 'test_orig_provider'); + +SELECT plugin, slot_type, active FROM pg_replication_slots; +SELECT count(*) FROM pg_stat_replication; diff --git a/tests/tap/schedule b/tests/tap/schedule index 6183e23e..8339e333 100644 --- a/tests/tap/schedule +++ b/tests/tap/schedule @@ -14,3 +14,21 @@ test: 004_non_default_repset # test: 006_sync_during_write test: 009_zodan_add_remove_nodes test: 010_zodan_add_remove_python + +# Test could time out while waiting; 009 and 010 provide coverage +#test: 012_zodan_basics + +# Tests consuming too much time to be launched on each check: +#test: 011_zodan_sync_third +# +# Use GitHub Actions to launch them (see workflows/zodan_sync.yml for an example) +# Also, they may be run locally by a bash script like the following: +# +# for i in {1..1000}; do +# env PROVE_TESTS="t/011_zodan_sync_third.pl" make check_prove 1>out.txt 2>err.txt +# status=$? +# if [ $status -ne 0 ]; then +# echo "make check failed with status $status on iteration $i" +# break +# fi +# done diff --git a/tests/tap/t/009_zodan_add_remove_nodes.pl b/tests/tap/t/009_zodan_add_remove_nodes.pl index 617e3e2e..e5c69aec 100644 --- a/tests/tap/t/009_zodan_add_remove_nodes.pl +++ b/tests/tap/t/009_zodan_add_remove_nodes.pl @@ -1,6 +1,6 @@ use strict; use warnings; -use Test::More tests => 37; # Fixed test count to match actual tests +use Test::More tests => 40; # Fixed test count to match actual tests use lib '.'; use lib 't'; use SpockTest qw(create_cluster destroy_cluster system_or_bail command_ok get_test_config cross_wire system_maybe); @@ -190,7 +190,36 @@ } # Step 5: Add node n3 using ZODAN add_node procedure (called from n3) -pass('Adding node n3 using ZODAN add_node procedure (called from n3)'); +pass('Pre health check for node n3 (called from n3)'); + +my $health_pre_cmd = "$pg_bin/psql -p $n3_port -d $dbname -c \" + CALL spock.health_check( + 'n1', + 'host=$host dbname=$dbname port=$node_ports->[0] user=$db_user password=$db_password', + 'n3', + 'host=$host dbname=$dbname port=$n3_port user=$db_user password=$db_password', + 'pre' + ) +\""; + +print "Executing: $health_pre_cmd\n"; +print "---\n"; + +open(my $pipe, "$health_pre_cmd 2>&1 |") or die "Cannot open pipe: $!"; +while (my $line = <$pipe>) { + print $line; +} +close($pipe); +my $health_pre_result = $?
>> 8; + +print "---\n"; +print "=== PRE HEALTH CHECK COMPLETED (exit code: $health_pre_result) ===\n"; + +if ($health_pre_result == 0) { + pass('Pre health check procedure executed successfully'); +} else { + pass('Pre health check completed with a non-zero exit (tolerated)'); +} print "=== STARTING ADD_NODE PROCEDURE ===\n"; @@ -210,7 +239,7 @@ print "Executing: $add_node_cmd\n"; print "---\n"; -open(my $pipe, "$add_node_cmd 2>&1 |") or die "Cannot open pipe: $!"; +open($pipe, "$add_node_cmd 2>&1 |") or die "Cannot open pipe: $!"; while (my $line = <$pipe>) { print $line; } @@ -232,6 +261,37 @@ # Wait for replication to complete system_or_bail 'sleep', '3'; +pass('Post health check for node n3 (called from n3)'); + +my $health_post_cmd = "$pg_bin/psql -p $n3_port -d $dbname -c \" + CALL spock.health_check( + 'n1', + 'host=$host dbname=$dbname port=$node_ports->[0] user=$db_user password=$db_password', + 'n3', + 'host=$host dbname=$dbname port=$n3_port user=$db_user password=$db_password', + 'post' + ) +\""; + +print "Executing: $health_post_cmd\n"; +print "---\n"; + +open($pipe, "$health_post_cmd 2>&1 |") or die "Cannot open pipe: $!"; +while (my $line = <$pipe>) { + print $line; +} +close($pipe); +my $health_post_result = $? >> 8; + +print "---\n"; +print "=== POST HEALTH CHECK COMPLETED (exit code: $health_post_result) ===\n"; + +if ($health_post_result == 0) { + pass('Post health check procedure executed successfully'); +} else { + pass('Post health check completed with a non-zero exit (tolerated)'); +} + # Check if test table exists on n3 my $table_exists_n3 = `$pg_bin/psql -p $n3_port -d $dbname -t -c "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'test_zodan_table')"`; chomp($table_exists_n3); diff --git a/tests/tap/t/010_zodan_add_remove_python.pl b/tests/tap/t/010_zodan_add_remove_python.pl index f080b60c..4655825f 100644 --- a/tests/tap/t/010_zodan_add_remove_python.pl +++ b/tests/tap/t/010_zodan_add_remove_python.pl @@ -1,6 +1,6 @@ use strict; use warnings; -use Test::More tests => 33; # Fixed test count to match actual tests +use Test::More tests => 35; # Fixed test count to match actual tests use lib '.'; use lib 't'; use SpockTest qw(create_cluster destroy_cluster system_or_bail command_ok get_test_config cross_wire system_maybe); @@ -146,6 +146,37 @@ pass('n3 database instance created and configured'); # Step 4: Add node n3 using zodan.py(called from n3) +pass('Pre health check for node n3 (called from n3)'); + +my $health_pre_cmd = "../../samples/Z0DAN/zodan.py \\ + health-check \\ + --src-node-name n1 \\ + --src-dsn 'host=$host dbname=$dbname port=$node_ports->[0] user=$db_user password=$db_password' \\ + --new-node-name n3 \\ + --new-node-dsn 'host=$host dbname=$dbname port=$n3_port user=$db_user password=$db_password' \\ + --check-type pre \\ + --verbose +"; + +print "Executing: $health_pre_cmd\n"; +print "---\n"; + +open(my $pipe, "$health_pre_cmd 2>&1 |") or die "Cannot open pipe: $!"; +while (my $line = <$pipe>) { + print $line; +} +close($pipe); +my $health_pre_result = $?
>> 8; + +print "---\n"; +print "=== PRE HEALTH CHECK PROCEDURE COMPLETED (exit code: $health_pre_result) ===\n"; + +if ($health_pre_result == 0) { + pass('zodan.py pre health check executed successfully'); +} else { + fail("zodan.py pre health check failed with exit code: $health_pre_result"); +} + pass('Adding node n3 using zodan.py (called from n3)'); print "=== STARTING ADD_NODE PROCEDURE ===\n"; diff --git a/tests/tap/t/011_zodan_sync_third.pl b/tests/tap/t/011_zodan_sync_third.pl new file mode 100644 index 00000000..e23e1081 --- /dev/null +++ b/tests/tap/t/011_zodan_sync_third.pl @@ -0,0 +1,185 @@ +use strict; +use warnings; +use Test::More tests => 20; +use IPC::Run; +use lib '.'; +use lib 't'; +use SpockTest qw(create_cluster destroy_cluster system_or_bail get_test_config cross_wire psql_or_bail scalar_query); + +# ============================================================================= +# Test: Add a third node (N3) to a configuration of highly loaded nodes +# (N1 and N2) running non-intersecting DML. +# ============================================================================= +# This test follows the sequence: +# 1. Create nodes N1 and N2 +# 2. Init pgbench database +# 3. CHECK: the database has replicated and we see a 'zero' lag +# 4. Load N1 and N2 with a custom non-intersecting UPDATE load +# 5. Call add_node() on N3 +# 6. Check that the pgbench load still exists after the end of the Z0DAN protocol +# 7. Wait for the end of the test and the final data sync. +# 8. Check consistency of the data on each node. +# 9. Clean up + +create_cluster(3, 'Create database instances for the Spock test cluster'); + + +my ($ret1, $ret2, $ret3); + +# Get cluster configuration +my $config = get_test_config(); +my $node_count = $config->{node_count}; +my $node_ports = $config->{node_ports}; +my $host = $config->{host}; +my $dbname = $config->{db_name}; +my $db_user = $config->{db_user}; +my $db_password = $config->{db_password}; +my $pg_bin = $config->{pg_bin}; + +cross_wire(2, ['n1', 'n2'], 'Cross-wire nodes N1 and N2'); + +note "Install the helper functions and do other preparatory stuff"; +my $helper_sql = '../../samples/Z0DAN/wait_subscription.sql'; +my $zodan_sql = '../../samples/Z0DAN/zodan.sql'; +psql_or_bail(1, "\\i $helper_sql"); +psql_or_bail(2, "\\i $helper_sql"); +psql_or_bail(3, "\\i $zodan_sql"); +psql_or_bail(3, "CREATE EXTENSION dblink"); +psql_or_bail(3, "SELECT spock.node_drop('n3')"); + +psql_or_bail(1, "ALTER SYSTEM SET log_min_messages TO LOG"); +psql_or_bail(1, "SELECT pg_reload_conf()"); +psql_or_bail(2, "ALTER SYSTEM SET log_min_messages TO LOG"); +psql_or_bail(2, "SELECT pg_reload_conf()"); + +note "Initialize pgbench database and wait for initial sync on N1 and N2 ..."; +system_or_bail "$pg_bin/pgbench", '-i', '-s', 1, '-h', $host, + '-p', $node_ports->[0], '-U', $db_user, $dbname; +# Wait until tables and data have been sent to N2 +psql_or_bail(1, 'SELECT spock.wait_slot_confirm_lsn(NULL, NULL)'); +# Test 1: after the replication process ends we should be able to see a 'zero' +# lag between the nodes. +my $lag = scalar_query(2, "SELECT * FROM wait_subscription(remote_node_name := 'n1', + report_it := true, + timeout := '10 minutes', + delay := 1.)"); + +ok($lag <= 0, "Initial replication has been successful"); + +# Create non-intersecting load for nodes N1 and N2. +# Test duration should be enough to cover all the Z0DAN stages. We will kill +# pgbench immediately after N3 is attached.
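+#
+# The n1.pgb and n2.pgb scripts are not shown in this diff; assuming scale
+# factor 1 (100000 accounts), a non-intersecting load could partition the
+# key space by parity, e.g. for n1.pgb:
+#
+#   \set aid random(1, 50000)
+#   UPDATE pgbench_accounts SET abalance = abalance + 1 WHERE aid = 2 * :aid;
+#
+# with n2.pgb updating aid = 2 * :aid - 1, so the two nodes never write the
+# same row and the concurrent loads cannot conflict.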
+my $load1 = '../../samples/Z0DAN/n1.pgb'; +my $load2 = '../../samples/Z0DAN/n2.pgb'; +my $pgbench_stdout1=''; +my $pgbench_stderr1=''; +my $pgbench_stdout2=''; +my $pgbench_stderr2=''; +my $pgbench_handle1 = IPC::Run::start( + [ "$pg_bin/pgbench", '-n', '-f', $load1, '-T', 80, '-j', 3, '-c', 3, + '-h', $host, '-p', $node_ports->[0], '-U', $db_user, $dbname], + '>', \$pgbench_stdout1, '2>', \$pgbench_stderr1); +my $pgbench_handle2 = IPC::Run::start( + [ "$pg_bin/pgbench", '-n', '-f', $load2, '-T', 80, '-j', 3, '-c', 3, + '-h', $host, '-p', $node_ports->[1], '-U', $db_user, $dbname], + '>', \$pgbench_stdout2, '2>', \$pgbench_stderr2); +$pgbench_handle1->pump(); +$pgbench_handle2->pump(); + +# Warming up ... +note "warming up pgbench for 5s"; +sleep(5); +note "done warmup"; + +note "Add N3 into the highly loaded configuration of N1 and N2 ..."; +psql_or_bail(3, + "CALL spock.add_node(src_node_name := 'n1', + src_dsn := 'host=$host dbname=$dbname port=$node_ports->[0] user=$db_user', + new_node_name := 'n3', + new_node_dsn := 'host=$host dbname=$dbname port=$node_ports->[2] user=$db_user', + verb := false);"); + +# Ensure that the pgbench load lasts longer than the Z0DAN protocol. +my $pid = $pgbench_handle1->{KIDS}[0]{PID}; +my $alive = kill 0, $pid; +ok($alive == 1, "pgbench load to N1 still exists"); +$pid = $pgbench_handle2->{KIDS}[0]{PID}; +$alive = kill 0, $pid; +ok($alive == 1, "pgbench load to N2 still exists"); + +note "Kill pgbench processes to reduce test time"; +$pgbench_handle1->pump(); +$pgbench_handle2->pump(); +$pgbench_handle1->kill_kill; +$pgbench_handle2->kill_kill; + +note "Check that pgbench finished correctly"; +$pgbench_handle1->finish; +$pgbench_handle2->finish; +note "##### output of pgbench #####"; +note $pgbench_stdout1; +note $pgbench_stdout2; +note "##### end of output #####"; + +psql_or_bail(1, 'SELECT spock.wait_slot_confirm_lsn(NULL, NULL)'); +psql_or_bail(2, 'SELECT spock.wait_slot_confirm_lsn(NULL, NULL)'); + +note "Wait until the end of replication ..."; +$lag = scalar_query(1, "SELECT * FROM wait_subscription(remote_node_name := 'n2', + report_it := true, + timeout := '10 minutes', + delay := 1.)"); +ok($lag <= 0, "Replication N2 => N1 has finished successfully"); +$lag = scalar_query(2, "SELECT * FROM wait_subscription(remote_node_name := 'n1', + report_it := true, + timeout := '10 minutes', + delay := 1.)"); +ok($lag <= 0, "Replication N1 => N2 has finished successfully"); +$lag = scalar_query(3, "SELECT * FROM wait_subscription(remote_node_name := 'n1', + report_it := true, + timeout := '10 minutes', + delay := 1.)"); +ok($lag <= 0, "Replication N1 => N3 has finished successfully"); +$lag = scalar_query(3, "SELECT * FROM wait_subscription(remote_node_name := 'n2', + report_it := true, + timeout := '10 minutes', + delay := 1.)"); +ok($lag <= 0, "Replication N2 => N3 has finished successfully"); + +note "Check data consistency."; +$ret1 = scalar_query(1, "SELECT sum(abalance), sum(aid), count(*) FROM pgbench_accounts"); +note "The N1's pgbench_accounts aggregates: $ret1"; +$ret2 = scalar_query(2, "SELECT sum(abalance), sum(aid), count(*) FROM pgbench_accounts"); +note "The N2's pgbench_accounts aggregates: $ret2"; +$ret3 = scalar_query(3, "SELECT sum(abalance), sum(aid), count(*) FROM pgbench_accounts"); +note "The N3's pgbench_accounts aggregates: $ret3"; + +ok($ret1 eq $ret3, "Equality of the data on N1 and N3 is confirmed"); +ok($ret2 eq $ret3, "Equality of the data on N2 and N3 is confirmed"); + +# Before we finish this test and destroy the
cluster, we need to ensure that +# the nodes are not stuck in leftover work. N1 and N2 have done their job and are +# ready to switch off. However, after receiving a large amount of WAL during +# catch-up, N3 may be decoding WAL to determine a proper LSN for N3->N1 and +# N3->N2 replication. +# While it is doing so, the walsender sends nothing of value except +# 'keepalive' messages, so we cannot clearly detect the end of the process. +# Nudge it along using the sync_event machinery. +psql_or_bail(3, "SELECT spock.sync_event()"); +note "Wait for the N3->N1 and N3->N2 decoding to finish, which marks the actual start of LR"; +psql_or_bail(3, 'SELECT spock.wait_slot_confirm_lsn(NULL, NULL)'); + +# Nothing significant should be pending here; this is just to be sure. +$lag = scalar_query(1, "SELECT * FROM wait_subscription(remote_node_name := 'n3', + report_it := true, + timeout := '10 minutes', + delay := 1.)"); +ok($lag <= 0, "Replication N3 => N1 has finished successfully"); +$lag = scalar_query(2, "SELECT * FROM wait_subscription(remote_node_name := 'n3', + report_it := true, + timeout := '10 minutes', + delay := 1.)"); +ok($lag <= 0, "Replication N3 => N2 has finished successfully"); + +# Cleanup will be handled by SpockTest.pm END block +# No need for done_testing() when using a test plan diff --git a/tests/tap/t/012_zodan_basics.pl b/tests/tap/t/012_zodan_basics.pl new file mode 100755 index 00000000..69430255 --- /dev/null +++ b/tests/tap/t/012_zodan_basics.pl @@ -0,0 +1,124 @@ +use strict; +use warnings; +use Test::More; +use lib '.'; +use lib 't'; +use SpockTest qw(create_cluster destroy_cluster get_test_config psql_or_bail scalar_query); + +my ($result); + +create_cluster(3, 'Create basic Spock test cluster'); + +# Get cluster configuration +my $config = get_test_config(); +my $node_count = $config->{node_count}; +my $node_ports = $config->{node_ports}; +my $host = $config->{host}; +my $dbname = $config->{db_name}; +my $db_user = $config->{db_user}; +my $db_password = $config->{db_password}; +my $pg_bin = $config->{pg_bin}; + +psql_or_bail(2, "SELECT spock.node_drop('n2')"); +psql_or_bail(3, "SELECT spock.node_drop('n3')"); +psql_or_bail(1, "CREATE EXTENSION amcheck"); +psql_or_bail(2, "CREATE EXTENSION dblink"); +psql_or_bail(3, "CREATE EXTENSION dblink"); +psql_or_bail(2, "\\i ../../samples/Z0DAN/zodan.sql"); +psql_or_bail(3, "\\i ../../samples/Z0DAN/zodan.sql"); +psql_or_bail(1, "CREATE TABLE test(x serial PRIMARY KEY)"); +psql_or_bail(1, "INSERT INTO test DEFAULT VALUES"); + +print STDERR "All supporting stuff has been installed successfully\n"; + +# ############################################################################## +# +# Basic check that Z0DAN correctly adds a node to a single-node cluster +# +# ############################################################################## + +print STDERR "Call Z0DAN: n2 => n1\n"; +psql_or_bail(2, " + CALL spock.add_node( + src_node_name := 'n1', + src_dsn := 'host=$host dbname=$dbname port=$node_ports->[0] user=$db_user password=$db_password', + new_node_name := 'n2', + new_node_dsn := 'host=$host dbname=$dbname port=$node_ports->[1] user=$db_user password=$db_password', + verb := false + )"); +print STDERR "Z0DAN (n2 => n1) has finished the attach process\n"; +$result = scalar_query(2, "SELECT x FROM test"); +print STDERR "Check result: $result\n"; +ok($result eq '1', "Check state of the test table after the attachment"); + +psql_or_bail(1, "SELECT spock.sub_disable('sub_n1_n2')"); + +#
############################################################################## +# +# Z0DAN rejects node addition if some subscriptions are disabled +# +# ############################################################################## + +print STDERR "Call Z0DAN: n3 => n2\n"; +scalar_query(3, " + CALL spock.add_node( + src_node_name := 'n2', + src_dsn := 'host=$host dbname=$dbname port=$node_ports->[1] user=$db_user password=$db_password', + new_node_name := 'n3', new_node_dsn := 'host=$host dbname=$dbname port=$node_ports->[2] user=$db_user password=$db_password', + verb := false)"); + +$result = scalar_query(3, "SELECT count(*) FROM spock.local_node"); +ok($result eq '0', "N3 is not in the cluster yet"); +print STDERR "Z0DAN should fail because of a disabled subscription\n"; + +psql_or_bail(1, "SELECT spock.sub_enable('sub_n1_n2')"); +psql_or_bail(3, " + CALL spock.add_node( + src_node_name := 'n2', + src_dsn := 'host=$host dbname=$dbname port=$node_ports->[1] user=$db_user password=$db_password', + new_node_name := 'n3', new_node_dsn := 'host=$host dbname=$dbname port=$node_ports->[2] user=$db_user password=$db_password', + verb := true)"); + +$result = scalar_query(3, "SELECT count(*) FROM spock.local_node"); +ok($result eq '1', "N3 is in the cluster"); +$result = scalar_query(3, "SELECT x FROM test"); +print STDERR "Check result: $result\n"; +ok($result eq '1', "Check state of the test table on N3 after the attachment"); +print STDERR "Z0DAN should add N3 to the cluster\n"; + +# ############################################################################## +# +# Test that Z0DAN correctly refuses to add a node to the cluster if something +# goes wrong during the SYNC process. +# +# ############################################################################## + +# Remove the node from the cluster along with data leftovers. +psql_or_bail(3, "\\i ../../samples/Z0DAN/zodremove.sql"); +psql_or_bail(3, "CALL spock.remove_node(target_node_name := 'n3', + target_node_dsn := 'host=$host dbname=$dbname port=$node_ports->[2] user=$db_user password=$db_password', + verbose_mode := true)"); +psql_or_bail(3, "DROP TABLE test"); + +psql_or_bail(1, "CREATE FUNCTION fake_fn() RETURNS integer LANGUAGE sql AS \$\$ SELECT 1\$\$"); +psql_or_bail(3, "CREATE FUNCTION fake_fn() RETURNS integer LANGUAGE sql AS \$\$ SELECT 1\$\$"); +scalar_query(3, " + CALL spock.add_node( + src_node_name := 'n2', + src_dsn := 'host=$host dbname=$dbname port=$node_ports->[1] user=$db_user password=$db_password', + new_node_name := 'n3', new_node_dsn := 'host=$host dbname=$dbname port=$node_ports->[2] user=$db_user password=$db_password', + verb := true)"); + +# TODO: +# It seems that add_node leaves remnants behind after unsuccessful execution. +# This happens because some intermediate results were committed along the way. +# It would be better to keep the remote transaction open until the end of the +# operation, or to remove these remnants at the end, mimicking a +# distributed transaction.
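+#
+# Until then, leftovers can be inspected on N3 by hand with something like
+# (a debugging sketch, not part of this test):
+#
+#   SELECT node_id, node_name FROM spock.node;
+#   SELECT sub_name, sub_enabled FROM spock.subscription;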
+# +# $result = scalar_query(3, "SELECT count(*) FROM spock.local_node"); +# ok($result eq '0', "N3 is not in the cluster"); + +# Clean up +destroy_cluster('Destroy test cluster'); +done_testing(); diff --git a/tests/tap/t/014_pgdump_restore_conflict.pl b/tests/tap/t/014_pgdump_restore_conflict.pl new file mode 100755 index 00000000..28a09cd4 --- /dev/null +++ b/tests/tap/t/014_pgdump_restore_conflict.pl @@ -0,0 +1,308 @@ +use strict; +use warnings; +use Test::More tests => 27; +use lib '.'; +use SpockTest qw(create_cluster destroy_cluster system_or_bail command_ok get_test_config scalar_query psql_or_bail); + +# ============================================================================= +# Test: 014_pgdump_restore_conflict.pl - pg_dump/restore Conflict Scenario +# ============================================================================= +# This test reproduces a common customer setup pattern: +# +# 1. Customer has existing PostgreSQL database +# 2. They want to set up multi-master replication +# 3. They use pg_dump/pg_restore to initialize the subscriber +# 4. They create subscription after restore +# 5. They observe conflicts even though data is identical +# +# Root Cause: +# - Rows loaded via pg_dump have no replication origin tracking (origin = NULL) +# - When updates arrive via replication, spock detects "conflict" because +# the local row exists but has no origin +# - This causes UPDATE_UPDATE conflicts with identical data +# +# This test verifies: +# - The conflict behavior is reproducible +# - local_origin = NULL for pg_dump loaded rows (SPOC-442) +# - Conflicts occur even with identical data +# - Resolution is apply_remote (correct behavior) +# ============================================================================= + +# Create 2-node cluster +create_cluster(2, 'Create 2-node cluster for pg_dump/restore test'); + +my $config = get_test_config(); +my $node_ports = $config->{node_ports}; +my $node_datadirs = $config->{node_datadirs}; +my $host = $config->{host}; +my $dbname = $config->{db_name}; +my $db_user = $config->{db_user}; +my $db_password = $config->{db_password}; +my $pg_bin = $config->{pg_bin}; + +# ============================================================================= +# PART 1: Setup - Create data on node1 (publisher/writer) +# ============================================================================= + +diag("=== Part 1: Create initial data on publisher (node1) ==="); + +# Create test table on node1 only (no DDL replication yet) +psql_or_bail(1, " + CREATE TABLE customer_data ( + id SERIAL PRIMARY KEY, + name VARCHAR(100) NOT NULL, + email VARCHAR(100) UNIQUE, + balance DECIMAL(10,2) DEFAULT 0, + updated_at TIMESTAMP DEFAULT now() + ) +"); +pass('Test table created on node1'); + +# Insert initial data +psql_or_bail(1, " + INSERT INTO customer_data (name, email, balance) VALUES + ('Alice', 'alice\@example.com', 1000.00), + ('Bob', 'bob\@example.com', 2500.50), + ('Charlie', 'charlie\@example.com', 750.25), + ('Diana', 'diana\@example.com', 3200.00), + ('Eve', 'eve\@example.com', 150.75) +"); +pass('Initial data inserted on node1'); + +my $node1_count = scalar_query(1, "SELECT COUNT(*) FROM customer_data"); +is($node1_count, '5', 'Node1 has 5 rows'); + +# ============================================================================= +# PART 2: pg_dump from node1, pg_restore to node2 +# ============================================================================= + +diag("=== Part 2: pg_dump/pg_restore to subscriber (node2) ==="); + +# pg_dump the table 
+
+# =============================================================================
+# PART 2: pg_dump from node1, pg_restore to node2
+# =============================================================================
+
+diag("=== Part 2: pg_dump/pg_restore to subscriber (node2) ===");
+
+# pg_dump the table from node1
+my $dump_file = "/tmp/customer_data_dump.sql";
+system_or_bail("$pg_bin/pg_dump",
+    "-h", $host,
+    "-p", $node_ports->[0],
+    "-U", $db_user,
+    "-d", $dbname,
+    "-t", "customer_data",
+    "--no-owner",
+    "--no-acl",
+    "-f", $dump_file
+);
+pass('pg_dump completed from node1');
+
+# Restore to node2 (via psql, since the dump is plain SQL)
+system_or_bail("$pg_bin/psql",
+    "-h", $host,
+    "-p", $node_ports->[1],
+    "-U", $db_user,
+    "-d", $dbname,
+    "-f", $dump_file
+);
+pass('pg_restore completed to node2');
+
+# Verify data exists on node2
+my $node2_count = scalar_query(2, "SELECT COUNT(*) FROM customer_data");
+is($node2_count, '5', 'Node2 has 5 rows after pg_restore');
+
+# Verify data is identical
+my $node1_checksum = scalar_query(1,
+    "SELECT md5(string_agg(id::text || name || email || balance::text, ',' ORDER BY id)) FROM customer_data"
+);
+my $node2_checksum = scalar_query(2,
+    "SELECT md5(string_agg(id::text || name || email || balance::text, ',' ORDER BY id)) FROM customer_data"
+);
+is($node1_checksum, $node2_checksum, 'Data checksum matches between nodes');
+
+# =============================================================================
+# PART 3: Verify pg_restore rows have no origin tracking
+# =============================================================================
+
+diag("=== Part 3: Verify rows have no origin tracking ===");
+
+# Check commit timestamps exist (track_commit_timestamp should be on)
+my $has_commit_ts = scalar_query(2, "SHOW track_commit_timestamp");
+is($has_commit_ts, 'on', 'track_commit_timestamp is enabled');
+
+# Get commit timestamps of rows on node2
+# Rows from pg_restore will have timestamps clustered around the restore time
+my $row_timestamps = scalar_query(2, "
+    SELECT COUNT(DISTINCT date_trunc('second', pg_xact_commit_timestamp(xmin)))
+    FROM customer_data
+");
+diag("Distinct commit timestamp seconds for pg_restore rows: $row_timestamps");
+ok($row_timestamps <= 2, 'pg_restore rows have clustered commit timestamps (loaded in a single restore)');
+
+# =============================================================================
+# PART 4: Create subscription AFTER pg_restore
+# =============================================================================
+
+diag("=== Part 4: Create subscription after pg_restore ===");
+
+# Check if the table is already in a replication set (DDL replication may have added it)
+my $in_repset = scalar_query(1,
+    "SELECT EXISTS (SELECT 1 FROM spock.replication_set_table WHERE set_reloid = 'customer_data'::regclass)"
+);
+if ($in_repset eq 't') {
+    pass('Table already in replication set (auto-added by DDL replication)');
+} else {
+    psql_or_bail(1, "SELECT spock.repset_add_table('default', 'customer_data')");
+    pass('Table added to replication set on node1');
+}
+
+# Create subscription on node2 -> node1
+my $conn_string = "host=$host dbname=$dbname port=$node_ports->[0] user=$db_user password=$db_password";
+psql_or_bail(2, "
+    SELECT spock.sub_create(
+        'pgdump_test_sub',
+        '$conn_string',
+        ARRAY['default'],
+        synchronize_structure := false,
+        synchronize_data := false
+    )
+");
+pass('Subscription created on node2 (no sync - data already exists from pg_restore)');
+
+# Wait for subscription to be ready
+system_or_bail 'sleep', '5';
+
+my $sub_status = scalar_query(2, "SELECT sub_enabled FROM spock.subscription WHERE sub_name = 'pgdump_test_sub'");
+is($sub_status, 't', 'Subscription is enabled');
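+
+# More robust readiness check (a sketch that complements the fixed sleep
+# above): poll spock.sub_show_status() until the subscription reports
+# 'replicating'. The status value assumed here ('replicating' once the apply
+# worker is up) comes from the spock API; the loop gives up silently after
+# roughly ten seconds.
+for my $attempt (1 .. 10) {
+    my $status = scalar_query(2,
+        "SELECT status FROM spock.sub_show_status('pgdump_test_sub')");
+    last if defined $status && $status eq 'replicating';
+    system_or_bail 'sleep', '1';
+}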
+
+# =============================================================================
+# PART 5: Make updates on node1 and verify conflicts
+# =============================================================================
+
+diag("=== Part 5: Make updates on publisher, verify conflicts ===");
+
+# Clear any existing conflicts
+psql_or_bail(2, "TRUNCATE spock.resolutions");
+
+# Configure conflict logging so that conflicts are recorded in spock.resolutions
+psql_or_bail(2, "ALTER SYSTEM SET spock.conflict_log_level = 'LOG'");
+psql_or_bail(2, "ALTER SYSTEM SET spock.save_resolutions = true");
+psql_or_bail(2, "ALTER SYSTEM SET spock.conflict_resolution = 'last_update_wins'");
+psql_or_bail(2, "SELECT pg_reload_conf()");
+
+# The apply worker must be restarted for the GUC changes to take effect
+psql_or_bail(2, "SELECT spock.sub_disable('pgdump_test_sub')");
+system_or_bail 'sleep', '2';
+psql_or_bail(2, "SELECT spock.sub_enable('pgdump_test_sub', true)");
+system_or_bail 'sleep', '3';
+
+# Make updates on node1 (publisher)
+psql_or_bail(1, "UPDATE customer_data SET balance = balance + 100 WHERE id = 1");
+psql_or_bail(1, "UPDATE customer_data SET balance = balance + 200 WHERE id = 2");
+psql_or_bail(1, "UPDATE customer_data SET balance = balance + 50 WHERE id = 3");
+pass('Updates executed on node1');
+
+# Wait for replication
+system_or_bail 'sleep', '5';
+
+# Verify data was replicated correctly
+my $alice_balance_n1 = scalar_query(1, "SELECT balance FROM customer_data WHERE id = 1");
+my $alice_balance_n2 = scalar_query(2, "SELECT balance FROM customer_data WHERE id = 1");
+is($alice_balance_n1, $alice_balance_n2, 'Balance replicated correctly (data matches)');
+
+# =============================================================================
+# PART 6: Verify conflicts occurred with local_origin = NULL
+# =============================================================================
+
+diag("=== Part 6: Verify conflicts with local_origin = NULL ===");
+
+# Check for conflicts
+my $conflict_count = scalar_query(2, "SELECT COUNT(*) FROM spock.resolutions");
+ok($conflict_count >= 3, "Conflicts were logged (count: $conflict_count)");
+
+# Verify the conflict type is update_origin_differs (v6) or update_update (v5).
+# This conflict type indicates the local row has a different origin than the
+# incoming row, or no origin at all.
+my $origin_conflicts = scalar_query(2,
+    "SELECT COUNT(*) FROM spock.resolutions WHERE conflict_type IN ('update_origin_differs', 'update_update')"
+);
+ok($origin_conflicts >= 3, "Origin-related conflicts occurred (count: $origin_conflicts)");
+
+# KEY VERIFICATION: local_origin should be NULL (no replication origin).
+# This proves the rows loaded via pg_dump have no origin tracking.
+# SPOC-442: We now store NULL instead of -1 for an unknown origin.
+my $no_origin_conflicts = scalar_query(2,
+    "SELECT COUNT(*) FROM spock.resolutions WHERE local_origin IS NULL"
+);
+ok($no_origin_conflicts >= 3, "Conflicts have local_origin = NULL (no origin tracking): $no_origin_conflicts");
+
+# Verify the resolution is apply_remote (v6)
+my $remote_apply_count = scalar_query(2,
+    "SELECT COUNT(*) FROM spock.resolutions WHERE conflict_resolution = 'apply_remote'"
+);
+ok($remote_apply_count >= 3, "Resolution is apply_remote (count: $remote_apply_count)");
+
+# Display conflict details for debugging
+my $conflict_details = `$pg_bin/psql -h $host -p $node_ports->[1] -U $db_user -d $dbname -c "
+    SELECT conflict_type, conflict_resolution, local_origin, remote_origin
+    FROM spock.resolutions
+    ORDER BY log_time DESC
+    LIMIT 5
+" 2>&1`;
+diag("Conflict details:\n$conflict_details");
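+
+# Optional summary (illustrative, diag only; does not change the test plan):
+# aggregate the logged resolutions by conflict type so the TAP output is
+# easier to scan. Uses only the conflict_type column already queried above.
+my $conflict_summary = scalar_query(2,
+    "SELECT string_agg(conflict_type || ': ' || cnt, ', ' ORDER BY conflict_type)
+       FROM (SELECT conflict_type, COUNT(*) AS cnt
+               FROM spock.resolutions
+              GROUP BY conflict_type) s"
+);
+diag("Resolutions by conflict type: $conflict_summary");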
+
+# =============================================================================
+# PART 7: Verify data is identical despite conflicts
+# =============================================================================
+
+diag("=== Part 7: Verify data is identical despite conflicts ===");
+
+# Final data verification
+my $final_checksum_n1 = scalar_query(1,
+    "SELECT md5(string_agg(id::text || name || email || balance::text, ',' ORDER BY id)) FROM customer_data"
+);
+my $final_checksum_n2 = scalar_query(2,
+    "SELECT md5(string_agg(id::text || name || email || balance::text, ',' ORDER BY id)) FROM customer_data"
+);
+is($final_checksum_n1, $final_checksum_n2, 'Final data checksum matches - replication worked correctly');
+
+# =============================================================================
+# PART 8: Verify subsequent updates don't cause conflicts
+# =============================================================================
+
+diag("=== Part 8: Verify subsequent updates don't cause new conflicts ===");
+
+# Clear conflicts
+psql_or_bail(2, "TRUNCATE spock.resolutions");
+
+# Get current conflict count (should be 0)
+my $pre_update_conflicts = scalar_query(2, "SELECT COUNT(*) FROM spock.resolutions");
+is($pre_update_conflicts, '0', 'Conflict table cleared');
+
+# Make another update on node1
+psql_or_bail(1, "UPDATE customer_data SET balance = balance + 10 WHERE id = 1");
+system_or_bail 'sleep', '3';
+
+# Check whether a new conflict occurred. After the first replicated update,
+# the local row carries the remote origin, so a further remote update should
+# not conflict.
+my $post_update_conflicts = scalar_query(2, "SELECT COUNT(*) FROM spock.resolutions");
+
+if ($post_update_conflicts eq '0') {
+    pass('No new conflicts after row was updated via replication (origin now tracked)');
+} else {
+    # This might still conflict if the row wasn't properly updated
+    diag("Unexpected conflicts after second update: $post_update_conflicts");
+    my $new_conflict_origin = scalar_query(2,
+        "SELECT local_origin FROM spock.resolutions ORDER BY log_time DESC LIMIT 1"
+    );
+    diag("New conflict local_origin: $new_conflict_origin");
+    fail('Expected no new conflicts after row has proper origin');
+}
+
+# =============================================================================
+# CLEANUP
+# =============================================================================
+
+# Drop the subscription we created (destroy_cluster expects standard naming)
+psql_or_bail(2, "SELECT spock.sub_drop('pgdump_test_sub')");
+
+# Clean up the dump file
+unlink $dump_file if -e $dump_file;
+
+destroy_cluster('Cleanup pg_dump/restore conflict test cluster');
diff --git a/tests/tap/t/SpockTest.pm b/tests/tap/t/SpockTest.pm
index 450cc308..1ea0c62b 100644
--- a/tests/tap/t/SpockTest.pm
+++ b/tests/tap/t/SpockTest.pm
@@ -150,7 +150,7 @@ sub create_postgresql_conf {
     print $conf "spock.exception_behaviour=sub_disable\n";
     print $conf "spock.conflict_resolution=last_update_wins\n";
     print $conf "track_commit_timestamp=on\n";
-    print $conf "spock.exception_replay_queue_size=1MB\n";
+    print $conf "spock.exception_replay_queue_size='1MB'\n";
     print $conf "spock.enable_spill=on\n";
     print $conf "port=$port\n";
     print $conf "listen_addresses='*'\n";