Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,026 changes: 1,026 additions & 0 deletions docs/internals-doc/plans/2026-04-30-recover-remote-commit-ts-plan.md

Large diffs are not rendered by default.

262 changes: 262 additions & 0 deletions docs/internals-doc/specs/spock-progress-no-checkpoint-hook-design.md

Large diffs are not rendered by default.

12 changes: 9 additions & 3 deletions include/spock_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
#include "storage/spin.h"
#include "utils/hsearch.h"

#include "spock_rmgr.h"

extern HTAB *SpockGroupHash;

/* numeric version to store on disk */
Expand Down Expand Up @@ -159,8 +157,16 @@ extern void spock_group_progress_update_ptr(SpockGroupEntry *entry,
extern TimestampTz apply_worker_get_prev_remote_ts(void);

extern void spock_group_resource_dump(void);
extern void spock_checkpoint_hook(XLogRecPtr checkPointRedo, int flags);
extern void spock_group_progress_update_list(List *lst);
extern void spock_group_progress_force_set_list(List *lst);

/*
* Reset every non-key field of a SpockApplyProgress to its "not set" value.
* Single source of truth for initial-state semantics; callers that need to
* (re)seed a progress entry from somewhere other than the hash insert path
* (e.g. reconcile_progress_with_origin) call this directly so the two paths
* cannot drift when SpockApplyProgress gains a new field.
*/
extern void spock_init_progress_fields(SpockApplyProgress *progress);

#endif /* SPOCK_GROUP_H */
18 changes: 18 additions & 0 deletions include/spock_progress_recovery.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*-------------------------------------------------------------------------
*
* spock_progress_recovery.h
* apply-worker startup state setup for spock.progress
*
* Copyright (c) 2022-2026, pgEdge, Inc.
* Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
*
*-------------------------------------------------------------------------
*/
#ifndef SPOCK_PROGRESS_RECOVERY_H
#define SPOCK_PROGRESS_RECOVERY_H

#include "access/xlog.h"

extern void spock_init_progress_state(XLogRecPtr origin_lsn);

#endif /* SPOCK_PROGRESS_RECOVERY_H */
64 changes: 44 additions & 20 deletions include/spock_rmgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@
* spock_rmgr.h
* spock resource manager declarations
*
* Emits one WAL record per SpockGroupHash entry at every resource.dat dump
* event (clean shutdown, add_node post-loop, table-sync post-loop) carrying
* the full SpockApplyProgress snapshot for that entry. The records are not
* required for state recovery (shmem is reseeded from resource.dat plus
* replorigin reconcile plus pg_commit_ts scan); they exist so that a
* pg_waldump trace shows the exact progress snapshot that was persisted
* at each dump LSN -- useful for incident reconstruction over time.
*
* Copyright (c) 2022-2026, pgEdge, Inc.
* Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, The Regents of the University of California
Expand All @@ -14,38 +22,54 @@

#include "access/xlog.h"
#include "access/xlog_internal.h"
#include "nodes/pg_list.h"

#include "spock_group.h"

/* Spock resource manager */
#define SPOCK_RMGR_NAME "spock_custom_rmgr"
#define SPOCK_RMGR_ID 144
#define SPOCK_RMGR_NAME "spock_custom_rmgr"
#define SPOCK_RMGR_ID 144

/* Spock RMGR tags. */
#define SPOCK_RMGR_APPLY_PROGRESS 0x10
#define SPOCK_RMGR_SUBTRANS_COMMIT_TS 0x20
/* Spock RMGR record types (high nibble of info byte) */
#define SPOCK_RMGR_RESOURCE_DUMP 0x10

typedef struct SpockApplyProgress SpockApplyProgress;
typedef struct SpockGroupEntry SpockGroupEntry;
/* Event type within a SPOCK_RMGR_RESOURCE_DUMP record */
/*
 * Identifies which dump event produced a resource-dump WAL record.  The
 * value is carried in the uint8 event_type field of SpockResourceDumpRec,
 * so the numeric values are spelled out explicitly.
 */
typedef enum SpockResourceDumpEvent
{
/* Clean shutdown; emitted from the supervisor's exit callback. */
SPOCK_DUMP_SHUTDOWN = 0,
/* add_node post-loop dump; reflects changes across all subscriptions. */
SPOCK_DUMP_ADD_NODE = 1,
/* table-sync post-loop dump; only affects one subscription's progress. */
SPOCK_DUMP_TABLE_SYNC = 2
} SpockResourceDumpEvent;

#if 0
typedef struct SubTransactionCommitTsEntry
typedef struct SpockResourceDumpRec
{
TransactionId xid;
TimestampTz time;
RepOriginId nodeid;
} SubTransactionCommitTsEntry;
#endif
uint8 event_type; /* SpockResourceDumpEvent */
uint8 flags; /* reserved */
uint16 entry_seq; /* 0-based index within this dump event */
uint16 entry_total; /* total entries in this dump event */
uint16 _pad;
SpockApplyProgress progress; /* full snapshot for this entry */
Comment thread
mason-sharp marked this conversation as resolved.
} SpockResourceDumpRec;

