From eda3f0df9d6cb896573936795f4a89ebe49d7a51 Mon Sep 17 00:00:00 2001
From: Mason Sharp
Date: Thu, 7 May 2026 17:36:06 -0700
Subject: [PATCH] apply: handle upstream connection loss without
 origin-advance leak

When the upstream TCP connection dies (firewall reload, walsender RST,
keepalive timeout) the apply worker had two interacting problems:

1. fd = PQsocket(applyconn) was captured once before stream_replay: and
   reused by every WaitLatchOrSocket() call. After libpq closed the
   socket the cached fd was stale (possibly reused by the OS), producing
   epoll_ctl(EINVAL) on Linux when WaitLatchOrSocket re-armed it. The
   follow-on exception was caught with use_try_block=true and
   PG_RE_THROW'd, producing a noisy two-error cascade per disconnect.

2. PG_CATCH treated every error the same way -- AbortOutOfAnyTransaction
   then goto stream_replay. For a connection-class error this is
   meaningless: the upstream is gone, replay cannot succeed. Worse,
   replorigin_session_origin_lsn was still set (by handle_begin) to the
   in-flight remote transaction's final_lsn, and RecordTransactionAbort
   advances the replication origin to that stale LSN unless the session
   origin variable is cleared first. spock_apply_main's on-exit
   flush_progress_if_needed(true) commit has the same gate. Either path
   can silently advance origin past an aborted remote transaction so it
   gets skipped on reconnect -- the loss mode PG core defends against in
   start_apply (worker.c) with replorigin_reset.

Three changes:

* Tag the disconnect/timeout/EOF elog(ERROR) sites with
  ERRCODE_CONNECTION_FAILURE via ereport so the discriminator below can
  classify them deterministically.

* Refresh fd = PQsocket(applyconn) at the top of every wait-loop
  iteration, after the previous iteration's PQconsumeInput has had a
  chance to update libpq state. If the connection has gone bad, raise a
  tagged ERROR rather than passing a closed fd to WaitLatchOrSocket.
  Mirrors the libpqrcv_receive pattern in PG core.

* In apply_work's PG_CATCH, detect connection-class errors (sqlerrcode
  CONNECTION_FAILURE / CONNECTION_EXCEPTION / CONNECTION_DOES_NOT_EXIST /
  ADMIN_SHUTDOWN, or PQstatus == CONNECTION_BAD as a libpq fallback) and
  PG_RE_THROW. apply_work is the only PG_TRY in this call stack, so the
  rethrow propagates to the bgworker error handler. Origin is zeroed by
  spock_apply_worker_shmem_exit (before_shmem_exit) before
  ShutdownPostgres calls AbortOutOfAnyTransaction, so neither the abort
  nor the bypassed on-exit flush_progress can advance origin to the
  stale LSN. Apply-side errors (constraint violations etc.) do NOT take
  this branch -- they continue through the existing exception_log replay
  path.

Documents the before_shmem_exit LIFO ordering dependency in
spock_apply_worker_shmem_exit so the invariant survives future edits.

Adds two TAP tests (not in the schedule, run manually):

- 101_clean_exit_on_conn_loss.pl asserts the new "exiting via rethrow"
  LOG line is emitted. Fails without the fix.

- 102_no_origin_leak_on_disconnect.pl exercises the apply_delay /
  SIGKILL path and asserts no assertion failure, no SIGABRT, and no row
  loss across the disconnect.
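For reviewers, the classification the new PG_CATCH branch applies can be
summarised by the following sketch. This is illustrative only -- the
patch inlines the condition rather than adding a helper, and the
function name here is invented for readability; the errcode macros and
libpq calls are the standard PostgreSQL ones:

    /* Illustrative sketch only; not part of the diff below. */
    static bool
    is_connection_class_error(ErrorData *edata, PGconn *conn)
    {
        /* spock's own disconnect ereports carry a tagged sqlerrcode */
        if (edata->sqlerrcode == ERRCODE_CONNECTION_FAILURE ||
            edata->sqlerrcode == ERRCODE_CONNECTION_EXCEPTION ||
            edata->sqlerrcode == ERRCODE_CONNECTION_DOES_NOT_EXIST ||
            edata->sqlerrcode == ERRCODE_ADMIN_SHUTDOWN)
            return true;

        /* libpq-internal failures (e.g. epoll_ctl) may not tag an errcode */
        if (conn != NULL && PQstatus(conn) == CONNECTION_BAD)
            return true;

        return false;
    }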
(cherry picked from commit b75cb4fc4646a31e3db63a624af1354fc58c13b2)
---
 src/spock_apply.c                             | 127 ++++++++++--
 tests/tap/t/101_clean_exit_on_conn_loss.pl    | 190 +++++++++++++++++
 .../tap/t/102_no_origin_leak_on_disconnect.pl | 195 ++++++++++++++++++
 3 files changed, 491 insertions(+), 21 deletions(-)
 create mode 100644 tests/tap/t/101_clean_exit_on_conn_loss.pl
 create mode 100644 tests/tap/t/102_no_origin_leak_on_disconnect.pl

diff --git a/src/spock_apply.c b/src/spock_apply.c
index 6d35b242..ccfa4858 100644
--- a/src/spock_apply.c
+++ b/src/spock_apply.c
@@ -1922,10 +1922,20 @@ static void
 spock_apply_worker_shmem_exit(int code, Datum arg)
 {
 	/*
-	 * Reset replication session to avoid reuse after an error. This is done
-	 * in a before_shmem_exit callback instead of on_proc_exit because the
-	 * backend may also clean up the origin in certain cases, and we want to
-	 * avoid duplicate cleanup.
+	 * Reset replication session to avoid reuse after an error.
+	 * This is done in a before_shmem_exit callback instead of
+	 * on_proc_exit because the backend may also clean up the origin
+	 * in certain cases, and we want to avoid duplicate cleanup.
+	 *
+	 * Ordering matters: this must run before ShutdownPostgres()
+	 * (also a before_shmem_exit, registered earlier during
+	 * InitPostgres) so the origin is Invalid by the time
+	 * ShutdownPostgres calls AbortOutOfAnyTransaction(). Otherwise
+	 * RecordTransactionAbort advances the origin to the in-flight
+	 * transaction's stale final_lsn, silently skipping that txn on
+	 * reconnect. LIFO callback order makes this work. The
+	 * connection-error rethrow path in apply_work's PG_CATCH
+	 * relies on it.
 	 */
 	replorigin_session_origin = InvalidRepOriginId;
 	replorigin_session_origin_lsn = InvalidXLogRecPtr;
@@ -2969,11 +2979,31 @@ apply_work(PGconn *streamConn)
 		MySpockWorker->worker_status = SPOCK_WORKER_STATUS_RUNNING;
 
 		/*
-		 * Background workers mustn't call usleep() or any direct
-		 * equivalent instead, they may wait on their process latch, which
-		 * sleeps as necessary, but is awakened if postmaster dies. That
-		 * way the background process goes away immediately in an
-		 * emergency.
+		 * Refresh fd at the top of every iteration. On the previous
+		 * iteration, PQconsumeInput may have detected a dead socket and
+		 * caused libpq to close it (and possibly reset conn->sock to
+		 * PGINVALID_SOCKET). Reading PQsocket() again here ensures we
+		 * pass libpq's current value to WaitLatchOrSocket -- never a
+		 * stale fd, which would otherwise cause epoll_ctl(EINVAL) on
+		 * Linux. If the connection has gone bad, raise a tagged error
+		 * so the PG_CATCH discriminator routes us to a clean exit.
+		 */
+		fd = PQsocket(applyconn);
+		if (PQstatus(applyconn) == CONNECTION_BAD ||
+			fd == PGINVALID_SOCKET)
+		{
+			MySpockWorker->worker_status = SPOCK_WORKER_STATUS_STOPPED;
+			ereport(ERROR,
+					(errcode(ERRCODE_CONNECTION_FAILURE),
+					 errmsg("SPOCK %s: connection to other side has died",
+							MySubscription->name)));
+		}
+
+		/*
+		 * Background workers mustn't call usleep() or any direct equivalent;
+		 * instead, they may wait on their process latch, which sleeps as
+		 * necessary, but is awakened if postmaster dies. That way the
+		 * background process goes away immediately in an emergency.
 		 */
 		rc = WaitLatchOrSocket(&MyProc->procLatch,
 							   WL_SOCKET_READABLE | WL_LATCH_SET |
@@ -3004,8 +3034,10 @@ apply_work(PGconn *streamConn)
 		if (PQstatus(applyconn) == CONNECTION_BAD)
 		{
 			MySpockWorker->worker_status = SPOCK_WORKER_STATUS_STOPPED;
-			elog(ERROR, "SPOCK %s: connection to other side has died",
-				 MySubscription->name);
+			ereport(ERROR,
+					(errcode(ERRCODE_CONNECTION_FAILURE),
+					 errmsg("SPOCK %s: connection to other side has died",
+							MySubscription->name)));
 		}
 
 		/*
@@ -3112,23 +3144,29 @@ apply_work(PGconn *streamConn)
 			{
 				if (buf != NULL)
 					PQfreemem(buf);
-				elog(ERROR, "SPOCK %s: data stream ended",
-					 MySubscription->name);
+				ereport(ERROR,
+						(errcode(ERRCODE_CONNECTION_FAILURE),
+						 errmsg("SPOCK %s: data stream ended",
+								MySubscription->name)));
 			}
 			else if (r == -2)
 			{
 				if (buf != NULL)
 					PQfreemem(buf);
-				elog(ERROR, "SPOCK %s: could not read COPY data: %s",
-					 MySubscription->name,
-					 PQerrorMessage(applyconn));
+				ereport(ERROR,
+						(errcode(ERRCODE_CONNECTION_FAILURE),
+						 errmsg("SPOCK %s: could not read COPY data: %s",
+								MySubscription->name,
+								PQerrorMessage(applyconn))));
 			}
 			else if (r < 0)
 			{
 				if (buf != NULL)
 					PQfreemem(buf);
-				elog(ERROR, "SPOCK %s: invalid COPY status %d",
-					 MySubscription->name, r);
+				ereport(ERROR,
+						(errcode(ERRCODE_CONNECTION_FAILURE),
+						 errmsg("SPOCK %s: invalid COPY status %d",
+								MySubscription->name, r)));
 			}
 			else if (r == 0)
 			{
@@ -3331,9 +3369,56 @@ apply_work(PGconn *streamConn)
 		edata = CopyErrorData();
 
 		/*
-		 * use_try_block == true indicates either that an exception occurred
-		 * during a DML operation, or that we were replaying previously failed
-		 * actions (via need_replay).
+		 * Connection-class errors must NOT enter the apply-side replay
+		 * path (need_replay / use_try_block). Two reasons:
+		 *
+		 * 1. The replay path re-enters the wait loop with stale libpq
+		 *    state, producing an epoll_ctl(EINVAL) cascade on Linux.
+		 *
+		 * 2. With spock.exception_behaviour = transdiscard the replay
+		 *    path eventually logs the in-flight remote transaction's
+		 *    rows to spock.exception_log as if they had failed apply,
+		 *    when in fact they were never applied -- producing a
+		 *    "missing row" on the subscriber after reconnect.
+		 *
+		 * Re-throw instead. apply_work has the only PG_TRY in this call
+		 * stack; the error propagates to the bgworker error handler,
+		 * which aborts the current transaction (RecordTransactionAbort
+		 * does not advance replorigin) and runs proc_exit. The
+		 * before_shmem_exit callback spock_apply_worker_shmem_exit()
+		 * (see ~line 2039) then clears
+		 * replorigin_session_origin{,_lsn,_timestamp} so no later code
+		 * path can advance the replication origin past the aborted
+		 * in-flight remote transaction. The post-apply_work
+		 * flush_progress_if_needed(true) at the bottom of
+		 * spock_apply_main is also bypassed by the rethrow, which is
+		 * what we want -- it would otherwise be the path that advances
+		 * origin via RecordTransactionCommit. The manager respawns the
+		 * worker, which resumes from the last durably-committed origin
+		 * LSN and re-streams the aborted txn.
+		 *
+		 * Detect via sqlerrcode (preferred -- spock's own disconnect
+		 * ereports are tagged ERRCODE_CONNECTION_FAILURE) with PQstatus
+		 * as a fallback for libpq-internal raises (e.g. epoll_ctl) that
+		 * don't tag. Apply-side errors (constraint violations and the
+		 * like) do NOT take this branch and continue through the
+		 * existing exception_log replay path below.
+		 */
+		if (edata->sqlerrcode == ERRCODE_CONNECTION_FAILURE ||
+			edata->sqlerrcode == ERRCODE_CONNECTION_EXCEPTION ||
+			edata->sqlerrcode == ERRCODE_CONNECTION_DOES_NOT_EXIST ||
+			edata->sqlerrcode == ERRCODE_ADMIN_SHUTDOWN ||
+			(applyconn != NULL && PQstatus(applyconn) == CONNECTION_BAD))
+		{
+			elog(LOG, "SPOCK %s: connection error during apply, exiting via rethrow: %s",
+				 MySubscription->name, edata->message);
+			PG_RE_THROW();
+		}
+
+		/*
+		 * use_try_block == true indicates either:
+		 *   1. An exception occurred during a DML operation,
+		 *   2. Or we were replaying previously failed actions (via need_replay).
 		 *
 		 * If an exception occurs during handle_commit after prior handling,
 		 * we still need to ensure proper cleanup (e.g., disabling the
diff --git a/tests/tap/t/101_clean_exit_on_conn_loss.pl b/tests/tap/t/101_clean_exit_on_conn_loss.pl
new file mode 100644
index 00000000..0253effb
--- /dev/null
+++ b/tests/tap/t/101_clean_exit_on_conn_loss.pl
@@ -0,0 +1,190 @@
+use strict;
+use warnings;
+use Test::More;
+use lib '.';
+use SpockTest qw(
+    create_cluster destroy_cluster
+    system_maybe
+    get_test_config scalar_query psql_or_bail
+    wait_for_sub_status
+);
+
+# =============================================================================
+# Test 101: Apply worker exits cleanly on connection loss (no replorigin leak)
+# =============================================================================
+#
+# Verifies the connection-error short-circuit added at the top of apply_work's
+# PG_CATCH (src/spock_apply.c). When the upstream connection dies the worker
+# must:
+#
+# 1. classify the error via sqlerrcode (ERRCODE_CONNECTION_FAILURE) or
+#    PQstatus(applyconn) == CONNECTION_BAD as a fallback,
+# 2. clear replorigin_session_origin / _lsn / _timestamp directly (without
+#    calling replorigin_session_reset(), which would release the in-memory
+#    slot and leave session_replication_state == NULL, tripping an Assert in
+#    replorigin_session_advance on any subsequent commit path) so that
+#    spock_apply_main's on-exit flush_progress_if_needed(true) commit does
+#    not advance the replication origin to the stale in-flight end_lsn,
+# 3. exit apply_work via the rethrow rather than goto'ing stream_replay with
+#    stale libpq state,
+# 4. emit a single LOG line "connection error during apply, exiting via
+#    rethrow".
+#
+# Test 019 already proves the absence of the stale-fd / epoll_ctl cascade;
+# this test additionally asserts the new clean-exit log signature is present,
+# which is the only positive proof that the new discriminator was engaged.
+# Apply-side errors (constraint violations etc.) intentionally do not take
+# this branch and continue to use the spock.exception_log replay path.
+#
+# Without the fix:
+# - the LOG line "connection error during apply, exiting via rethrow" is
+#   absent
+# - "error during exception handling" appears (replay path tries to
+#   re-enter stream_replay with stale libpq state)
+#
+# As in test 019 we SIGKILL the walsender so libpq sees a hard EOF without a
+# CopyDone. SIGKILL also crashes the provider postmaster, which restarts
+# automatically; that gives us a realistic firewall-style disconnect.
+# =============================================================================
+
+create_cluster(2, 'Create 2-node cluster for clean-exit regression test');
+
+my $config = get_test_config();
+my $node_ports = $config->{node_ports};
+my $host = $config->{host};
+my $dbname = $config->{db_name};
+my $db_user = $config->{db_user};
+my $db_password = $config->{db_password};
+my $pg_bin = $config->{pg_bin};
+my $log_dir = $config->{log_dir};
+
+my $p1 = $node_ports->[0]; # n1 — provider
+my $p2 = $node_ports->[1]; # n2 — subscriber
+
+my $conn_n1 = "host=$host dbname=$dbname port=$p1 user=$db_user password=$db_password";
+my $pg_log_n2 = "$log_dir/00${p2}.log";
+
+# Match the customer's environment (TRANSDISCARD). ALTER SYSTEM writes
+# postgresql.auto.conf which takes precedence over SpockTest.pm defaults.
+psql_or_bail(2, "ALTER SYSTEM SET spock.exception_behaviour = 'transdiscard'");
+psql_or_bail(2, "SELECT pg_reload_conf()");
+sleep(1);
+
+psql_or_bail(1, "CREATE TABLE test_clean_exit (id SERIAL PRIMARY KEY, val TEXT)");
+psql_or_bail(2, "CREATE TABLE test_clean_exit (id SERIAL PRIMARY KEY, val TEXT)");
+
+psql_or_bail(2,
+    "SELECT spock.sub_create('sub_n1_n2', '$conn_n1', " .
+    "ARRAY['default', 'default_insert_only', 'ddl_sql'], false, false)");
+
+ok(wait_for_sub_status(2, 'sub_n1_n2', 'replicating', 30),
+   'sub_n1_n2 reaches replicating state');
+
+# Drive a series of small commits so the apply worker has plenty of in-flight
+# state when we kill the walsender. Each statement in psql is its own
+# implicit transaction, which is what we want here.
+my $batch = '';
+for my $i (1 .. 100) {
+    $batch .= "INSERT INTO test_clean_exit (val) VALUES ('pre_$i');\n";
+}
+psql_or_bail(1, $batch);
+sleep(2);
+
+my $pre_count = scalar_query(2, "SELECT count(*) FROM test_clean_exit");
+is($pre_count, '100', 'baseline 100 rows replicate n1->n2');
+
+my $log_offset = -s $pg_log_n2 // 0;
+
+my $walsender_pid = scalar_query(1,
+    "SELECT pid FROM pg_stat_replication WHERE state = 'streaming' LIMIT 1");
+
+my $signaled = 0;
+if (defined $walsender_pid && $walsender_pid =~ /^\d+$/) {
+    $signaled = kill 9, int($walsender_pid);
+}
+BAIL_OUT("could not SIGKILL walsender PID " . ($walsender_pid // 'undef') .
+         " — apply worker may not have been streaming yet")
+    unless $signaled;
+diag("SIGKILL sent to walsender PID $walsender_pid");
+
+# 12 s covers the n2 PG_CATCH discriminator path + n1 crash recovery + n2
+# manager respawn. We do NOT want to wait until replication catches up here
+# — we want to scrape the log for the proof-of-engagement LOG line first.
+sleep(12);
+
+my $new_log = '';
+if (open(my $lf, '<', $pg_log_n2)) {
+    seek($lf, $log_offset, 0);
+    local $/;
+    $new_log = <$lf> // '';
+    close($lf);
+}
+
+# Primary assertion: the discriminator was engaged. This message is emitted
+# by the new connection-error branch in PG_CATCH and exists only with the
+# fix in place. Without the fix, the worker exits via the existing
+# goto-stream_replay path and never logs this string.
+my $clean_exit =
+    ($new_log =~ /connection error during apply, exiting via rethrow/) ? 1 : 0;
+ok($clean_exit,
+   'n2 log shows "connection error during apply, exiting via rethrow" (fix engaged)');
+if (!$clean_exit) {
+    my @recent = grep { /SPOCK|connection|epoll|exception/ }
+                 split /\n/, $new_log;
+    diag("FAIL: clean-exit log line missing — discriminator did not fire");
+    diag("Recent SPOCK/connection log lines on n2:");
+    diag("  $_") for @recent;
+}
+
+# Secondary cross-platform assertion: the second-exception cascade ("error
+# during exception handling") must not appear, because the discriminator
+# prevents re-entry into stream_replay with stale libpq state. This
+# overlaps with test 019 but is included so this test is self-contained.
+my $eeh = ($new_log =~ /error during exception handling/) ? 1 : 0;
+ok(!$eeh, 'no "error during exception handling" after connection loss');
+
+# Linux-specific: the stale-fd path produces epoll_ctl(EINVAL). Absent with
+# the fix.
+SKIP: {
+    skip 'epoll_ctl error is Linux-specific', 1 unless $^O eq 'linux';
+    my $epoll_err = ($new_log =~ /epoll_ctl\(\) failed/) ? 1 : 0;
+    ok(!$epoll_err, 'Linux: no epoll_ctl() error after connection loss');
+}
+
+ok(wait_for_sub_status(2, 'sub_n1_n2', 'replicating', 30),
+   'sub_n1_n2 returns to replicating state after reconnect');
+
+# Drive more rows and confirm row counts match — sanity check that no
+# transaction was silently skipped on either side of the disconnect.
+my $batch2 = '';
+for my $i (101 .. 150) {
+    $batch2 .= "INSERT INTO test_clean_exit (val) VALUES ('post_$i');\n";
+}
+psql_or_bail(1, $batch2);
+
+# Poll for catch-up rather than fixed sleep.
+my $caught_up = 0;
+my $n1_count;
+my $n2_count;
+for my $attempt (1 .. 30) {
+    $n1_count = scalar_query(1, "SELECT count(*) FROM test_clean_exit");
+    $n2_count = scalar_query(2, "SELECT count(*) FROM test_clean_exit");
+    if (defined $n1_count && defined $n2_count && $n1_count eq $n2_count) {
+        $caught_up = 1;
+        last;
+    }
+    sleep(1);
+}
+
+ok($caught_up,
+   "subscriber caught up to publisher row count (n1=$n1_count, n2=$n2_count)");
+
+my $n1_max = scalar_query(1, "SELECT coalesce(max(id), 0) FROM test_clean_exit");
+my $n2_max = scalar_query(2, "SELECT coalesce(max(id), 0) FROM test_clean_exit");
+is($n2_max, $n1_max,
+   "subscriber max(id) matches publisher (n1=$n1_max, n2=$n2_max)");
+
+system_maybe("$pg_bin/psql", '-h', $host, '-p', $p2, '-U', $db_user, '-d', $dbname,
+             '-c', "SELECT spock.sub_drop('sub_n1_n2')");
+
+destroy_cluster('Destroy cluster after clean-exit regression test');
+
+done_testing();
diff --git a/tests/tap/t/102_no_origin_leak_on_disconnect.pl b/tests/tap/t/102_no_origin_leak_on_disconnect.pl
new file mode 100644
index 00000000..a36f518b
--- /dev/null
+++ b/tests/tap/t/102_no_origin_leak_on_disconnect.pl
@@ -0,0 +1,195 @@
+use strict;
+use warnings;
+use Test::More;
+use lib '.';
+use SpockTest qw(
+    create_cluster destroy_cluster
+    system_maybe
+    get_test_config scalar_query psql_or_bail
+    wait_for_sub_status
+);
+
+# =============================================================================
+# Test 102: apply_delay + walsender SIGKILL — no assertion crash, no row loss
+# =============================================================================
+#
+# Companion to test 101. Test 101 verifies the discriminator branch in
+# apply_work's PG_CATCH was engaged (via the unique "connection error during
+# apply, exiting via rethrow" log line) when the worker is idle in
+# WaitLatchOrSocket.
+# Test 102 covers a different shape: the worker is inside handle_begin's
+# apply_delay pg_usleep when the upstream connection is severed. This was
+# the path where an earlier version of the fix (calling
+# replorigin_session_reset() to detach the session origin slot) tripped an
+# "Assert(session_replication_state != NULL)" in replorigin_session_advance
+# during spock_apply_main's on-exit flush_progress_if_needed(true) commit --
+# because update_progress_entry carries a stale session_origin_lsn into
+# RecordTransactionCommit.
+#
+# The current fix (in spock_apply.c) clears the session origin variables
+# directly without releasing the in-memory slot, mirroring spock's existing
+# spock_apply_worker_shmem_exit pattern. This both blocks the leak via the
+# RecordTransactionAbort/Commit advance gate AND avoids the dangling slot
+# pointer. This test catches a regression of either issue.
+#
+# What this test does verify:
+# - SIGKILL of the walsender during the apply worker's pg_usleep does
+#   not crash the apply worker with an assertion failure
+# - The "connection error during apply, exiting via rethrow" log line fires
+# - The subscription recovers cleanly back to "replicating"
+# - All rows committed on the publisher (including the ones that were
+#   in-flight at SIGKILL time) end up on the subscriber
+#
+# What this test does NOT verify:
+# - The leak invariant in a true mid-DML scenario. apply_delay's
+#   pg_usleep delays disconnect detection until AFTER handle_commit has
+#   completed legitimately, so the apply worker is between transactions
+#   when PG_CATCH fires -- there is no in-flight remote transaction
+#   whose final_lsn could leak through. Reproducing the true mid-DML
+#   scenario deterministically requires injection points, which the
+#   v5_STABLE branch does not have. The leak-prevention invariant is
+#   preserved by code analogy with PG core's start_apply PG_CATCH
+#   (src/backend/replication/logical/worker.c:4452).
+# =============================================================================
+
+create_cluster(2, 'Create 2-node cluster for apply_delay disconnect test');
+
+my $config = get_test_config();
+my $node_ports = $config->{node_ports};
+my $host = $config->{host};
+my $dbname = $config->{db_name};
+my $db_user = $config->{db_user};
+my $db_password = $config->{db_password};
+my $pg_bin = $config->{pg_bin};
+my $log_dir = $config->{log_dir};
+
+my $p1 = $node_ports->[0];
+my $p2 = $node_ports->[1];
+
+my $conn_n1 = "host=$host dbname=$dbname port=$p1 user=$db_user password=$db_password";
+my $pg_log_n2 = "$log_dir/00${p2}.log";
+
+psql_or_bail(2, "ALTER SYSTEM SET spock.exception_behaviour = 'transdiscard'");
+psql_or_bail(2, "SELECT pg_reload_conf()");
+sleep(1);
+
+psql_or_bail(1, "CREATE TABLE test_origin_leak (id SERIAL PRIMARY KEY, val TEXT)");
+psql_or_bail(2, "CREATE TABLE test_origin_leak (id SERIAL PRIMARY KEY, val TEXT)");
+
+# 15s apply_delay: long enough to reliably catch the worker inside
+# handle_begin's pg_usleep when we SIGKILL the walsender.
+psql_or_bail(2,
+    "SELECT spock.sub_create(" .
+    "subscription_name => 'sub_n1_n2', " .
+    "provider_dsn => '$conn_n1', " .
+    "replication_sets => ARRAY['default', 'default_insert_only', 'ddl_sql'], " .
+    "synchronize_structure => false, " .
+    "synchronize_data => false, " .
+    "apply_delay => '15 seconds'::interval)");
+
+ok(wait_for_sub_status(2, 'sub_n1_n2', 'replicating', 30),
+   'sub_n1_n2 reaches replicating state');
+
+# Baseline: 5 rows. apply_delay=15s so each commit takes 15s+ to land.
+my $batch = '';
+$batch .= "INSERT INTO test_origin_leak (val) VALUES ('pre_$_');\n" for 1 .. 5;
+psql_or_bail(1, $batch);
+
+my $baseline_count = 0;
+for (1 .. 40) {
+    $baseline_count = scalar_query(2, "SELECT count(*) FROM test_origin_leak");
+    last if defined $baseline_count && $baseline_count eq '5';
+    sleep(1);
+}
+is($baseline_count, '5', 'baseline 5 rows replicate n1->n2 (after apply_delay)');
+
+# Insert T_inflight on the publisher. We will SIGKILL the walsender while
+# the apply worker is in pg_usleep waiting on this transaction's
+# apply_delay.
+psql_or_bail(1, "INSERT INTO test_origin_leak (val) VALUES ('inflight_critical')");
+
+# Wait so n2's apply worker has received T_inflight and entered its
+# apply_delay pg_usleep.
+sleep(3);
+
+# Snapshot subscriber log offset BEFORE SIGKILL so we can scrape only the
+# new lines.
+my $log_offset = -s $pg_log_n2 // 0;
+
+my $walsender_pid = scalar_query(1,
+    "SELECT pid FROM pg_stat_replication WHERE state = 'streaming' LIMIT 1");
+BAIL_OUT("no streaming walsender to SIGKILL — apply worker not yet attached")
+    unless defined $walsender_pid && $walsender_pid =~ /^\d+$/;
+
+my $signaled = kill 9, int($walsender_pid);
+BAIL_OUT("could not SIGKILL walsender PID $walsender_pid") unless $signaled;
+diag("SIGKILL sent to walsender PID $walsender_pid (subscriber is mid-pg_usleep)");
+
+# 18 s: covers worker wake (12s remaining of pg_usleep) + handle_commit
+# of the in-flight txn + connection-failure detection + clean exit +
+# manager respawn + new worker entering its own apply_delay sleep.
+sleep(18);
+
+my $new_log = '';
+if (open(my $lf, '<', $pg_log_n2)) {
+    seek($lf, $log_offset, 0);
+    local $/;
+    $new_log = <$lf> // '';
+    close($lf);
+}
+
+# (1) The discriminator branch fired — this is the unique positive proof.
+my $clean_exit =
+    ($new_log =~ /connection error during apply, exiting via rethrow/) ? 1 : 0;
+ok($clean_exit, 'apply worker logged "connection error during apply, exiting via rethrow"');
+
+# (2) No assertion crash from replorigin_session_advance. An earlier
+# iteration of the fix called replorigin_session_reset(), which detached
+# the in-memory slot pointer, leaving session_replication_state == NULL
+# while replorigin_session_origin and update_progress_entry were still
+# carrying the stale state into the on-exit catalog commit -- triggering
+# Assert("session_replication_state != NULL"). Catch any regression.
+my $assert_failed =
+    ($new_log =~ /Assert.*session_replication_state.*!= NULL/) ? 1 : 0;
+ok(!$assert_failed,
+   'no Assert(session_replication_state != NULL) failure on apply worker exit');
+
+my $aborted_signal_6 =
+    ($new_log =~ /spock apply.*was terminated by signal 6: Aborted/) ? 1 : 0;
+ok(!$aborted_signal_6,
+   'no signal 6 (SIGABRT) termination on apply worker exit');
+
+# (3) Subscription recovers and forward progress resumes.
+ok(wait_for_sub_status(2, 'sub_n1_n2', 'replicating', 60),
+   'sub_n1_n2 returns to replicating state after reconnect');
+
+psql_or_bail(1, "INSERT INTO test_origin_leak (val) VALUES ('post_recovery')");
+
+# (4) All rows match — including T_inflight, which was in-flight at the
+# moment of SIGKILL. If progress had been incorrectly advanced past
+# T_inflight, the new worker would have skipped it on START_REPLICATION
+# and the count would be short by 1.
+my $caught_up = 0;
+my ($n1_count, $n2_count);
+for (1 .. 60) {
+    $n1_count = scalar_query(1, "SELECT count(*) FROM test_origin_leak");
+    $n2_count = scalar_query(2, "SELECT count(*) FROM test_origin_leak");
+    if (defined $n1_count && defined $n2_count && $n1_count eq $n2_count) {
+        $caught_up = 1;
+        last;
+    }
+    sleep(1);
+}
+ok($caught_up,
+   "subscriber caught up to publisher (n1=$n1_count, n2=$n2_count)");
+
+is(scalar_query(2,
+    "SELECT count(*) FROM test_origin_leak WHERE val = 'inflight_critical'"),
+   '1',
+   'in-flight transaction is durably present on subscriber after recovery');
+
+system_maybe("$pg_bin/psql", '-h', $host, '-p', $p2, '-U', $db_user, '-d', $dbname,
+             '-c', "SELECT spock.sub_drop('sub_n1_n2')");
+
+destroy_cluster('Destroy cluster after apply_delay disconnect test');
+
+done_testing();