From d4be228cf99718b4b264493f80797f56d0e220e2 Mon Sep 17 00:00:00 2001 From: Addison Higham Date: Wed, 25 Apr 2018 15:45:46 -0600 Subject: [PATCH] Add more options for xmins and nextxids This adds two more options for additional metadata into the slot: - `include-xmins` This data includes the `catalog_xmin` and `xmin` fields from the replication slot. The reasons for wanting this generally occurs when using logical replication with a secondary. Since replication slots don't get replicated to secondaries, in the event of the failover you need some mechanism for seeing what records you may have missed. The xmin/catalog_xmin is useful as you can use it for a lower bound and any records with a higher xmin could have been missed - `include-next-xids` This includes sending both the `epoch` and `nextxid` 32-bit ints. The epoch is a number that increments when xid rollover happens. This allows you to reconstruct the same txid you get from the `txid_current` function which accounts for rollover. This can be really useful to see at a point's time how far behind the slot you currently are. --- Makefile | 2 +- README.md | 2 ++ expected/nextxid.out | 31 ++++++++++++++++++ expected/xmin.out | 42 ++++++++++++++++++++++++ sql/nextxid.sql | 17 ++++++++++ sql/xmin.sql | 19 +++++++++++ wal2json.c | 77 ++++++++++++++++++++++++++++++++++++++++---- 7 files changed, 182 insertions(+), 8 deletions(-) create mode 100644 expected/nextxid.out create mode 100644 expected/xmin.out create mode 100644 sql/nextxid.sql create mode 100644 sql/xmin.sql diff --git a/Makefile b/Makefile index 8b1e111..1270d24 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,7 @@ MODULES = wal2json # message test will fail for <= 9.5 REGRESS = cmdline insert1 update1 update2 update3 update4 delete1 delete2 \ delete3 delete4 savepoint specialvalue toast bytea message typmod \ - filtertable selecttable + filtertable selecttable xmin nextxid PG_CONFIG = pg_config PGXS := $(shell $(PG_CONFIG) --pgxs) diff --git a/README.md b/README.md index a045e3b..ebf1764 100644 --- a/README.md +++ b/README.md @@ -71,6 +71,8 @@ Parameters * `write-in-chunks`: write after every change instead of every changeset. Default is _false_. * `include-lsn`: add _nextlsn_ to each changeset. Default is _false_. * `include-unchanged-toast`: add TOAST value even if it was not modified. Since TOAST values are usually large, this option could save IO and bandwidth if it is disabled. Default is _true_. +* `include-xmins`: Add slot _catxmin_ (catalog xmin) and _xmin_ to each changeset. Default is _false_. +* `include-next-xids`: Adds _nextxid_ and _epoch_ (indicates xid wraparound) to each changeset. Default is _false_. * `filter-tables`: exclude rows from the specified tables. Default is empty which means that no table will be filtered. It is a comma separated value. The tables should be schema-qualified. `*.foo` means table foo in all schemas and `bar.*` means all tables in schema bar. Special characters (space, single quote, comma, period, asterisk) must be escaped with backslash. Schema and table are case-sensitive. Table `"public"."Foo bar"` should be specified as `public.Foo\ bar`. * `add-tables`: include only rows from the specified tables. Default is all tables from all schemas. It has the same rules from `filter-tables`. diff --git a/expected/nextxid.out b/expected/nextxid.out new file mode 100644 index 0000000..90f205c --- /dev/null +++ b/expected/nextxid.out @@ -0,0 +1,31 @@ +\set VERBOSITY terse +-- predictability +SET synchronous_commit = on; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json'); + ?column? +---------- + init +(1 row) + +DROP TABLE IF EXISTS nextxid ; +NOTICE: table "nextxid" does not exist, skipping +CREATE TABLE nextxid (id integer PRIMARY KEY); +INSERT INTO nextxid values (1); +-- convert 32 bit epoch and 32 bit xid into 64 bit from txid_current +SELECT ((max(((data::json) -> 'epoch')::text::int)::bit(32) << 32) | max(((data::json) -> 'nextxid')::text::int)::bit(32))::bigint = txid_current() FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-next-xids', '1'); + ?column? +---------- + t +(1 row) + +SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL) where ((data::json) -> 'nextxid') IS NOT NULL; + data +------ +(0 rows) + +SELECT 'stop' FROM pg_drop_replication_slot('regression_slot'); + ?column? +---------- + stop +(1 row) + diff --git a/expected/xmin.out b/expected/xmin.out new file mode 100644 index 0000000..b09296d --- /dev/null +++ b/expected/xmin.out @@ -0,0 +1,42 @@ +\set VERBOSITY terse +-- predictability +SET synchronous_commit = on; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json'); + ?column? +---------- + init +(1 row) + +DROP TABLE IF EXISTS xmin ; +NOTICE: table "xmin" does not exist, skipping +CREATE TABLE xmin (id integer PRIMARY KEY); +INSERT INTO xmin values (1); +-- xmin is often (always?) 0, but this should be forward compat should that change in the future +SELECT max(((data::json) -> 'xmin')::text::int) = (SELECT coalesce(xmin::text::int, 0) FROM pg_replication_slots WHERE slot_name = 'regression_slot') FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xmins', '1'); + ?column? +---------- + t +(1 row) + +SELECT max(((data::json) -> 'catxmin')::text::int) = (SELECT catalog_xmin::text::int FROM pg_replication_slots WHERE slot_name = 'regression_slot') FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xmins', '1'); + ?column? +---------- + t +(1 row) + +SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL) where ((data::json) -> 'catxmin') IS NOT NULL; + data +------ +(0 rows) + +SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL) where ((data::json) -> 'xmin') IS NOT NULL; + data +------ +(0 rows) + +SELECT 'stop' FROM pg_drop_replication_slot('regression_slot'); + ?column? +---------- + stop +(1 row) + diff --git a/sql/nextxid.sql b/sql/nextxid.sql new file mode 100644 index 0000000..a016b61 --- /dev/null +++ b/sql/nextxid.sql @@ -0,0 +1,17 @@ +\set VERBOSITY terse + +-- predictability +SET synchronous_commit = on; + +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json'); + +DROP TABLE IF EXISTS nextxid ; + +CREATE TABLE nextxid (id integer PRIMARY KEY); +INSERT INTO nextxid values (1); + +-- convert 32 bit epoch and 32 bit xid into 64 bit from txid_current +SELECT ((max(((data::json) -> 'epoch')::text::int)::bit(32) << 32) | max(((data::json) -> 'nextxid')::text::int)::bit(32))::bigint = txid_current() FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-next-xids', '1'); + +SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL) where ((data::json) -> 'nextxid') IS NOT NULL; +SELECT 'stop' FROM pg_drop_replication_slot('regression_slot'); diff --git a/sql/xmin.sql b/sql/xmin.sql new file mode 100644 index 0000000..224befa --- /dev/null +++ b/sql/xmin.sql @@ -0,0 +1,19 @@ +\set VERBOSITY terse + +-- predictability +SET synchronous_commit = on; + +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json'); + +DROP TABLE IF EXISTS xmin ; + +CREATE TABLE xmin (id integer PRIMARY KEY); +INSERT INTO xmin values (1); + +-- xmin is often (always?) 0, but this should be forward compat should that change in the future +SELECT max(((data::json) -> 'xmin')::text::int) = (SELECT coalesce(xmin::text::int, 0) FROM pg_replication_slots WHERE slot_name = 'regression_slot') FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xmins', '1'); +SELECT max(((data::json) -> 'catxmin')::text::int) = (SELECT catalog_xmin::text::int FROM pg_replication_slots WHERE slot_name = 'regression_slot') FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xmins', '1'); + +SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL) where ((data::json) -> 'catxmin') IS NOT NULL; +SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL) where ((data::json) -> 'xmin') IS NOT NULL; +SELECT 'stop' FROM pg_drop_replication_slot('regression_slot'); diff --git a/wal2json.c b/wal2json.c index e3a082e..b56d2c6 100644 --- a/wal2json.c +++ b/wal2json.c @@ -1,7 +1,7 @@ /*------------------------------------------------------------------------- * * wal2json.c - * JSON output plugin for changeset extraction + * JSON output plugin for changeset extraction * * Copyright (c) 2013-2018, Euler Taveira de Oliveira * @@ -12,6 +12,8 @@ */ #include "postgres.h" +#include "access/xlog.h" + #include "catalog/pg_type.h" #include "replication/logical.h" @@ -40,6 +42,8 @@ typedef struct bool include_typmod; /* include typmod in types */ bool include_not_null; /* include not-null constraints */ bool include_unchanged_toast; /* include unchanged TOAST field values in output */ + bool include_xmins; /* include catalog_xmin and xmin field in output */ + bool include_next_xids; /* include postgres epoch field in output */ bool pretty_print; /* pretty-print JSON? */ bool write_in_chunks; /* write in chunks? */ @@ -124,7 +128,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE #endif - ); + ); data->include_xids = false; data->include_timestamp = false; data->include_schemas = true; @@ -136,6 +140,8 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is data->include_lsn = false; data->include_not_null = false; data->include_unchanged_toast = true; + data->include_xmins = false; + data->include_next_xids = false; data->filter_tables = NIL; /* add all tables in all schemas by default */ @@ -300,6 +306,32 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname))); } + else if (strcmp(elem->defname, "include-xmins") == 0) + { + if (elem->arg == NULL) + { + elog(LOG, "include-xmins is null"); + data->include_xmins = true; + } + else if (!parse_bool(strVal(elem->arg), &data->include_xmins)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not parse value \"%s\" for parameter \"%s\"", + strVal(elem->arg), elem->defname))); + } + else if (strcmp(elem->defname, "include-next-xids") == 0) + { + if (elem->arg == NULL) + { + elog(LOG, "include-next-xids is null"); + data->include_next_xids = true; + } + else if (!parse_bool(strVal(elem->arg), &data->include_next_xids)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not parse value \"%s\" for parameter \"%s\"", + strVal(elem->arg), elem->defname))); + } else if (strcmp(elem->defname, "filter-tables") == 0) { char *rawstr; @@ -416,6 +448,37 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) appendStringInfo(ctx->out, "\"timestamp\":\"%s\",", timestamptz_to_str(txn->commit_time)); } + if (data->include_xmins) + { + if (data->pretty_print) + { + appendStringInfo(ctx->out, "\t\"xmin\": %u,\n", ctx->slot->effective_xmin); + appendStringInfo(ctx->out, "\t\"catxmin\": %u,\n", ctx->slot->effective_catalog_xmin); + } + else + { + appendStringInfo(ctx->out, "\"xmin\":%u,", ctx->slot->effective_xmin); + appendStringInfo(ctx->out, "\"catxmin\":%u,", ctx->slot->effective_catalog_xmin); + } + } + + if (data->include_next_xids) + { + TransactionId nextXid; + uint32 epoch; + GetNextXidAndEpoch(&nextXid, &epoch); + if (data->pretty_print) + { + appendStringInfo(ctx->out, "\t\"nextxid\": %u,\n", nextXid); + appendStringInfo(ctx->out, "\t\"epoch\": %u,\n", epoch); + } + else + { + appendStringInfo(ctx->out, "\"nextxid\":%u,", nextXid); + appendStringInfo(ctx->out, "\"epoch\":%u,", epoch); + } + } + if (data->pretty_print) appendStringInfoString(ctx->out, "\t\"change\": ["); else @@ -1320,9 +1383,9 @@ parse_table_identifier(List *qualified_tables, char separator, List **select_tab static bool string_to_SelectTable(char *rawstring, char separator, List **select_tables) { - char *nextp; + char *nextp; bool done = false; - List *qualified_tables = NIL; + List *qualified_tables = NIL; nextp = rawstring; @@ -1335,9 +1398,9 @@ string_to_SelectTable(char *rawstring, char separator, List **select_tables) /* At the top of the loop, we are at start of a new identifier. */ do { - char *curname; - char *endp; - char *qname; + char *curname; + char *endp; + char *qname; curname = nextp; while (*nextp && *nextp != separator && !isspace(*nextp))