/* RMGR function declarations */
extern void spock_rmgr_init(void);
extern void spock_rmgr_redo(XLogReaderState *record);
extern void spock_rmgr_desc(StringInfo buf, XLogReaderState *record);
extern const char *spock_rmgr_identify(uint8 info);
extern void spock_rmgr_redo(XLogReaderState *record);
extern void spock_rmgr_startup(void);
extern void spock_rmgr_cleanup(void);

/* WAL helpers */
extern XLogRecPtr spock_apply_progress_add_to_wal(const SpockApplyProgress *sap);
/*
* Emit helper -- called from the dump call sites (the supervisor exit
* callback in spock.c, plus spock_group.c / spock_shmem.c). Emits one WAL
* record per progress entry, then issues a single XLogFlush at the end.
*
* If `changed_entries` is NULL, walks SpockGroupHash and emits a record
* for each entry (used by SHUTDOWN and ADD_NODE — both reflect changes
* across all subscriptions).
*
* If `changed_entries` is non-NULL, it is treated as a List of
* SpockApplyProgress* and emits one record per list element (used by
* TABLE_SYNC, which only affects one subscription's progress).
*/
extern void spock_rmgr_log_resource_dump(SpockResourceDumpEvent event,
List *changed_entries);

#endif /* SPOCK_RMGR_H */
1 change: 1 addition & 0 deletions include/spock_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ extern SpockWorker *spock_sync_find(Oid dboid, Oid subid,
extern List *spock_sync_find_all(Oid dboid, Oid subscriberid);

extern SpockWorker *spock_get_worker(int slot);
extern bool spock_any_apply_worker_running(void);
extern bool spock_worker_running(SpockWorker *w);
extern bool spock_worker_terminating(SpockWorker *w);
extern void spock_worker_kill(SpockWorker *worker);
Expand Down
50 changes: 46 additions & 4 deletions src/spock.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
#include "spock_conflict_stat.h"
#endif
#include "spock_executor.h"
#include "spock_group.h"
#include "spock_node.h"
#include "spock_conflict.h"
#include "spock_rmgr.h"
Expand Down Expand Up @@ -150,7 +151,6 @@ int log_origin_change = SPOCK_ORIGIN_NONE;
int spock_apply_idle_timeout = 300;

static emit_log_hook_type prev_emit_log_hook = NULL;
static Checkpoint_hook_type prev_Checkpoint_hook = NULL;

void _PG_init(void);
PGDLLEXPORT void spock_supervisor_main(Datum main_arg);
Expand Down Expand Up @@ -693,6 +693,49 @@ start_manager_workers(void)
table_close(rel, AccessShareLock);
}

/*
* Drain interval and timeout for the supervisor's shutdown wait loop.
*
* On SIGTERM, postmaster signals all bgworkers in parallel; the supervisor
* waits for sibling APPLY/SYNC workers to clear their PGPROC slots so the
* SHUTDOWN forensic dump reflects each worker's last update to
* SpockGroupHash. 100 ms × 50 = 5 s ceiling — long enough to cover normal
* apply-side cleanup, short enough to avoid blocking postmaster shutdown
* on a stuck worker.
*/
#define SPOCK_SHUTDOWN_DRAIN_INTERVAL_US 100000
#define SPOCK_SHUTDOWN_DRAIN_MAX_RETRIES 50

