127 changes: 106 additions & 21 deletions src/spock_apply.c
@@ -1922,10 +1922,20 @@ static void
spock_apply_worker_shmem_exit(int code, Datum arg)
{
/*
* Reset replication session to avoid reuse after an error.
* This is done in a before_shmem_exit callback instead of
* on_proc_exit because the backend may also clean up the origin
* in certain cases, and we want to avoid duplicate cleanup.
*
* Ordering matters: this must run before ShutdownPostgres()
* (also a before_shmem_exit, registered earlier during
* InitPostgres) so the origin is Invalid by the time
* ShutdownPostgres calls AbortOutOfAnyTransaction(). Otherwise
* RecordTransactionAbort advances the origin to the in-flight
* transaction's stale final_lsn, silently skipping that txn on
* reconnect. LIFO callback order makes this work. The
* connection-error rethrow path in apply_work's PG_CATCH
* relies on it.
*/
replorigin_session_origin = InvalidRepOriginId;
replorigin_session_origin_lsn = InvalidXLogRecPtr;
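
A minimal standalone sketch of the LIFO ordering the comment above relies on (illustrative only: worker_startup and reset_origin_cb are hypothetical names; the real registration and reset live elsewhere in spock):

#include "postgres.h"
#include "access/xlogdefs.h"
#include "replication/origin.h"
#include "storage/ipc.h"

static void
reset_origin_cb(int code, Datum arg)
{
	/* Runs FIRST at proc_exit because it was registered LAST. */
	replorigin_session_origin = InvalidRepOriginId;
	replorigin_session_origin_lsn = InvalidXLogRecPtr;
}

static void
worker_startup(void)
{
	/*
	 * InitPostgres has already registered ShutdownPostgres as a
	 * before_shmem_exit callback, so registering ours afterwards puts
	 * it ahead in the LIFO run order: the origin is already Invalid
	 * when ShutdownPostgres reaches AbortOutOfAnyTransaction().
	 */
	before_shmem_exit(reset_origin_cb, (Datum) 0);
}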
@@ -2969,11 +2979,31 @@ apply_work(PGconn *streamConn)
MySpockWorker->worker_status = SPOCK_WORKER_STATUS_RUNNING;

/*
* Refresh fd at the top of every iteration. On the previous
* iteration, PQconsumeInput may have detected a dead socket and
* caused libpq to close it (and possibly reset conn->sock to
* PGINVALID_SOCKET). Reading PQsocket() again here ensures we
* pass libpq's current value to WaitLatchOrSocket -- never a
* stale fd, which would otherwise cause epoll_ctl(EINVAL) on
* Linux. If the connection has gone bad, raise a tagged error
* so the PG_CATCH discriminator routes us to a clean exit.
*/
fd = PQsocket(applyconn);
if (PQstatus(applyconn) == CONNECTION_BAD ||
fd == PGINVALID_SOCKET)
{
MySpockWorker->worker_status = SPOCK_WORKER_STATUS_STOPPED;
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("SPOCK %s: connection to other side has died",
MySubscription->name)));
}

/*
* Background workers mustn't call usleep() or any direct equivalent;
* instead, they may wait on their process latch, which sleeps as
* necessary, but is awakened if postmaster dies. That way the
* background process goes away immediately in an emergency.
*/
rc = WaitLatchOrSocket(&MyProc->procLatch,
WL_SOCKET_READABLE | WL_LATCH_SET |
@@ -3004,8 +3034,10 @@
if (PQstatus(applyconn) == CONNECTION_BAD)
{
MySpockWorker->worker_status = SPOCK_WORKER_STATUS_STOPPED;
elog(ERROR, "SPOCK %s: connection to other side has died",
MySubscription->name);
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("SPOCK %s: connection to other side has died",
MySubscription->name)));
}

/*
@@ -3112,23 +3144,29 @@
{
if (buf != NULL)
PQfreemem(buf);
elog(ERROR, "SPOCK %s: data stream ended",
MySubscription->name);
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("SPOCK %s: data stream ended",
MySubscription->name)));
}
else if (r == -2)
{
if (buf != NULL)
PQfreemem(buf);
elog(ERROR, "SPOCK %s: could not read COPY data: %s",
MySubscription->name,
PQerrorMessage(applyconn));
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("SPOCK %s: could not read COPY data: %s",
MySubscription->name,
PQerrorMessage(applyconn))));
}
else if (r < 0)
{
if (buf != NULL)
PQfreemem(buf);
elog(ERROR, "SPOCK %s: invalid COPY status %d",
MySubscription->name, r);
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("SPOCK %s: invalid COPY status %d",
MySubscription->name, r)));
}
else if (r == 0)
{
@@ -3331,9 +3369,56 @@ apply_work(PGconn *streamConn)
edata = CopyErrorData();

/*
* Connection-class errors must NOT enter the apply-side replay
* path (need_replay / use_try_block). Two reasons:
*
* 1. The replay path re-enters the wait loop with stale libpq
* state, producing an epoll_ctl(EINVAL) cascade on Linux.
*
* 2. With spock.exception_behaviour = transdiscard the replay
* path eventually logs the in-flight remote transaction's
* rows to spock.exception_log as if they had failed apply,
* when in fact they were never applied -- producing a
* "missing row" on the subscriber after reconnect.
*
* Re-throw instead. apply_work has the only PG_TRY in this call
* stack; the error propagates to the bgworker error handler,
* which aborts the current transaction (RecordTransactionAbort
* does not advance replorigin) and runs proc_exit. The
* before_shmem_exit callback spock_apply_worker_shmem_exit()
* (see ~line 2039) then clears
* replorigin_session_origin{,_lsn,_timestamp} so no later code
* path can advance the replication origin past the aborted
* in-flight remote transaction. The post-apply_work
* flush_progress_if_needed(true) at the bottom of
* spock_apply_main is also bypassed by the rethrow, which is
* what we want -- it would otherwise be the path that advances
* origin via RecordTransactionCommit. The manager respawns the
* worker, which resumes from the last durably-committed origin
* LSN and re-streams the aborted txn.
*
* Detect via sqlerrcode (preferred -- spock's own disconnect
* ereports are tagged ERRCODE_CONNECTION_FAILURE) with PQstatus
* as a fallback for libpq-internal raises (e.g. epoll_ctl) that
* don't tag. Apply-side errors (constraint violations and the
* like) do NOT take this branch and continue through the
* existing exception_log replay path below.
*/
if (edata->sqlerrcode == ERRCODE_CONNECTION_FAILURE ||
edata->sqlerrcode == ERRCODE_CONNECTION_EXCEPTION ||
edata->sqlerrcode == ERRCODE_CONNECTION_DOES_NOT_EXIST ||
edata->sqlerrcode == ERRCODE_ADMIN_SHUTDOWN ||
(applyconn != NULL && PQstatus(applyconn) == CONNECTION_BAD))
{
elog(LOG, "SPOCK %s: connection error during apply, exiting via rethrow: %s",
MySubscription->name, edata->message);
PG_RE_THROW();
}

/*
* use_try_block == true indicates either:
* 1. An exception occurred during a DML operation,
* 2. Or we were replaying previously failed actions (via need_replay).
*
* If an exception occurs during handle_commit after prior handling,
* we still need to ensure proper cleanup (e.g., disabling the
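A condensed sketch of the control flow the PG_CATCH comment above describes (hypothetical shape: in the real code the PG_TRY lives in apply_work and the flush at the bottom of spock_apply_main, collapsed here into one function; is_connection_error() stands in for the sqlerrcode/PQstatus test):

#include "postgres.h"

/* hypothetical helpers standing in for real spock code */
extern bool is_connection_error(void);
extern void flush_progress_if_needed(bool force);

static void
apply_main_sketch(void)
{
	PG_TRY();
	{
		/* stream-and-apply loop; disconnects ereport(ERROR, ...)
		 * tagged ERRCODE_CONNECTION_FAILURE */
	}
	PG_CATCH();
	{
		if (is_connection_error())	/* hypothetical discriminator */
			PG_RE_THROW();			/* unwinds to the bgworker error
									 * handler, then proc_exit and the
									 * before_shmem_exit reset */
		/* otherwise: exception_log / replay handling as before */
	}
	PG_END_TRY();

	/*
	 * Never reached on rethrow, so this commit path cannot advance the
	 * origin past the aborted in-flight remote transaction.
	 */
	flush_progress_if_needed(true);
}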
190 changes: 190 additions & 0 deletions tests/tap/t/101_clean_exit_on_conn_loss.pl
@@ -0,0 +1,190 @@
use strict;
use warnings;
use Test::More;
use lib '.';
use SpockTest qw(
create_cluster destroy_cluster
system_maybe
get_test_config scalar_query psql_or_bail
wait_for_sub_status
);

# =============================================================================
# Test 101: Apply worker exits cleanly on connection loss (no replorigin leak)
# =============================================================================
#
# Verifies the connection-error short-circuit added at the top of apply_work's
# PG_CATCH (src/spock_apply.c). When the upstream connection dies the worker
# must:
#
# 1. classify the error via sqlerrcode (ERRCODE_CONNECTION_FAILURE) or
# PQstatus(applyconn) == CONNECTION_BAD as a fallback,
# 2. clear replorigin_session_origin / _lsn / _timestamp directly (without
# calling replorigin_session_reset(), which would release the in-memory
# slot and leave session_replication_state == NULL, tripping an Assert in
# replorigin_session_advance on any subsequent commit path) so that
# spock_apply_main's on-exit flush_progress_if_needed(true) commit does
# not advance the replication origin to the stale in-flight end_lsn,
# 3. propagate the error out of apply_work via PG_RE_THROW rather than
#    goto'ing stream_replay with stale libpq state,
# 4. emit a single LOG line "connection error during apply, exiting via rethrow".
#
# Test 019 already proves the stale-fd / epoll_ctl cascade is absent; this test
# additionally asserts the new clean-exit log signature is present, which is
# the only positive proof that the new discriminator was engaged. Apply-side
# errors (constraint violations etc.) intentionally do not take this branch
# and continue to use the spock.exception_log replay path.
#
# Without the fix:
# - the LOG line "connection error during apply, exiting via rethrow" is absent
# - "error during exception handling" appears (replay path tries to
# re-enter stream_replay with stale libpq state)
#
# As in test 019 we SIGKILL the walsender so libpq sees a hard EOF without a
# CopyDone. The SIGKILL also makes the provider postmaster treat the dead
# backend as a crash and restart the whole instance automatically; that gives
# us a realistic firewall-style disconnect.
# =============================================================================

create_cluster(2, 'Create 2-node cluster for clean-exit regression test');

my $config = get_test_config();
my $node_ports = $config->{node_ports};
my $host = $config->{host};
my $dbname = $config->{db_name};
my $db_user = $config->{db_user};
my $db_password = $config->{db_password};
my $pg_bin = $config->{pg_bin};
my $log_dir = $config->{log_dir};

my $p1 = $node_ports->[0]; # n1 — provider
my $p2 = $node_ports->[1]; # n2 — subscriber

my $conn_n1 = "host=$host dbname=$dbname port=$p1 user=$db_user password=$db_password";
my $pg_log_n2 = "$log_dir/00${p2}.log";

# Match the customer's environment (TRANSDISCARD). ALTER SYSTEM writes
# postgresql.auto.conf which takes precedence over SpockTest.pm defaults.
psql_or_bail(2, "ALTER SYSTEM SET spock.exception_behaviour = 'transdiscard'");
psql_or_bail(2, "SELECT pg_reload_conf()");
sleep(1);

psql_or_bail(1, "CREATE TABLE test_clean_exit (id SERIAL PRIMARY KEY, val TEXT)");
psql_or_bail(2, "CREATE TABLE test_clean_exit (id SERIAL PRIMARY KEY, val TEXT)");

psql_or_bail(2,
"SELECT spock.sub_create('sub_n1_n2', '$conn_n1', " .
"ARRAY['default', 'default_insert_only', 'ddl_sql'], false, false)");

ok(wait_for_sub_status(2, 'sub_n1_n2', 'replicating', 30),
'sub_n1_n2 reaches replicating state');

# Drive a series of small commits so the apply worker has plenty of in-flight
# state when we kill the walsender. Each statement in psql is its own
# implicit transaction, which is what we want here.
my $batch = '';
for my $i (1 .. 100) {
$batch .= "INSERT INTO test_clean_exit (val) VALUES ('pre_$i');\n";
}
psql_or_bail(1, $batch);
sleep(2);

my $pre_count = scalar_query(2, "SELECT count(*) FROM test_clean_exit");
is($pre_count, '100', 'baseline 100 rows replicate n1->n2');

my $log_offset = -s $pg_log_n2 // 0;

my $walsender_pid = scalar_query(1,
"SELECT pid FROM pg_stat_replication WHERE state = 'streaming' LIMIT 1");

my $signaled = 0;
if (defined $walsender_pid && $walsender_pid =~ /^\d+$/) {
$signaled = kill 9, int($walsender_pid);
}
BAIL_OUT("could not SIGKILL walsender PID " . ($walsender_pid // 'undef') .
" — apply worker may not have been streaming yet")
unless $signaled;
diag("SIGKILL sent to walsender PID $walsender_pid");

# 12 s covers the n2 PG_CATCH discriminator path + n1 crash recovery + n2
# manager respawn. We do NOT want to wait until replication catches up here
# — we want to scrape the log for the proof-of-engagement LOG line first.
sleep(12);

my $new_log = '';
if (open(my $lf, '<', $pg_log_n2)) {
seek($lf, $log_offset, 0);
local $/;
$new_log = <$lf> // '';
close($lf);
}

# Primary assertion: the discriminator was engaged. This message is emitted
# by the new connection-error branch in PG_CATCH and exists only with the
# fix in place. Without the fix, the worker exits via the existing
# goto-stream_replay path and never logs this string.
my $clean_exit =
($new_log =~ /connection error during apply, exiting via rethrow/) ? 1 : 0;
ok($clean_exit,
'n2 log shows "connection error during apply, exiting via rethrow" (fix engaged)');
if (!$clean_exit) {
my @recent = grep { /SPOCK|connection|epoll|exception/ }
split /\n/, $new_log;
diag("FAIL: clean-exit log line missing — discriminator did not fire");
diag("Recent SPOCK/connection log lines on n2:");
diag(" $_") for @recent;
}

# Secondary cross-platform assertion: the second-exception cascade ("error
# during exception handling") must not appear, because the discriminator
# prevents re-entry into stream_replay with stale libpq state. This
# overlaps with test 019 but is included so this test is self-contained.
my $eeh = ($new_log =~ /error during exception handling/) ? 1 : 0;
ok(!$eeh, 'no "error during exception handling" after connection loss');

# Linux-specific: the stale-fd path produces epoll_ctl(EINVAL). Absent with
# the fix.
SKIP: {
skip 'epoll_ctl error is Linux-specific', 1 unless $^O eq 'linux';
my $epoll_err = ($new_log =~ /epoll_ctl\(\) failed/) ? 1 : 0;
ok(!$epoll_err, 'Linux: no epoll_ctl() error after connection loss');
}

ok(wait_for_sub_status(2, 'sub_n1_n2', 'replicating', 30),
'sub_n1_n2 returns to replicating state after reconnect');

# Drive more rows and confirm row counts match — sanity check that no
# transaction was silently skipped on either side of the disconnect.
my $batch2 = '';
for my $i (101 .. 150) {
$batch2 .= "INSERT INTO test_clean_exit (val) VALUES ('post_$i');\n";
}
psql_or_bail(1, $batch2);

# Poll for catch-up rather than fixed sleep.
my $caught_up = 0;
my $n1_count;
my $n2_count;
for my $attempt (1 .. 30) {
$n1_count = scalar_query(1, "SELECT count(*) FROM test_clean_exit");
$n2_count = scalar_query(2, "SELECT count(*) FROM test_clean_exit");
if (defined $n1_count && defined $n2_count && $n1_count eq $n2_count) {
$caught_up = 1;
last;
}
sleep(1);
}

ok($caught_up,
"subscriber caught up to publisher row count (n1=$n1_count, n2=$n2_count)");

my $n1_max = scalar_query(1, "SELECT coalesce(max(id), 0) FROM test_clean_exit");
my $n2_max = scalar_query(2, "SELECT coalesce(max(id), 0) FROM test_clean_exit");
is($n2_max, $n1_max,
"subscriber max(id) matches publisher (n1=$n1_max, n2=$n2_max)");

system_maybe("$pg_bin/psql", '-h', $host, '-p', $p2, '-U', $db_user, '-d', $dbname,
'-c', "SELECT spock.sub_drop('sub_n1_n2')");

destroy_cluster('Destroy cluster after clean-exit regression test');

done_testing();
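
For reference, a C sketch of the distinction point 2 of the test header draws (assumed from PostgreSQL's replication/logical/origin.c behavior; not code from this PR):

	/*
	 * What the error path must NOT do: replorigin_session_reset()
	 * detaches the session from its origin slot and leaves
	 * session_replication_state == NULL, so the next commit that
	 * calls replorigin_session_advance() trips its Assert.
	 */
	replorigin_session_reset();

	/*
	 * What spock_apply_worker_shmem_exit does instead: clear only the
	 * session globals, leaving the slot attached with nothing to
	 * advance.
	 */
	replorigin_session_origin = InvalidRepOriginId;
	replorigin_session_origin_lsn = InvalidXLogRecPtr;
	replorigin_session_origin_timestamp = 0;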