Skip to content

Commit 912a504

Browse files
committed
feat(replication): co-publish additional tables and reconcile existing publications
LogicalReplicationClient gains an optional additionalTables option. These are published alongside the primary table in the same publication, and their WAL events stream through the same data handler. When the publication already exists, missing tables are added via ALTER PUBLICATION ADD TABLE (online, slot-preserving) instead of erroring, so a publication can gain a table without a drop and recreate.
1 parent 47610ee commit 912a504

1 file changed

Lines changed: 58 additions & 20 deletions

File tree

internal-packages/replication/src/client.ts

Lines changed: 58 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,14 @@ export interface LogicalReplicationClientOptions {
2323
* The table to replicate (for publication creation).
2424
*/
2525
table: string;
26+
/**
27+
* Additional tables to co-publish into the same publication. Their WAL
28+
* events stream through the same `data` handler as `table`, so use this only
29+
* when the extra tables share `table`'s row shape and downstream transform
30+
* (e.g. a parallel clone table). On startup they are added to an existing
31+
* publication via ALTER PUBLICATION ... ADD TABLE.
32+
*/
33+
additionalTables?: string[];
2634
/**
2735
* The name of the replication slot to use.
2836
*/
@@ -407,6 +415,15 @@ export class LogicalReplicationClient {
407415
return this;
408416
}
409417

418+
// The full set of tables this client publishes: the primary `table` plus any
419+
// `additionalTables`. Order is stable so the publication's FOR TABLE clause is
420+
// deterministic.
421+
#allTables(): string[] {
422+
return this.options.additionalTables
423+
? [this.options.table, ...this.options.additionalTables]
424+
: [this.options.table];
425+
}
426+
410427
async #createPublication(): Promise<boolean> {
411428
if (!this.client) {
412429
this.events.emit("error", new LogicalReplicationClientError("Client not connected"));
@@ -416,8 +433,10 @@ export class LogicalReplicationClient {
416433
const publicationExists = await this.#doesPublicationExist();
417434

418435
if (publicationExists) {
419-
// Validate the existing publication is correctly configured
420-
const validationError = await this.#validatePublicationConfiguration();
436+
// Reconcile the existing publication: add any configured table it is
437+
// missing (e.g. a clone table added after the publication was first
438+
// created). Returns an error string only for unrecoverable mismatches.
439+
const validationError = await this.#ensurePublicationConfiguration();
421440

422441
if (validationError) {
423442
this.logger.error("Publication exists but is misconfigured", {
@@ -441,9 +460,13 @@ export class LogicalReplicationClient {
441460
return true;
442461
}
443462

463+
const tableList = this.#allTables()
464+
.map((table) => `"${table}"`)
465+
.join(", ");
466+
444467
const [createError] = await tryCatch(
445468
this.client.query(
446-
`CREATE PUBLICATION "${this.options.publicationName}" FOR TABLE "${this.options.table}" ${
469+
`CREATE PUBLICATION "${this.options.publicationName}" FOR TABLE ${tableList} ${
447470
this.options.publicationActions
448471
? `WITH (publish = '${this.options.publicationActions.join(", ")}')`
449472
: ""
@@ -483,32 +506,47 @@ export class LogicalReplicationClient {
483506
return res.rows[0].exists;
484507
}
485508

486-
async #validatePublicationConfiguration(): Promise<string | null> {
509+
async #ensurePublicationConfiguration(): Promise<string | null> {
487510
if (!this.client) {
488-
return "Cannot validate publication configuration: client not connected";
511+
return "Cannot ensure publication configuration: client not connected";
489512
}
490513

491-
// Check if the publication has the correct table
514+
// Which public tables the publication already carries.
492515
const tablesRes = await this.client.query(
493-
`SELECT schemaname, tablename
494-
FROM pg_publication_tables
516+
`SELECT schemaname, tablename
517+
FROM pg_publication_tables
495518
WHERE pubname = '${this.options.publicationName}';`
496519
);
497520

498-
const tables = tablesRes.rows;
499-
const expectedTable = this.options.table;
500-
501-
// Check if the table is in the publication
502-
const hasTable = tables.some(
503-
(row) => row.tablename === expectedTable && row.schemaname === "public"
521+
const currentTables = new Set(
522+
tablesRes.rows
523+
.filter((row) => row.schemaname === "public")
524+
.map((row) => row.tablename as string)
504525
);
505526

506-
if (!hasTable) {
507-
if (tables.length === 0) {
508-
return `Publication '${this.options.publicationName}' exists but has NO TABLES configured. Expected table: "public.${expectedTable}". Run: ALTER PUBLICATION ${this.options.publicationName} ADD TABLE "${expectedTable}";`;
509-
} else {
510-
const tableList = tables.map((t) => `"${t.schemaname}"."${t.tablename}"`).join(", ");
511-
return `Publication '${this.options.publicationName}' exists but does not include the required table "public.${expectedTable}". Current tables: ${tableList}. Run: ALTER PUBLICATION ${this.options.publicationName} ADD TABLE "${expectedTable}";`;
527+
// Reconcile rather than reject: add any configured table the publication is
528+
// missing. ALTER PUBLICATION ... ADD TABLE is online and leaves the slot
529+
// position intact, so an existing publication can gain a table (e.g.
530+
// task_run_v2 alongside TaskRun) without a drop/recreate. ADD TABLE on a
531+
// table already published raises duplicate_object (42710); treat that as a
532+
// benign race (another instance won) rather than a failure.
533+
const missingTables = this.#allTables().filter((table) => !currentTables.has(table));
534+
535+
for (const table of missingTables) {
536+
this.logger.info("Adding table to existing publication", {
537+
name: this.options.name,
538+
publicationName: this.options.publicationName,
539+
table,
540+
});
541+
542+
const [addError] = await tryCatch(
543+
this.client.query(
544+
`ALTER PUBLICATION "${this.options.publicationName}" ADD TABLE "${table}";`
545+
)
546+
);
547+
548+
if (addError && (addError as { code?: string }).code !== "42710") {
549+
return `Failed to add table "public.${table}" to publication '${this.options.publicationName}': ${addError.message}`;
512550
}
513551
}
514552

0 commit comments

Comments
 (0)