/*
* Supervisor before_shmem_exit cleanup. Runs in the supervisor process on
* clean shutdown, with WAL insertion still permitted.
*
* We poll-wait for in-flight apply/sync workers to detach so the resource.dat
* file dump and the SPOCK_DUMP_SHUTDOWN forensic WAL records reflect each
* worker's final SpockGroupHash state.
*/
static void
spock_supervisor_on_exit(int code, Datum arg)
{
	int			polls_left;

	/* Only a clean exit (code 0) dumps state; any other exit code skips it. */
	if (code != 0)
		return;

	/*
	 * Bounded drain: poll up to SPOCK_SHUTDOWN_DRAIN_MAX_RETRIES times,
	 * sleeping SPOCK_SHUTDOWN_DRAIN_INTERVAL_US between polls, and stop as
	 * soon as no apply worker is reported running.
	 */
	for (polls_left = SPOCK_SHUTDOWN_DRAIN_MAX_RETRIES;
		 polls_left > 0;
		 polls_left--)
	{
		if (!spock_any_apply_worker_running())
			break;

		CHECK_FOR_INTERRUPTS();
		pg_usleep(SPOCK_SHUTDOWN_DRAIN_INTERVAL_US);
	}

	/*
	 * Re-check after the loop: if a worker is still around we proceed
	 * anyway, but leave a note in the log that the snapshot may miss its
	 * final update.
	 */
	if (spock_any_apply_worker_running())
		elog(LOG, "spock supervisor: shutdown drain timed out, "
			 "SHUTDOWN forensic snapshot may be incomplete");

	/* Write the resource dump, then log the matching SHUTDOWN WAL records. */
	spock_group_resource_dump();
	spock_rmgr_log_resource_dump(SPOCK_DUMP_SHUTDOWN, NULL);
}

/*
* Static bgworker used for initialization and management (our main process).
*/
Expand All @@ -703,6 +746,8 @@ spock_supervisor_main(Datum main_arg)
pqsignal(SIGTERM, handle_sigterm);
BackgroundWorkerUnblockSignals();

before_shmem_exit(spock_supervisor_on_exit, (Datum) 0);

/*
* Initialize supervisor info in shared memory. Strictly speaking we
* don't need a lock here, because no other process could possibly be
Expand Down Expand Up @@ -1244,9 +1289,6 @@ _PG_init(void)
if (IsBinaryUpgrade)
return;

prev_Checkpoint_hook = Checkpoint_hook;
Checkpoint_hook = spock_checkpoint_hook;

/* Spock resource manager */
spock_rmgr_init();

Expand Down
87 changes: 73 additions & 14 deletions src/spock_apply.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
#include "spock_conflict.h"
#include "spock_executor.h"
#include "spock_node.h"
#include "spock_progress_recovery.h"
#include "spock_proto_native.h"
#include "spock_queue.h"
#include "spock_relcache.h"
Expand Down Expand Up @@ -253,6 +254,7 @@ static bool apply_replay_queue_append_entry(ApplyReplayEntry **entry_p,
static void apply_replay_queue_start_replay(void);
static void apply_replay_spill_write_entry(int len, char *data);
static ApplyReplayEntry *apply_replay_spill_read_entry(void);
static void request_initial_status_update(PGconn *conn, XLogRecPtr startpos);

/* Wrapper for latch for waiting for previous transaction to commit */
void
Expand Down Expand Up @@ -1005,10 +1007,11 @@ handle_commit(StringInfo s)
.remote_commit_lsn = end_lsn,
.received_lsn = end_lsn,
/*
* Include remote_insert_lsn for WAL persistence. This was already
* updated in shmem by UpdateWorkerStats() earlier (either from
* apply_work for protocol 5+, or from handle_commit for protocol 4).
* Without this, crash recovery would lose remote_insert_lsn.
* Carry forward the remote_insert_lsn already in shmem (set by
* UpdateWorkerStats on the most-recent keepalive or 'w' message).
* This keeps the shmem entry coherent after the update below;
* crash recovery of this field is handled by the forced keepalive
* sent right after spock_start_replication, not by any WAL record.
*/
.remote_insert_lsn = MyApplyWorker->apply_group->progress.remote_insert_lsn,
/* XXX: Could we use commit_ts value instead? */
Expand All @@ -1019,9 +1022,6 @@ handle_commit(StringInfo s)
/* XXX: Don't care in production yet */
Assert(sap.last_updated_ts >= sap.remote_commit_ts);

/* WAL after commit, then to shmem */
spock_apply_progress_add_to_wal(&sap);

Assert(MyApplyWorker && MyApplyWorker->apply_group);

spock_group_progress_update_ptr(MyApplyWorker->apply_group, &sap);
Expand Down Expand Up @@ -2881,6 +2881,56 @@ send_feedback(PGconn *conn, XLogRecPtr recvpos, int64 now, bool force)
return true;
}

/*
* Send a one-shot standby status update with replyRequested=1 so the
* publisher responds immediately with a keepalive carrying its current
* sentPtr. Called once per apply-worker (re)connect to refresh
* remote_insert_lsn / received_lsn in shmem without waiting for
* wal_sender_timeout/2. Independent of send_feedback's static state.
*
* Note on ordering: this 'r' message does not jump ahead of any pending
* 'w' messages the walsender already has queued. That is fine -- in
* protocol v5+ every 'w' header carries the publisher's current insert
* position, so UpdateWorkerStats refreshes remote_insert_lsn on the
* first 'w' that arrives, often sooner than the eventual 'k'. The
* forced keepalive matters most when the publisher is otherwise idle;
* when there is data flowing, the data messages do the job.
*
* Protocol-version note: the 'r' / 'k' framing is the streaming-replication
* wire protocol (libpqwalreceiver level), not the spock output-plugin
* protocol carried inside 'w' message bodies, so this works on every
* negotiable spock protocol version. The 'k' response advances
* received_lsn on both v4 and v5+. remote_insert_lsn refresh paths differ:
* v5+ takes it from each 'w' header; v4 takes it from COMMIT message
* payloads. Until the first commit/keepalive lands, v4's remote_insert_lsn
* stays at whatever reconcile pinned it to (origin_lsn) -- correct, but
* not "live" -- while v5+ catches up on the very next 'w'.
*/
static void
request_initial_status_update(PGconn *conn, XLogRecPtr startpos)
{
	StringInfoData reply;
	int64		send_time = GetCurrentTimestamp();
	bool		sent;

	/*
	 * Assemble the standby status update: message type 'r', then the
	 * write / flush / apply positions (all reported as startpos), the send
	 * timestamp, and finally the replyRequested flag set to true.
	 */
	initStringInfo(&reply);
	pq_sendbyte(&reply, 'r');
	pq_sendint64(&reply, startpos);		/* write */
	pq_sendint64(&reply, startpos);		/* flush */
	pq_sendint64(&reply, startpos);		/* apply */
	pq_sendint64(&reply, send_time);	/* sendTime */
	pq_sendbyte(&reply, true);			/* replyRequested */

	elog(DEBUG2, "SPOCK %s: requesting initial status update at %X/%X",
		 MySubscription->name,
		 LSN_FORMAT_ARGS(startpos));

	/*
	 * The message counts as sent only if it was queued successfully and the
	 * subsequent flush returned zero; the flush is not attempted when
	 * queueing fails.
	 */
	if (PQputCopyData(conn, reply.data, reply.len) <= 0)
		sent = false;
	else
		sent = (PQflush(conn) == 0);

	if (!sent)
		ereport(ERROR,
				(errcode(ERRCODE_CONNECTION_FAILURE),
				 errmsg("SPOCK %s: could not send initial status update: %s",
						MySubscription->name, PQerrorMessage(conn))));
}

/*
* Update frequently changing statistics of the apply group
*/
Expand Down Expand Up @@ -3978,13 +4028,13 @@ interval_to_timeoffset(const Interval *interval)
void
spock_apply_main(Datum main_arg)
{
int slot = DatumGetInt32(main_arg);
PGconn *streamConn;
RepOriginId originid;
XLogRecPtr origin_startpos;
MemoryContext saved_ctx;
char *repsets;
char *origins;
int slot = DatumGetInt32(main_arg);
PGconn *streamConn;
RepOriginId originid;
XLogRecPtr origin_startpos;
MemoryContext saved_ctx;
char *repsets;
char *origins;

/* Setup shmem. */
spock_worker_attach(slot, SPOCK_WORKER_APPLY);
Expand Down Expand Up @@ -4052,6 +4102,8 @@ spock_apply_main(Datum main_arg)
replorigin_session_origin = originid;
origin_startpos = replorigin_session_get_progress(false);

spock_init_progress_state(origin_startpos);

/* Start the replication. */
streamConn = spock_connect_replica(MySubscription->origin_if->dsn,
MySubscription->slot_name, NULL);
Expand All @@ -4070,6 +4122,13 @@ spock_apply_main(Datum main_arg)
MySubscription->force_text_transfer);
pfree(repsets);

/*
* Ask the publisher to immediately send a keepalive carrying its current
* sentPtr so we can refresh remote_insert_lsn/received_lsn in shmem
* without waiting for wal_sender_timeout/2.
*/
request_initial_status_update(streamConn, origin_startpos);

CommitTransactionCommand();

/*
Expand Down
Loading
Loading